# Single-node Kafka (KRaft mode) with a web UI and two consumer services,
# all attached to the same user-defined bridge network.
services:
  kafka:
    image: apache/kafka
    ports:
      - "9092:9092"
    environment:
      # Configure listeners for both docker and host communication
      KAFKA_LISTENERS: CONTROLLER://localhost:9091,HOST://0.0.0.0:9092,DOCKER://kafka:9093
      KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT
      # Settings required for KRaft mode
      KAFKA_NODE_ID: "1"
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091
      # Listener to use for broker-to-broker communication
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
      # Required for a single node cluster
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
    # The service is reported healthy once the broker answers a topic listing.
    healthcheck:
      test: ["CMD", "bash", "-c", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - app-network

  kafka-ui:
    image: ghcr.io/kafbat/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      DYNAMIC_CONFIG_ENABLED: "true"
      KAFKA_CLUSTERS_0_NAME: local
      # Reaches the broker through the DOCKER listener on the shared network.
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
    depends_on:
      - kafka
    networks:
      - app-network

  consumer:
    build:
      context: ./kafka_consumer
      dockerfile: Dockerfile
    container_name: consumer
    depends_on:
      # NOTE(review): factory-service is not defined in this excerpt; confirm
      # it exists in the full compose file.
      factory-service:
        condition: service_started
      kafka:
        condition: service_healthy
    ports:
      - "8099:80"
    networks:
      - app-network

  bytewax:
    build:
      context: ./consumer
      dockerfile: Dockerfile
    container_name: bytewax
    depends_on:
      # Wait for the broker healthcheck instead of mere container start:
      # starting earlier fails with "Connection refused" on kafka:9093.
      # NOTE(review): the dataflow also needs its topic to exist at start-up
      # (the logs show "Unknown topic or partition"); make sure the producer
      # or an init step creates it first.
      kafka:
        condition: service_healthy
    networks:
      - app-network

networks:
  app-network:
    driver: bridge
consumer.py (prints the data stream)
# Kafka consumer that subscribes to the factory topics and decodes each
# record value from UTF-8 JSON.
import json  # required by value_deserializer below

from kafka import KafkaConsumer

# Broker address as reachable from inside the compose network.
KAFKA_BROKER = "kafka:9093"
KAFKA_TOPIC = ["factory_001", "factory_002"]

consumer = KafkaConsumer(
    *KAFKA_TOPIC,  # subscribe to every topic in the list
    group_id='my-group',
    bootstrap_servers=KAFKA_BROKER,
    # Raw bytes -> str -> parsed JSON object.
    value_deserializer=lambda x: json.loads(x.decode("utf-8"))
)
Stream_process.py（无法正常工作）
# Bytewax dataflow that reads one Kafka topic and echoes every item to stdout.
from bytewax import operators as op
from bytewax.connectors.kafka import KafkaSource
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow

# Broker list and topic list handed to the Kafka source.
KAFKA_BROKER = ["kafka:9093"]
KAFKA_TOPIC = ["factory_001"]

flow = Dataflow("Average Aggregation")

# Build the source separately so the input step reads as a single short call.
# NOTE(review): per the logs, partition listing fails at start-up when the
# broker is unreachable or the topic does not exist yet.
kafka_source = KafkaSource(KAFKA_BROKER, KAFKA_TOPIC)
kafka_messages = op.input("kafka-in", flow, kafka_source)

# Write each item from the stream to standard output.
op.output("out", kafka_messages, StdOutSink())
日志消息：
%3|1742203770.748|FAIL|rdkafka#producer-1| [thrd:kafka:9093/bootstrap]: kafka:9093/bootstrap: Connect to ipv4#172.19.0.2:9093 failed: Connection refused (after 1ms in state CONNECT)
%3|1742203771.749|FAIL|rdkafka#producer-1| [thrd:kafka:9093/bootstrap]: kafka:9093/bootstrap: Connect to ipv4#172.19.0.2:9093 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
thread '<unnamed>' panicked at src/run.rs:128:17:
Box<dyn Any>
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 387, in list_parts
return list(_list_parts(client, self._topics))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
%3|1742203770.748|FAIL|rdkafka#producer-1| [thrd:kafka:9093/bootstrap]: kafka:9093/bootstrap: Connect to ipv4#172.19.0.2:9093 failed: Connection refused (after 1ms in state CONNECT)
%3|1742203771.749|FAIL|rdkafka#producer-1| [thrd:kafka:9093/bootstrap]: kafka:9093/bootstrap: Connect to ipv4#172.19.0.2:9093 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
thread '<unnamed>' panicked at src/run.rs:128:17:
Box<dyn Any>
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 387, in list_parts
return list(_list_parts(client, self._topics))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ONNECT, 1 identical error(s) suppressed)
thread '<unnamed>' panicked at src/run.rs:128:17:
Box<dyn Any>
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 387, in list_parts
return list(_list_parts(client, self._topics))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Box<dyn Any>
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 387, in list_parts
return list(_list_parts(client, self._topics))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 387, in list_parts
return list(_list_parts(client, self._topics))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
return list(_list_parts(client, self._topics))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 172, in _list_parts
raise RuntimeError(msg)
RuntimeError: error listing partitions for Kafka topic `'factory_001'`: Broker: Unknown topic or partition
The above exception was the direct cause of the following exception:
bytewax.errors.BytewaxRuntimeError: (src/inputs.rs:252:47): error calling `FixedPartitionSource.list_parts` in step "Average Aggregation.kafka-in"
The above exception was the direct cause of the following exception:
bytewax.errors.BytewaxRuntimeError: (src/worker.rs:354:34): error building FixedPartitionedSource
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "<frozen runpy>", line 198, in _run_module_as_main
File "<frozen runpy>", line 88, in _run_code
File "/usr/local/lib/python3.11/site-packages/bytewax/run.py", line 355, in <module>
cli_main(**kwargs)
bytewax.errors.BytewaxRuntimeError: (src/worker.rs:149:10): error building production dataflow
在 docker-compose.yml 中，您忘记了映射端口 9093：
# Excerpt of the kafka service's published ports.
ports:
- "9092:9092"
- "9093:9093" # add this: publishes container port 9093 on the host
该端口映射应将本地主机（localhost）的端口对应到您定义的 advertised listener（通告监听器）kafka:9093。