我正在设置一个 C 应用程序,该应用程序应该将传感器数据发送到 kafka 服务器。该消息仅包含一个 JSON 字符串,其中包含所有传感器名称及其值。
kafka生产者是这样设置的:
int setupKafkaProducer(struct KafkaParameters *kafkaParameters, struct ClientOPCEndpointInfo* *clientInfos, int clientInfosLength, bool runTest)
{
logInfo("START - Setting up kafka producer", true);
conf = rd_kafka_conf_new();
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
rd_kafka_conf_res_t res = RD_KAFKA_CONF_OK;
// setting up parameters ...
if (res != RD_KAFKA_CONF_OK)
{
g_error("Failed to setup kafka config: %s", errstr);
logError("Failed to setup kafka config", true);
return 1;
}
producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!producer)
{
g_error("Failed to create new producer: %s", errstr);
logError("Failed to create new producer!", true);
return 1;
}
conf = NULL;
return 0;
}
消息回调仅用于报告发送kafka消息时可能出现的错误。
消息的发送方式如下:
int sendKafkaMessage(char *kafkaMessage)
{
int message_count = 1;
const char *topic = kafkaTopic;
const char *value = kafkaMessage;
for (int i = 0; i < message_count; i++)
{
size_t value_len = strlen(value);
rd_kafka_resp_err_t err;
err = rd_kafka_producev(producer,
RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_KEY(NULL, 0),
RD_KAFKA_V_VALUE((void*)value, value_len),
RD_KAFKA_V_OPAQUE(NULL),
RD_KAFKA_V_END);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
{
// g_warning("Failed to produce to topic %s: %s", topic, rd_kafka_err2str(err));
// logError("Failed to produce topic!", true);
return 1;
}
else
{
// g_message("Produced event to topic %s: value = %12s", topic, value);
}
rd_kafka_poll(producer, 0);
}
// g_message("Flushing final messages..");
rd_kafka_flush(producer, 100);
if (rd_kafka_outq_len(producer) > 0)
{
// g_warning("%d message(s) were not delivered", rd_kafka_outq_len(producer));
// logError("Kafka message(s) were not delivered!", true);
return 1;
}
// g_message("%d events were produced to topic %s.", message_count, topic);
return 0;
}
我怀疑内存泄漏,因为程序在一段时间后被 Ubuntu 杀死了。 Valgrind 报告如下:
==19032== 92,178 bytes in 9 blocks are definitely lost in loss record 45 of 45
==19032== at 0x4848899: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==19032== by 0x4A37F15: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032== by 0x49FC06A: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032== by 0x49E48E3: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032== by 0x49F0B59: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032== by 0x49F0F79: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032== by 0x49B0D67: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032== by 0x4DC7934: start_thread (pthread_create.c:439)
==19032== by 0x4E58BF3: clone (clone.S:100)
这似乎是 librdkafka 库的内部问题。但我无法判断这是我的错误使用造成的还是库本身的错误造成的。
为了使泄漏报告有意义,您的应用程序需要彻底终止。
这意味着没有被杀死,只是干净地退出
main()
。