UnknownHostException kafka

问题描述 投票:0回答:5

我正在尝试设置一个 Kafka 集群(实际上是集群中的第一个节点)。

我有一个单节点 Zookeeper 集群设置。 我正在一个单独的节点上设置 kafka。

两者都运行 CentOS 6.4,运行 IPV6,这有点像 PITA。 我验证了机器可以使用 netcat 相互通信。

当我启动 kafka 时,出现以下异常(这会导致 kafka 关闭)。 编辑:我启动了 kafka,我必须在 server.config 文件中设置

host.name
属性。

我能够创建一个测试主题并从 kafka 服务器发送消息。

但是,在尝试使用消息时,我遇到了同样的错误。

有什么帮助、建议吗?

bin/kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning
Exception in thread "main" java.net.UnknownHostException: kafka: kafka: Name or service not known
    at java.net.InetAddress.getLocalHost(InetAddress.java:1473)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:128)
    at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
    at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:178)
    at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: java.net.UnknownHostException: kafka: Name or service not known
    at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
    at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:901)
    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1293)
    at java.net.InetAddress.getLocalHost(InetAddress.java:1469)
    ... 5 more
apache-zookeeper apache-kafka
5个回答
32
投票

当您运行

> bin/kafka-console-consumer.sh
命令时,kafka 会加载
ConsoleConsumer
,它将尝试使用自动生成的消费者 ID 创建消费者。 Kafka 生成消费者 id 的方式是将本地主机的名称与其连接起来。 因此,问题在于 java 无法解析我正在使用的 Open Stack VM 上本地主机的 IP 地址。

所以答案是 Open Stack 虚拟机将本地主机名解析为

kafka
,这是虚拟机的名称。 我在 Kafka 和 Zookeeper 实例中将所有内容设置为
kafka1

因此,当 java 调用 getLocalHost 时,它试图查找

kafka
的 IP 地址,而我在 /etc/hosts 文件中没有

我只是在我的 /etc/hosts 文件中添加了一个

kafka
条目,一切都开始正常工作!!!

我本以为它会解析为

localhost
,但事实并非如此,它解析为虚拟机的名称,
kafka


7
投票

正如 noplay 指出的,问题是 Kafka 无法解析正确的 IP,例如,在没有分配公共 IP 的情况下在私有子网中运行的 EC2 实例上可能会发生这种情况。解决方案总结:

hostname

这将显示主机名,例如 ip-10-180-128-217。 然后只需更新您的 /etc/hosts

sudo nano /etc/hosts

编辑,例如

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4 ip-10-180-128-217

3
投票

在你的 kafka_folder/config (sudo vim /etc/hosts) 下,你的文件应该如下所示:

127.0.0.1 kafka localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain <localhost_OR_DNS_OR_ip_address>
<localhost_OR_DNS_OR_ip_address> kafka

请注意,kafka 是我想要的主机名。

然后,下

kafka_folder/config/server.properties

有一个字段“listeners=”。

看起来是这样的

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092

只需删除该字段前面的“#”即可取消对该字段的注释。应该看起来像这样:

听众=明文://:9092


0
投票
  • 如果您配置了 dns 服务器,kafka 会考虑您的 dns 域 姓名 所以将此命令粘贴到您的服务器上

    主机名 结果将是您的本地域,现在复制您的本地域然后打开主机文件

    nano /etc/hosts 添加您的本地域。

    127.0.0.1 localhost localhost.localdomain“您的本地域”

    ::1 localhost localhost.localdomain“您的本地域”


0
投票

使用 Kafka 时,特别是在默认主机解析设置可能不起作用的环境中(例如,当无法修改

/etc/hosts
文件时),为代理配置 DNS 可能具有挑战性。为了解决这个问题,我实现了一个自定义 DNS 解析器,它直接集成到 Kafka 生产者中。虽然它可能不是最优雅的解决方案,但它对生产者和消费者都有效。

实施

带有 DNS 解析器的自定义生产者

