我正在尝试设置一个 Kafka 集群(实际上是集群中的第一个节点)。
我有一个单节点 Zookeeper 集群设置。 我正在一个单独的节点上设置 kafka。
两者都运行 CentOS 6.4,运行 IPV6,这有点像 PITA。 我验证了机器可以使用 netcat 相互通信。
当我启动 kafka 时,出现以下异常(这会导致 kafka 关闭)。 编辑:我启动了 kafka,我必须在 server.config 文件中设置
host.name
属性。
我能够创建一个测试主题并从 kafka 服务器发送消息。
但是,在尝试使用消息时,我遇到了同样的错误。
有什么帮助、建议吗?
bin/kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning
Exception in thread "main" java.net.UnknownHostException: kafka: kafka: Name or service not known
at java.net.InetAddress.getLocalHost(InetAddress.java:1473)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:128)
at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:178)
at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: java.net.UnknownHostException: kafka: Name or service not known
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:901)
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1293)
at java.net.InetAddress.getLocalHost(InetAddress.java:1469)
... 5 more
当您运行
> bin/kafka-console-consumer.sh
命令时,kafka 会加载 ConsoleConsumer
,它将尝试使用自动生成的消费者 ID 创建消费者。 Kafka 生成消费者 id 的方式是将本地主机的名称与其连接起来。 因此,问题在于 java 无法解析我正在使用的 Open Stack VM 上本地主机的 IP 地址。
所以答案是 Open Stack 虚拟机将本地主机名解析为
kafka
,这是虚拟机的名称。 我在 Kafka 和 Zookeeper 实例中将所有内容设置为 kafka1
。
因此,当 java 调用 getLocalHost 时,它试图查找
kafka
的 IP 地址,而我在 /etc/hosts 文件中没有。
我只是在我的 /etc/hosts 文件中添加了一个
kafka
条目,一切都开始正常工作!!!
我本以为它会解析为
localhost
,但事实并非如此,它解析为虚拟机的名称,kafka
。
正如 noplay 指出的,问题是 Kafka 无法解析正确的 IP,例如,在没有分配公共 IP 的情况下在私有子网中运行的 EC2 实例上可能会发生这种情况。解决方案总结:
hostname
这将显示主机名,例如 ip-10-180-128-217。 然后只需更新您的 /etc/hosts
sudo nano /etc/hosts
编辑,例如
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ip-10-180-128-217
在你的 kafka_folder/config (sudo vim /etc/hosts) 下,你的文件应该如下所示:
127.0.0.1 kafka localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain <localhost_OR_DNS_OR_ip_address>
<localhost_OR_DNS_OR_ip_address> kafka
请注意,kafka 是我想要的主机名。
然后,下
kafka_folder/config/server.properties
有一个字段“listeners=”。
看起来是这样的
# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
只需删除该字段前面的“#”即可取消对该字段的注释。应该看起来像这样:
听众=明文://
:9092
如果您配置了 dns 服务器,kafka 会考虑您的 dns 域 姓名 所以将此命令粘贴到您的服务器上
主机名 结果将是您的本地域,现在复制您的本地域然后打开主机文件
nano /etc/hosts 添加您的本地域。
127.0.0.1 localhost localhost.localdomain“您的本地域”
::1 localhost localhost.localdomain“您的本地域”
使用 Kafka 时,特别是在默认主机解析设置可能不起作用的环境中(例如,当无法修改
/etc/hosts
文件时),为代理配置 DNS 可能具有挑战性。为了解决这个问题,我实现了一个自定义 DNS 解析器,它直接集成到 Kafka 生产者中。虽然它可能不是最优雅的解决方案,但它对生产者和消费者都有效。
生产者使用实用方法来注入自定义 DNS 解析器。如果启用,它可确保代理主机名解析为配置的 IP:
public abstract class CustomAvroProducer<T> extends AbstractKafkaClient implements CustomProducer<T> {
private static final Logger LOG = LogManager.getLogger(CustomAvroProducer.class);
private final List<T> records = Collections.synchronizedList(new ArrayList<>());
private final KafkaProducer<String, T> producer = new KafkaProducer<>(getProperties());
public CustomAvroProducer(KafkaTopic topic) {
super(topic);
configureCustomDnsResolver();
}
private void configureCustomDnsResolver() {
if (BROKER_DNS_ENABLE) { // Remove if need
try {
// DEFAULT_BROKER_HOST = kafka
// BROKER_IP = 10.1.1.9
KafkaUtil.injectDnsResolverForBroker(producer, DEFAULT_BROKER_HOST, BROKER_IP);
LOG.info("Custom DNS resolver activated for broker: {}", BROKER_IP);
} catch (Exception e) {
LOG.error("Failed to add DNS for hostname '{}', broker IP '{}'", DEFAULT_BROKER_HOST, BROKER_IP, e);
}
}
}
...
}
/**
* Utility class for Kafka-related operations.
* This class provides methods to inject a custom DNS resolver
* into the Kafka producer, ensuring the broker name is recognized
* and messages are sent correctly.
*/
public class KafkaUtil {
/**
* Injects a custom DNS resolver into the Kafka producer,
* configuring the broker's IP address.
*
* @param kafka The Kafka producer where the DNS resolver will be injected.
* @param brokerDefault The default broker name to be used by the DNS resolver.
* @param brokerIp The IP address of the Kafka broker.
* @throws NoSuchFieldException If one of the expected fields is not found.
* @throws IllegalAccessException If access to the field is not allowed.
*/
public static <T> void injectDnsResolverForBroker(KafkaProducer<String, T> kafka, String brokerDefault, String brokerIp) throws NoSuchFieldException, IllegalAccessException {
Object connectionStates = getProducerConnectionStates(kafka);
Field hostResolver = connectionStates.getClass().getDeclaredField("hostResolver");
hostResolver.setAccessible(true);
hostResolver.set(connectionStates, new CustomDnsResolver(brokerDefault, brokerIp));
}
private static <T> Object getProducerConnectionStates(KafkaProducer<String, T> kafka) throws NoSuchFieldException, IllegalAccessException {
Field senderField = KafkaProducer.class.getDeclaredField("sender");
senderField.setAccessible(true);
Sender sender = (Sender) senderField.get(kafka);
Field clientField = Sender.class.getDeclaredField("client");
clientField.setAccessible(true);
NetworkClient networkClient = (NetworkClient) clientField.get(sender);
Field networkField = NetworkClient.class.getDeclaredField("connectionStates");
networkField.setAccessible(true);
return networkField.get(networkClient);
}
}
public class CustomDnsResolver implements HostResolver {
private static final Logger LOG = LogManager.getLogger(CustomDnsResolver.class);
protected final String brokerIp;
protected final boolean isBrokerIpConfigured;
protected final String defaultBrokerHost;
public CustomDnsResolver(String defaultBrokerHost, String brokerIp) {
this.brokerIp = brokerIp;
this.defaultBrokerHost = defaultBrokerHost;
this.isBrokerIpConfigured = Strings.isNotEmpty(brokerIp);
}
@Override
public InetAddress[] resolve(String host) throws UnknownHostException {
if (isBrokerHost(host)) {
LOG.info("Resolving host '{}' to IP: {}", defaultBrokerHost, brokerIp);
return InetAddress.getAllByName(brokerIp);
}
return InetAddress.getAllByName(host);
}
private boolean isBrokerHost(String host) {
return host.equals(defaultBrokerHost) && this.isBrokerIpConfigured;
}
}