Apache Kafka是一个分布式流媒体平台,用于存储和处理高吞吐量数据流。
I'm evaluating the use of Azure Event Hub vs Kafka as a Service Broker. 我希望我能够并排创建两个本地应用程序,一个使用Kafka使用Azure Event Hub使用Kafka消费消息。 我已经设置了一个Docker容器,该容器是一个Kafka实例,并且正在使用Azure帐户设置Azure Event Hub的过程(据我所知,没有其他方法可以为Azure Event Hub创建本地/开发实例)。
package main import ( "context" "encoding/json" "fmt" "log" "time" "github.com/google/uuid" "github.com/segmentio/kafka-go" ) func main() { addr := kafka.TCP("localhost:9092") topic := "topic-" + uuid.New().String() client := &kafka.Client{ Addr: addr, Timeout: 10 * time.Second, } if _, err := client.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{ Addr: addr, Topics: []kafka.TopicConfig{ { Topic: topic, NumPartitions: 1, ReplicationFactor: 1, }, }, }); err != nil { log.Fatalf("create topic: %v", err) } log.Println("Created topic", topic) writer := &kafka.Writer{ Addr: addr, Topic: topic, } writeMessage := func(msg string) { if err := writer.WriteMessages(context.Background(), kafka.Message{ Value: []byte(msg), }); err != nil { log.Fatalf("write messages: %v", err) } } writeMessage("one") writeMessage("two") listOffsetsResponse, err := client.ListOffsets(context.Background(), &kafka.ListOffsetsRequest{ Addr: addr, Topics: map[string][]kafka.OffsetRequest{ topic: { {Partition: 0}, }, }, }) if err != nil { log.Fatalf("list offsets: %v", err) } fmt.Println(jsonMarshal(listOffsetsResponse.Topics[topic])) } func jsonMarshal(v any) string { b, err := json.Marshal(v) if err != nil { log.Fatalf("marshal %T to JSON: %v", v, err) } return string(b) }
version: '3.7' services: zookeeper: image: 'bitnami/zookeeper:latest' ports: - '2181:2181' environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: 'bitnami/kafka:latest' ports: - '9092:9092' environment: - KAFKA_BROKER_ID=1 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes depends_on: - zookeeper pinot-controller: image: apachepinot/pinot:latest hostname: pinot-controller ports: - "9000:9000" command: StartController -zkAddress zookeeper:2181 environment: JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx8G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xlog:gc:gc-pinot-controller.log" depends_on: - kafka pinot-broker: image: apachepinot/pinot:latest hostname: pinot-broker ports: - "8099:8099" command: StartBroker -zkAddress zookeeper:2181 environment: JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xlog:gc:gc-pinot-broker.log" depends_on: - zookeeper - kafka - pinot-controller pinot-server: image: apachepinot/pinot:latest hostname: pinot-server ports: - "8098:8098" command: StartServer -zkAddress zookeeper:2181 environment: JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx16G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xlog:gc:gc-pinot-server.log" depends_on: - zookeeper - kafka - pinot-controller
I创建了一个使用Kafka的Spring Boot应用程序,我将其部署在Kubernetes群集上。 我面临错误,指出已部署的Spring Boot应用程序无法解决Bootstrap URL
当我尝试部署我的春季启动应用程序时,我遇到了这个错误:
我想使用Bytewax从Kafka消耗流以执行聚合。不幸的是,我无法连接到Kafka,并且连接总是被拒绝。我认为港口SE ...
docker-compose.yml
kafka.linger-ms=5 kafka.acks=all kafka.enable-idempotence-config=true kafka.delivery-timeout-ms=25000 kafka.max-in-flight=5 kafka.retry-backoff-ms=1000 kafka.request-timeout-ms=20000
I使用Confluent-kafka v1.3.0,我在消费者组会话超时方面存在以下问题。 我的配置看起来像: c ['kafka'] = { 'bootstrap.servers':'host.docker.internal:9104',, '缺点...
docker-compose.yml
我正在为我的应用程序开发基于微服务的体系结构,该架构由以下服务组成:
应该如何完成微服务的身份验证...使用私有式钥匙并通过每种服务验证令牌,或者我应该使用API网关处理身份验证并重定向?
Nifi-以JSON格式管理Kafka消息的消费并在CSV中以高性能写作
感谢您的帮助 我想尝试以下NIFI流量:divrapekafka-> mergeContent-> convertjsontoavro-> updaterecord-> convertavrotocsv-> putfile 因此,请阅读10,000个块的Kafka消息,通过合并将消息分组,然后将所有消息处理在一起,然后创建一个CSV文件。你怎么认为 ?您还有其他建议吗?
我想汇总我的数据流以每5秒返回值的总和(最终目标是平均5秒)。使用以下类总和()我能够返回继续...