apache-kafka 相关问题

Apache Kafka是一个分布式流媒体平台,用于存储和处理高吞吐量数据流。

我有一个卡夫卡消费者:

然后在处理过程中使用FileDownLoadService。

回答 0 投票 0


ZURE活动中心与Kafka作为服务经纪人

I'm evaluating the use of Azure Event Hub vs Kafka as a Service Broker. 我希望我能够并排创建两个本地应用程序,一个使用Kafka使用Azure Event Hub使用Kafka消费消息。 我已经设置了一个Docker容器,该容器是一个Kafka实例,并且正在使用Azure帐户设置Azure Event Hub的过程(据我所知,没有其他方法可以为Azure Event Hub创建本地/开发实例)。

回答 5 投票 0

为什么Kafka GO的ListOffsets方法在向主题产生消息后显示偏移的零时间? 我正在尝试运行这个简化的示例程序: 包装主 进口 ( “语境” “编码/JSON” “ FMT” “日志” “时间” “

package main import ( "context" "encoding/json" "fmt" "log" "time" "github.com/google/uuid" "github.com/segmentio/kafka-go" ) func main() { addr := kafka.TCP("localhost:9092") topic := "topic-" + uuid.New().String() client := &kafka.Client{ Addr: addr, Timeout: 10 * time.Second, } if _, err := client.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{ Addr: addr, Topics: []kafka.TopicConfig{ { Topic: topic, NumPartitions: 1, ReplicationFactor: 1, }, }, }); err != nil { log.Fatalf("create topic: %v", err) } log.Println("Created topic", topic) writer := &kafka.Writer{ Addr: addr, Topic: topic, } writeMessage := func(msg string) { if err := writer.WriteMessages(context.Background(), kafka.Message{ Value: []byte(msg), }); err != nil { log.Fatalf("write messages: %v", err) } } writeMessage("one") writeMessage("two") listOffsetsResponse, err := client.ListOffsets(context.Background(), &kafka.ListOffsetsRequest{ Addr: addr, Topics: map[string][]kafka.OffsetRequest{ topic: { {Partition: 0}, }, }, }) if err != nil { log.Fatalf("list offsets: %v", err) } fmt.Println(jsonMarshal(listOffsetsResponse.Topics[topic])) } func jsonMarshal(v any) string { b, err := json.Marshal(v) if err != nil { log.Fatalf("marshal %T to JSON: %v", v, err) } return string(b) }

回答 1 投票 0

我有以下问题。

version: '3.7' services: zookeeper: image: 'bitnami/zookeeper:latest' ports: - '2181:2181' environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: 'bitnami/kafka:latest' ports: - '9092:9092' environment: - KAFKA_BROKER_ID=1 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes depends_on: - zookeeper pinot-controller: image: apachepinot/pinot:latest hostname: pinot-controller ports: - "9000:9000" command: StartController -zkAddress zookeeper:2181 environment: JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx8G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xlog:gc:gc-pinot-controller.log" depends_on: - kafka pinot-broker: image: apachepinot/pinot:latest hostname: pinot-broker ports: - "8099:8099" command: StartBroker -zkAddress zookeeper:2181 environment: JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xlog:gc:gc-pinot-broker.log" depends_on: - zookeeper - kafka - pinot-controller pinot-server: image: apachepinot/pinot:latest hostname: pinot-server ports: - "8098:8098" command: StartServer -zkAddress zookeeper:2181 environment: JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx16G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xlog:gc:gc-pinot-server.log" depends_on: - zookeeper - kafka - pinot-controller

回答 0 投票 0




回答 1 投票 0



KAFKAPYTHON消费者组会话定时出现

I使用Confluent-kafka v1.3.0,我在消费者组会话超时方面存在以下问题。 我的配置看起来像: c ['kafka'] = { 'bootstrap.servers':'host.docker.internal:9104',, '缺点...

回答 1 投票 0


我正在为我的应用程序开发基于微服务的体系结构,该架构由以下服务组成:

应该如何完成微服务的身份验证...使用私有式钥匙并通过每种服务验证令牌,或者我应该使用API网关处理身份验证并重定向?

回答 0 投票 0


Nifi-以JSON格式管理Kafka消息的消费并在CSV中以高性能写作

感谢您的帮助 我想尝试以下NIFI流量:divrapekafka-> mergeContent-> convertjsontoavro-> updaterecord-> convertavrotocsv-> putfile 因此,请阅读10,000个块的Kafka消息,通过合并将消息分组,然后将所有消息处理在一起,然后创建一个CSV文件。你怎么认为 ?您还有其他建议吗?

回答 1 投票 0


回答 2 投票 0

用卡夫卡水印策略

我想汇总我的数据流以每5秒返回值的总和(最终目标是平均5秒)。使用以下类总和()我能够返回继续...

回答 1 投票 0


最新问题
© www.soinside.com 2019 - 2025. All rights reserved.