如何使用incremental_alter_configs更改主题配置?

问题描述 投票:0回答:1

我正在尝试创建一个 Python 脚本来自动创建和配置主题。

我已经成功地使主题创建部分发挥作用。尝试更改配置时出现运行时错误。

ValueError:预期要更改的 ConfigEntry 的非空列表 在incremental_configs字段中增量

这是我的代码:

config = {
    'min.insync.replicas': '3'
}

resource = ConfigResource(ResourceType.TOPIC, name=topic_name, set_config=config, described_configs=None)

futures = admin_client.incremental_alter_configs(resources=[resource])

for config_resource, future in futures.items():
    try:
        future.result()
        print(f'Updated topic config for topic {config_resource}')
    except Exception as exception:
        print(f'Failed to update topic config for topic {config_resource}, {exception}')

我发现文档很难理解。

我将此代码基于此示例。从阅读文档来看,改变配置似乎可以通过与主题创建类似的方式来完成。我不完全确定出了什么问题。

python apache-kafka confluent-kafka-python
1个回答
1
投票

以下是如何打印一些有用的调试信息,这有助于解释

ConfigEntry
项目的预期数据结构是什么样的。

# Print some debug info
config_resource = ConfigResource(ResourceType.TOPIC, name=topic_name)
futures = admin_client.describe_configs([config_resource])

for config_resource, future in futures.items():
    print(f'config_resource={config_resource}')
    try:
        dictionary = future.result()
        print(dictionary)
    except Exception as exception:
        print(f'Failed to create topic {config_resource}, {exception}')

解决方案

设置单个

ConfigEntry

resource = \
    ConfigResource(
        ResourceType.TOPIC,
        name=topic_name,
        set_config=None,
        described_configs=None,
        incremental_configs=[
            ConfigEntry(
                name='min.insync.replicas', 
                value='3',
                source=ConfigSource.DYNAMIC_TOPIC_CONFIG,
                incremental_operation=AlterConfigOpType.SET)
        ]
    )

resources = [resource]

futures = admin_client.incremental_alter_configs(resources)

for config_resource, future in futures.items():
    try:
        future.result()
        print(f'Updated topic config for topic {config_resource}')
    except Exception as exception:
        print(f'Failed to update topic config for topic {config_resource}, {exception}')

您还可以定义多个

ConfigEntry
对象来一次设置。

#Update Topic Config
config_entry_cleanup_policy = \
    ConfigEntry(
        name='cleanup.policy',
        value='compact',
        source=ConfigSource.DYNAMIC_TOPIC_CONFIG,
        incremental_operation=AlterConfigOpType.SET)

config_entry_min_insync_replicas = \
    ConfigEntry(
        name='min.insync.replicas',
        value='3',
        source=ConfigSource.DYNAMIC_TOPIC_CONFIG,
        incremental_operation=AlterConfigOpType.SET)

resource = \
    ConfigResource(
        ResourceType.TOPIC,
        name=topic_name,
        set_config=None,
        described_configs=None,
        incremental_configs=[
            config_entry_cleanup_policy,
            config_entry_min_insync_replicas
        ]
    )

resources = [resource]

futures = admin_client.incremental_alter_configs(resources)
© www.soinside.com 2019 - 2024. All rights reserved.