生产者使用实用方法来注入自定义 DNS 解析器。如果启用,它可确保代理主机名解析为配置的 IP:

public abstract class CustomAvroProducer<T> extends AbstractKafkaClient implements CustomProducer<T> {

    private static final Logger LOG = LogManager.getLogger(CustomAvroProducer.class);

    private final List<T> records = Collections.synchronizedList(new ArrayList<>());

    private final KafkaProducer<String, T> producer = new KafkaProducer<>(getProperties());

    public CustomAvroProducer(KafkaTopic topic) {
        super(topic);
        configureCustomDnsResolver();
    }

    private void configureCustomDnsResolver() {
        if (BROKER_DNS_ENABLE) { // Remove if need
            try {
                // DEFAULT_BROKER_HOST = kafka
                // BROKER_IP = 10.1.1.9
                KafkaUtil.injectDnsResolverForBroker(producer, DEFAULT_BROKER_HOST, BROKER_IP);
                LOG.info("Custom DNS resolver activated for broker: {}", BROKER_IP);
            } catch (Exception e) {
                LOG.error("Failed to add DNS for hostname '{}', broker IP '{}'", DEFAULT_BROKER_HOST, BROKER_IP, e);
            }
        }
    }
    
    ...
}

/**
 * Utility class for Kafka-related operations.
 * This class provides methods to inject a custom DNS resolver
 * into the Kafka producer, ensuring the broker name is recognized
 * and messages are sent correctly.
 */
public class KafkaUtil {

    /**
     * Injects a custom DNS resolver into the Kafka producer,
     * configuring the broker's IP address.
     *
     * @param kafka The Kafka producer where the DNS resolver will be injected.
     * @param brokerDefault The default broker name to be used by the DNS resolver.
     * @param brokerIp The IP address of the Kafka broker.
     * @throws NoSuchFieldException If one of the expected fields is not found.
     * @throws IllegalAccessException If access to the field is not allowed.
     */
    public static <T> void injectDnsResolverForBroker(KafkaProducer<String, T> kafka, String brokerDefault, String brokerIp) throws NoSuchFieldException, IllegalAccessException {
        Object connectionStates = getProducerConnectionStates(kafka);
        Field hostResolver = connectionStates.getClass().getDeclaredField("hostResolver");
        hostResolver.setAccessible(true);
        hostResolver.set(connectionStates, new CustomDnsResolver(brokerDefault, brokerIp));
    }

    private static <T> Object getProducerConnectionStates(KafkaProducer<String, T> kafka) throws NoSuchFieldException, IllegalAccessException {
        Field senderField = KafkaProducer.class.getDeclaredField("sender");
        senderField.setAccessible(true);

        Sender sender = (Sender) senderField.get(kafka);
        Field clientField = Sender.class.getDeclaredField("client");
        clientField.setAccessible(true);

        NetworkClient networkClient = (NetworkClient) clientField.get(sender);
        Field networkField = NetworkClient.class.getDeclaredField("connectionStates");
        networkField.setAccessible(true);
        return networkField.get(networkClient);
    }
}

public class CustomDnsResolver implements HostResolver {

    private static final Logger LOG = LogManager.getLogger(CustomDnsResolver.class);

    protected final String brokerIp;
    protected final boolean isBrokerIpConfigured;
    protected final String defaultBrokerHost;

    public CustomDnsResolver(String defaultBrokerHost, String brokerIp) {
        this.brokerIp = brokerIp;
        this.defaultBrokerHost = defaultBrokerHost;
        this.isBrokerIpConfigured = Strings.isNotEmpty(brokerIp);
    }

    @Override
    public InetAddress[] resolve(String host) throws UnknownHostException {
        if (isBrokerHost(host)) {
            LOG.info("Resolving host '{}' to IP: {}", defaultBrokerHost, brokerIp);
            return InetAddress.getAllByName(brokerIp);
        }
        return InetAddress.getAllByName(host);
    }

    private boolean isBrokerHost(String host) {
        return host.equals(defaultBrokerHost) && this.isBrokerIpConfigured;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.