在我们的 storm 1.0.2 应用程序中,我们面临内存不足异常。在调试时,我们看到 Kafka spout 向螺栓发出了太多消息。螺栓以接近 4.0 的容量运行。那么有没有一种方法可以在风暴中启用背压,以便喷口根据螺栓的容量排放。尝试启用 topology.backpressure.enable 为 true 但遇到了这个问题 https://issues.apache.org/jira/browse/STORM-1949。我们正在使用开箱即用的 KafkaSpout 实现,并为我们的螺栓扩展 BaseRichBolt。我们的 DAG 是线性的。
可以通过在拓扑配置中设置maxSpoutPending值来处理KafkaSpout的背压,
Config config = new Config();
config.setMaxSpoutPending(200);
config.setMessageTimeoutSecs(100);
StormSubmitter.submitTopology("testtopology", config, builder.createTopology());
maxSpoutPending 是给定时间拓扑中可以等待确认的元组数。设置此属性,将通知 KafkaSpout 不再使用来自 Kafka 的任何数据,除非未确认的元组计数小于 maxSpoutPending 值。
看这条评论:
/** * 指定在使用 {@link ProcessingGuarantee} 时 spout 是否应该要求 Storm 跟踪发出的元组 * {@link ProcessingGuarantee#AT_LEAST_ONCE}。在提供至少一次保证时,spout 将始终跟踪发出的元组 * 无论此设置如何。默认情况下此设置为 false。 * *
即使在可靠性不是问题的情况下,启用跟踪也很有用,因为它允许 * {@link Config#TOPOLOGY_MAX_SPOUT_PENDING} 产生效果,并启用一些喷口指标(例如完整延迟) *否则将被禁用。 * * @param tupleTrackingEnforced 如果 Storm 应该跟踪发出的元组,则为 true,否则为 false */
我认为这个配置可以帮助:
成分:
id: "kafkaRecordTranslator" 类名:“ir.zarebin.fse.stormcrawler.spout.KafkaRecordTranslator”
id: "spoutConfigBuilder"
类名:“org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder” 构造函数参数:
属性: