我已经通过 Docker Compose 运行了 Kafka 和 Zookeeper。我可以使用 Kafka 终端向主题发送/消费消息,并且可以通过 Conduktor 监视所有内容。但不幸的是,我无法在 Scala 应用程序中通过 Alpakka 连接器消费消息。该应用程序能够连接到该主题,但每当我向该主题发送消息时,什么都不会发生。
只有 Kafka 和 Zookeeper 是通过 docker-compose 运行的,Scala 消费者应用程序则直接运行在主机上。
Docker Compose
# Two services: ZooKeeper and a single Kafka broker (broker id 1).
# Both publish their client ports to the host on the same port number.
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      # Prints the gateway column of the container's "UG" route entries.
      # `$$` is Compose's escape for a literal `$`, so awk receives `$2`.
      # NOTE(review): presumably the image uses this output as the host name
      # the broker advertises to clients - confirm against the image docs.
      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper
Scala App
/** Minimal Alpakka Kafka consumer.
  *
  * Subscribes to topic `test1` on `localhost:9092`, prints the value of every
  * record it receives, and shuts the actor system down once the stream ends.
  */
object Main extends App {
  implicit val actorSystem: ActorSystem = ActorSystem()
  import actorSystem.dispatcher

  // String keys and values; start from the earliest offset when the group has
  // no committed position yet.
  val kafkaConsumerSettings: ConsumerSettings[String, String] =
    ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      // Matches the group id reported in the console output (group.id = novo_id).
      .withGroupId("novo_id")
      .withCommitRefreshInterval(1.seconds)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  Consumer
    .plainSource(kafkaConsumerSettings, Subscriptions.topics("test1"))
    .map(_.value())
    .runWith(Sink.foreach(println))
    .onComplete { result =>
      result match {
        case Failure(exception) => exception.printStackTrace()
        case Success(_)         => println("done")
      }
      // Without this the actor system's threads keep the JVM alive after the
      // stream has completed or failed.
      actorSystem.terminate()
    }
}
应用程序控制台输出
16:58:33.877 INFO [akka.event.slf4j.Slf4jLogger] Slf4jLogger started
16:58:34.470 INFO [akka.kafka.internal.SingleSourceLogic] [1955f] Starting. StageActor Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-591284224]
16:58:34.516 INFO [org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = novo_id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
16:58:34.701 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka version: 2.4.0
16:58:34.702 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka commitId: 77a89fcf8d7fa018
16:58:34.702 INFO [org.apache.kafka.common.utils.AppInfoParser] Kafka startTimeMs: 1585256314699
16:58:34.715 INFO [org.apache.kafka.clients.consumer.KafkaConsumer] [Consumer clientId=consumer-novo_id-1, groupId=novo_id] Subscribed to topic(s): test1
16:58:35.308 INFO [org.apache.kafka.clients.Metadata] [Consumer clientId=consumer-novo_id-1, groupId=novo_id] Cluster ID: c2XBuDIJTI-gBs9guTvG
KAFKA_ADVERTISED_LISTENERS
描述如何通告主机名,以及客户端如何访问该主机名。该值会发布到 ZooKeeper 供客户端使用。
如果使用 SSL 或 SASL 协议,则端点值必须指定协议,格式如下:
SSL:SSL:// 或 SASL_SSL://
SASL:SASL_PLAINTEXT:// 或 SASL_SSL://
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:29092
现在您的消费者可以使用端口 29092。