我正在尝试使用kafka-clients api运行kafka生产者。我有多个生产者使用单独的线程运行,每个生成器都尝试将数据写入kafka。问题是当我增加并行运行的线程数时,我从kafka获得了一个中断的异常。例如,如果我并行运行20个线程,它不会抛出任何异常,但是当我并行运行100个线程时,我得到以下异常:
线程“pool-910-thread-1”中的异常org.apache.kafka.common.errors.InterruptException:java.lang.InterruptedException 在org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1154) 在org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1128) 在org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1107) 在com.t4e.kafka.producer.IEC104KafkaReadMessageProcessor.runProducer(IEC104KafkaReadMessageProcessor.java:45) 在com.t4e.iec104.connection.Iec60870ReadListener.writeToJsonFile(Iec60870ReadListener.java:707) 在com.t4e.iec104.connection.Iec60870ReadListener.newASdu(Iec60870ReadListener.java:75) 在org.openmuc.j60870.Connection $ ConnectionReader $ 1.run(Connection.java:143) 在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:624) 在java.lang.Thread.run(Thread.java:748) 引起:java.lang.InterruptedException at java.lang.Object.wait(Native Method) 在java.lang.Thread.join(Thread.java:1260) 在org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1152)
这是我的制作人代码:
private static final Logger logger = LoggerFactory.getLogger(IEC104KafkaReadMessageProcessor.class);
static KafkaProducerConfigReader kafkaConfig = new KafkaProducerConfigReader();
static String newLine = System.getProperty("line.separator");
/**
* @param message
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
public static synchronized RecordMetadata runProducer(String message) throws InterruptedException, ExecutionException {
Producer<Long, String> producer = ProducerCreator.createProducer();
ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(kafkaConfig.getTopicName(), message);
try {
RecordMetadata metadata = producer.send(record).get();
logger.info(("Record sent with key " + " to partition " + metadata.partition() + " with offset "
+ metadata.offset()));
return metadata;
} catch (ExecutionException e) {
logger.error("ExecutionException : Error in sending record to kafka");
throw new ExecutionException(e);
} catch (InterruptedException e) {
logger.error("InterruptedException : Error in sending record" + newLine);
throw new InterruptedException();
} finally {
logger.info(" Closing Kafka producer ");
producer.close();
}
}
问题可能不在于您正在创建生产者的代码中。查看您分享的日志,
线程“pool-910-thread-1”中的异常org.apache.kafka.common.errors.InterruptException:java.lang.InterruptedException
我可以看到有910个线程池处于活动状态。创建那么多个池而不是创建保持单个池的线程可能是个更好的主意。您可能希望查看创建线程池并控制它的位置。
我怀疑代码中的线程泄漏会导致此中断异常。