我正在使用 dockerized Kafka 并编写了一个 Kafka 消费者程序。当我在本地计算机上的 docker 和应用程序中运行 Kafka 时,它运行得很好。但是当我在 docker 中配置本地应用程序时,我遇到了问题。该问题可能是由于直到应用程序启动时才创建主题所致。
docker-compose.yml
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
parse-engine:
build: .
depends_on:
- "kafka"
command: python parse-engine.py
ports:
- "5000:5000"
解析引擎.py
from kafka import KafkaConsumer
import json
try:
print('Welcome to parse engine')
consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')
for message in consumer:
print(message)
except Exception as e:
print(e)
# Logs the error appropriately.
pass
错误日志
kafka_1 | [2018-09-21 06:27:17,400] INFO [SocketServer brokerId=1001] Started processors for 1 acceptors (kafka.network.SocketServer)
kafka_1 | [2018-09-21 06:27:17,404] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2018-09-21 06:27:17,404] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2018-09-21 06:27:17,431] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)
**parse-engine_1 | Welcome to parse engine
parse-engine_1 | NoBrokersAvailable
parseengine_parse-engine_1 exited with code 0**
kafka_1 | creating topics: test:1:1
由于我已经在 docker-compose 中添加了 depends_on 属性,但在启动主题应用程序连接之前,因此发生了错误。
我读到我可以在 docker-compose 文件中添加脚本,但我正在寻找一些简单的方法。
感谢您的帮助
您的问题是网络。在您正在设置的 Kafka 配置中
KAFKA_ADVERTISED_HOST_NAME: localhost
但这意味着任何客户端(包括您的Python应用程序)都将连接到代理,然后代理告诉您对任何连接使用
localhost
。由于您的客户端计算机(例如您的 python 容器)的本地主机不是代理所在的位置,因此请求将失败。
您可以在此处详细了解如何修复 Kafka 侦听器的问题
因此,要解决您的问题,您可以执行以下两种操作之一:
只需更改您的撰写即可使用 Kafka 的内部主机名 (
KAFKA_ADVERTISED_HOST_NAME: kafka
)。这意味着 docker 网络内的任何客户端都能够正常访问它,但没有外部客户端能够(例如从您的主机):
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
parse-engine:
build: .
depends_on:
- "kafka"
command: python parse-engine.py
ports:
- "5000:5000"
然后,您的客户将通过 kafka:9092 访问代理,因此您的 python 应用程序将更改为
consumer = KafkaConsumer('test', bootstrap_servers='kafka:9092')
添加
访问(例如从您的主机),端口 9092 用于内部访问。 您仍然需要更改 python 程序才能在正确的地址访问 Kafka。在这种情况下,由于它是 Docker 网络的内部,因此您可以使用:
consumer = KafkaConsumer('test', bootstrap_servers='kafka:9092')
由于我不熟悉
wurstmeister
图像,这个 docker-compose 基于我所知道的 Confluence 图像:
(编辑器损坏了我的yaml,你可以在这里找到
)
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
# "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
# An important note about accessing Kafka from clients on other machines:
# -----------------------------------------------------------------------
#
# The config used here exposes port 29092 for _external_ connections to the broker
# i.e. those from _outside_ the docker network. This could be from the host machine
# running docker, or maybe further afield if you've got a more complicated setup.
# If the latter is true, you will need to change the value 'localhost' in
# KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
# remote clients
#
# For connections _internal_ to the docker network, such as from other services
# and components, use kafka:9092.
#
# See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
# "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
#
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
免责声明:我为 Confluence 工作这条线
KAFKA_ADVERTISED_HOST_NAME: localhost
localhost
但是,对于其他容器中的应用程序,它们需要指向 Kafka 容器,因此应该显示
KAFKA_ADVERTISED_HOST_NAME: kafka
,其中
kafka
是 Docker Compose 服务的名称。然后其他容器中的客户端会尝试连接到该容器
话虽这么说,那么,这条线
consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')
您将 Python 容器 指向其本身
kafka
应该说
kafka:9092
而不是
动物园管理员容器:
docker run -d \
-p 2181:2181 \
--name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=2181 \
confluentinc/cp-zookeeper:5.2.3
docker run -d \
-p 29092:29092 \
-p 9092:9092 \
--name=kafka \
-e KAFKA_ZOOKEEPER_CONNECT=host.docker.internal:2181 \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,BROKER://localhost:9092 \
-e KAFKA_INTER_BROKER_LISTENER_NAME=BROKER \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_CREATE_TOPICS="test:1:1" \
confluentinc/cp-enterprise-kafka:5.2.3
python 客户端:
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=['localhost:29092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
security_protocol='PLAINTEXT')
acc_ini = 523416
print("Sending message")
producer.send('test', {'model_id': '1','acc':str(acc_ini), 'content':'test'})
producer.flush()
在我的本地设置中,我遇到了同样的问题,所有容器在 Docker 内都工作正常,但连接器无法连接到 Kafka。
我的Kafka配置里面
docker-compose.yml
: broker:
image: confluentinc/cp-server:7.0.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
根据谷歌的回复,我尝试将
localhost:port
、
127.0.0.1:port
和 0.0.0.0:port
作为连接器配置中的 URL,但没有任何效果。
最终,我必须传递本地系统的实际 IP 地址,它成功了。 我在 Windows 10 上使用 Docker。
ActiveMQ 连接器配置:
{
"connector.class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
"activemq.url": "tcp://<my-system-ip>:61616", <-- my system's IP address
"max.poll.duration": "60000",
"tasks.max": "1",
"batch.size": "1",
"name": "activemq-jms-connector",
"jms.destination.name": "jms-test",
"kafka.topic": "topic-1",
"activemq.password": "password",
"jms.destination.type": "topic",
"use.permissive.schema": "false",
"activemq.username": "username"
}
我希望这能帮助任何在 Windows 和 Docker 上设置Kafka-connect