我想使用Bytewax从Kafka消耗流以执行聚合。不幸的是,我无法连接到Kafka,并且连接总是被拒绝。我认为港口SE ...

问题描述 投票:0回答:1
services: kafka: image: apache/kafka ports: - "9092:9092" environment: # Configure listeners for both docker and host communication KAFKA_LISTENERS: CONTROLLER://localhost:9091,HOST://0.0.0.0:9092,DOCKER://kafka:9093 KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT # Settings required for KRaft mode KAFKA_NODE_ID: 1 KAFKA_PROCESS_ROLES: broker,controller KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091 # Listener to use for broker-to-broker communication KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER # Required for a single node cluster KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 healthcheck: test: ["CMD", "bash", "-c", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list"] interval: 10s timeout: 5s retries: 5 networks: - app-network kafka-ui: image: ghcr.io/kafbat/kafka-ui:latest ports: - 8080:8080 environment: DYNAMIC_CONFIG_ENABLED: "true" KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093 depends_on: - kafka networks: - app-network consumer: build: context: ./kafka_consumer dockerfile: Dockerfile container_name: consumer depends_on: factory-service: condition: service_started kafka: condition: service_healthy ports: - "8099:80" networks: - app-network bytewax: build: context: ./consumer dockerfile: Dockerfile container_name: bytewax depends_on: - kafka networks: - app-network networks: app-network: driver: bridge

consumer.py(prints dataStream)

from kafka import KafkaConsumer KAFKA_BROKER = "kafka:9093" KAFKA_TOPIC = ["factory_001","factory_002"] consumer = KafkaConsumer( *KAFKA_TOPIC, group_id='my-group', bootstrap_servers=KAFKA_BROKER, value_deserializer=lambda x: json.loads(x.decode("utf-8")) )
Stream_process.py(不工作)

from bytewax import operators as op from bytewax.connectors.kafka import KafkaSource from bytewax.connectors.stdio import StdOutSink from bytewax.dataflow import Dataflow KAFKA_BROKER = ["kafka:9093"] KAFKA_TOPIC = ["factory_001"] flow = Dataflow("Average Aggregation") stream = op.input("kafka-in", flow, KafkaSource(KAFKA_BROKER, KAFKA_TOPIC)) op.output("out", stream, StdOutSink())
log消息:

%3|1742203770.748|FAIL|rdkafka#producer-1| [thrd:kafka:9093/bootstrap]: kafka:9093/bootstrap: Connect to ipv4#172.19.0.2:9093 failed: Connection refused (after 1ms in state CONNECT) %3|1742203771.749|FAIL|rdkafka#producer-1| [thrd:kafka:9093/bootstrap]: kafka:9093/bootstrap: Connect to ipv4#172.19.0.2:9093 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed) thread '<unnamed>' panicked at src/run.rs:128:17: Box<dyn Any> note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 387, in list_parts return list(_list_parts(client, self._topics)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ %3|1742203770.748|FAIL|rdkafka#producer-1| [thrd:kafka:9093/bootstrap]: kafka:9093/bootstrap: Connect to ipv4#172.19.0.2:9093 failed: Connection refused (after 1ms in state CONNECT) %3|1742203771.749|FAIL|rdkafka#producer-1| [thrd:kafka:9093/bootstrap]: kafka:9093/bootstrap: Connect to ipv4#172.19.0.2:9093 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed) thread '<unnamed>' panicked at src/run.rs:128:17: Box<dyn Any> note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 387, in list_parts return list(_list_parts(client, self._topics)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ONNECT, 1 identical error(s) suppressed) thread '<unnamed>' panicked at src/run.rs:128:17: Box<dyn Any> note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 387, in list_parts return list(_list_parts(client, self._topics)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Box<dyn Any> note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 387, in list_parts return list(_list_parts(client, self._topics)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 387, in list_parts return list(_list_parts(client, self._topics)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ return list(_list_parts(client, self._topics)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 172, in _list_parts raise RuntimeError(msg) RuntimeError: error listing partitions for Kafka topic `'factory_001'`: Broker: Unknown topic or partition The above exception was the direct cause of the following exception: bytewax.errors.BytewaxRuntimeError: (src/inputs.rs:252:47): error calling `FixedPartitionSource.list_parts` in step "Average Aggregation.kafka-in" The above exception was the direct cause of the following exception: bytewax.errors.BytewaxRuntimeError: (src/worker.rs:354:34): error building FixedPartitionedSource The above exception was the direct cause of the following exception: Traceback (most recent call last): File "<frozen runpy>", line 198, in _run_module_as_main File "<frozen runpy>", line 88, in _run_code File "/usr/local/lib/python3.11/site-packages/bytewax/run.py", line 355, in <module> cli_main(**kwargs) bytewax.errors.BytewaxRuntimeError: (src/worker.rs:149:10): error building production dataflow

	

docker-compose.yml
您忘记端口映射
9093
docker apache-kafka docker-compose docker-network stream-processing
1个回答
0
投票

ports: - "9092:9092" - "9093:9093" # add this 该端口应将您的本地主机映射到您定义的广告听众 kafka:9093

然后,您应该能够连接到Localhost上的9093,这将重定向到您的听众。
	

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.