我正在尝试创建一个 Python 脚本来自动创建和配置主题。
我已经成功地使主题创建部分发挥作用。尝试更改配置时出现运行时错误。
ValueError:预期要更改的 ConfigEntry 的非空列表 在incremental_configs字段中增量
这是我的代码:
config = {
'min.insync.replicas': '3'
}
resource = ConfigResource(ResourceType.TOPIC, name=topic_name, set_config=config, described_configs=None)
futures = admin_client.incremental_alter_configs(resources=[resource])
for config_resource, future in futures.items():
try:
future.result()
print(f'Updated topic config for topic {config_resource}')
except Exception as exception:
print(f'Failed to update topic config for topic {config_resource}, {exception}')
我发现文档很难理解。
我将此代码基于此示例。从阅读文档来看,改变配置似乎可以通过与主题创建类似的方式来完成。我不完全确定出了什么问题。
以下是如何打印一些有用的调试信息,这有助于解释
ConfigEntry
项目的预期数据结构是什么样的。
# Print some debug info
config_resource = ConfigResource(ResourceType.TOPIC, name=topic_name)
futures = admin_client.describe_configs([config_resource])
for config_resource, future in futures.items():
print(f'config_resource={config_resource}')
try:
dictionary = future.result()
print(dictionary)
except Exception as exception:
print(f'Failed to create topic {config_resource}, {exception}')
设置单个
ConfigEntry
。
resource = \
ConfigResource(
ResourceType.TOPIC,
name=topic_name,
set_config=None,
described_configs=None,
incremental_configs=[
ConfigEntry(
name='min.insync.replicas',
value='3',
source=ConfigSource.DYNAMIC_TOPIC_CONFIG,
incremental_operation=AlterConfigOpType.SET)
]
)
resources = [resource]
futures = admin_client.incremental_alter_configs(resources)
for config_resource, future in futures.items():
try:
future.result()
print(f'Updated topic config for topic {config_resource}')
except Exception as exception:
print(f'Failed to update topic config for topic {config_resource}, {exception}')
您还可以定义多个
ConfigEntry
对象来一次设置。
#Update Topic Config
config_entry_cleanup_policy = \
ConfigEntry(
name='cleanup.policy',
value='compact',
source=ConfigSource.DYNAMIC_TOPIC_CONFIG,
incremental_operation=AlterConfigOpType.SET)
config_entry_min_insync_replicas = \
ConfigEntry(
name='min.insync.replicas',
value='3',
source=ConfigSource.DYNAMIC_TOPIC_CONFIG,
incremental_operation=AlterConfigOpType.SET)
resource = \
ConfigResource(
ResourceType.TOPIC,
name=topic_name,
set_config=None,
described_configs=None,
incremental_configs=[
config_entry_cleanup_policy,
config_entry_min_insync_replicas
]
)
resources = [resource]
futures = admin_client.incremental_alter_configs(resources)