Docker Kafka 与 Python 消费者

问题描述 投票:0回答:4

我正在使用 dockerized Kafka 并编写了一个 Kafka 消费者程序。当我在本地计算机上的 docker 和应用程序中运行 Kafka 时,它运行得很好。但是当我在 docker 中配置本地应用程序时,我遇到了问题。该问题可能是由于直到应用程序启动时才创建主题所致。

docker-compose.yml

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  parse-engine:
    build: .
    depends_on:
      - "kafka"
    command: python parse-engine.py
    ports:
     - "5000:5000"

解析引擎.py

from kafka import KafkaConsumer
import json

try:
    print('Welcome to parse engine')
    consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')
    for message in consumer:
        print(message)
except Exception as e:
    print(e)
    # Logs the error appropriately. 
    pass

错误日志

kafka_1         | [2018-09-21 06:27:17,400] INFO [SocketServer brokerId=1001] Started processors for 1 acceptors (kafka.network.SocketServer)
kafka_1         | [2018-09-21 06:27:17,404] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1         | [2018-09-21 06:27:17,404] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1         | [2018-09-21 06:27:17,431] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)
**parse-engine_1  | Welcome to parse engine
parse-engine_1  | NoBrokersAvailable 
parseengine_parse-engine_1 exited with code 0**
kafka_1         | creating topics: test:1:1

由于我已经在 docker-compose 中添加了 depends_on 属性,但在启动主题应用程序连接之前,因此发生了错误。

我读到我可以在 docker-compose 文件中添加脚本,但我正在寻找一些简单的方法。

感谢您的帮助

python docker apache-kafka docker-compose kafka-consumer-api
4个回答
104
投票

您的问题是网络。在您正在设置的 Kafka 配置中

KAFKA_ADVERTISED_HOST_NAME: localhost

但这意味着任何客户端(包括您的Python应用程序)都将连接到代理,然后代理告诉您对任何连接使用

localhost
。由于您的客户端计算机(例如您的 python 容器)的本地主机不是代理所在的位置,因此请求将失败。

您可以在此处详细了解如何修复 Kafka 侦听器的问题

因此,要解决您的问题,您可以执行以下两种操作之一:

  1. 只需更改您的撰写即可使用 Kafka 的内部主机名 (

    KAFKA_ADVERTISED_HOST_NAME: kafka
    )。这意味着 docker 网络内的任何客户端都能够正常访问它,但没有外部客户端能够(例如从您的主机): version: '3' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: kafka KAFKA_CREATE_TOPICS: "test:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock parse-engine: build: . depends_on: - "kafka" command: python parse-engine.py ports: - "5000:5000" 然后,您的客户将通过 kafka:9092 访问代理,因此您的 python 应用程序将更改为

     consumer = KafkaConsumer('test', bootstrap_servers='kafka:9092')
    

    添加
  2. Kafka 的新监听器。这使得它可以在 docker 网络的内部和外部访问。端口 29092 用于对 docker 网络进行
  3. 外部

    访问(例如从您的主机),端口 9092 用于内部访问。 您仍然需要更改 python 程序才能在正确的地址访问 Kafka。在这种情况下,由于它是 Docker 网络的内部,因此您可以使用:

    consumer = KafkaConsumer('test', bootstrap_servers='kafka:9092') 由于我不熟悉

    wurstmeister
     图像,这个 docker-compose 基于我所知道的 Confluence 图像:

    (编辑器损坏了我的yaml,你可以在这里找到

    --- version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 kafka: # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,- # An important note about accessing Kafka from clients on other machines: # ----------------------------------------------------------------------- # # The config used here exposes port 29092 for _external_ connections to the broker # i.e. those from _outside_ the docker network. This could be from the host machine # running docker, or maybe further afield if you've got a more complicated setup. # If the latter is true, you will need to change the value 'localhost' in # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those # remote clients # # For connections _internal_ to the docker network, such as from other services # and components, use kafka:9092. # # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,- # image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

    免责声明:我为 Confluence 工作

这条线

KAFKA_ADVERTISED_HOST_NAME: localhost

7
投票
表示经纪商将自己宣传为仅在

localhost

上可用,这意味着所有 Kafka 客户端只能返回其自身,而不是真实经纪商地址的实际列表。如果您的客户端仅位于您的主机上,那就没问题了 - 请求始终发送到本地主机,即“转发到容器”。

但是,对于其他容器中的应用程序,它们需要指向 Kafka 容器,因此应该显示

KAFKA_ADVERTISED_HOST_NAME: kafka

,其中 
kafka 是 Docker Compose 服务的名称。然后其他容器中的客户端会尝试连接到该容器

话虽这么说,那么,这条线


consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')

您将 Python 容器

指向其本身
,而不是

kafka

容器。

应该说

kafka:9092而不是


就我而言,我想从本地运行的外部 python 客户端(作为生产者)访问 Kafka 容器,这里是对我有用的容器和 python 代码的组合(平台 MAC OS 和 docker 版本 2.4.0):

动物园管理员容器:

docker run -d \ -p 2181:2181 \ --name=zookeeper \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:5.2.3

1
投票
kafka 容器:

docker run -d \ -p 29092:29092 \ -p 9092:9092 \ --name=kafka \ -e KAFKA_ZOOKEEPER_CONNECT=host.docker.internal:2181 \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,BROKER://localhost:9092 \ -e KAFKA_INTER_BROKER_LISTENER_NAME=BROKER \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_CREATE_TOPICS="test:1:1" \ confluentinc/cp-enterprise-kafka:5.2.3

python 客户端:
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=['localhost:29092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
security_protocol='PLAINTEXT')
acc_ini = 523416
print("Sending message")
producer.send('test', {'model_id': '1','acc':str(acc_ini), 'content':'test'})
producer.flush()

在我的本地设置中,我遇到了同样的问题,所有容器在 Docker 内都工作正常,但连接器无法连接到 Kafka。

我的Kafka配置里面
docker-compose.yml
:

0
投票
broker: image: confluentinc/cp-server:7.0.1 hostname: broker container_name: broker depends_on: - zookeeper ports: - "29092:29092" - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092 KAFKA_ADVERTISED_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL

根据谷歌的回复,我尝试将

localhost:port

127.0.0.1:port

0.0.0.0:port
作为连接器配置中的 URL,但没有任何效果。

最终,我必须传递本地系统的实际 IP 地址,它成功了。
我在 Windows 10 上使用 Docker。
ActiveMQ 连接器配置:
{
  "connector.class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
  "activemq.url": "tcp://<my-system-ip>:61616", <-- my system's IP address
  "max.poll.duration": "60000",
  "tasks.max": "1",
  "batch.size": "1",
  "name": "activemq-jms-connector",
  "jms.destination.name": "jms-test",
  "kafka.topic": "topic-1",
  "activemq.password": "password",
  "jms.destination.type": "topic",
  "use.permissive.schema": "false",
  "activemq.username": "username"
}

我希望这能帮助任何在 Windows 和 Docker 上设置

Kafka-connect

的人。


© www.soinside.com 2019 - 2024. All rights reserved.