我试图在Trident中创建一个小示例。目的是查看在失败情况下如何重播元组。下面是拓扑定义
Random rand = new Random();
Config config = new Config();
config.setDebug(true);
config.setNumWorkers(1);
TridentTopology topology = new TridentTopology();
topology.newStream("spout", new RandomIntegerSpout())
.map((MapFunction) tridentTuple -> {
if ((tridentTuple.getLongByField("msgid") % 50 == 0) &&
(rand.nextInt(2) == 1)) {
System.out.println(String.format("Failed to process tuple %d", tridentTuple.getLongByField("msgid")));
throw new ReportedFailedException("Divisible by 50");
}
return new Values(tridentTuple.toArray());
})
.peek((Consumer) tridentTuple -> System.out.println(tridentTuple.getValues()));
我使用来自Storm-starter的RandomIntegerSpout
,该扩展程序扩展了BaseRichSpout
并仅生成随机数。然后,我应用一个MapFunction
,它每50个元组仅绘制一个随机数,并随机使该元组失败。
问题是,我没有得到任何ack
或fail
。
我玩着水嘴,并在调试模式下运行它,尝试了相同的示例输出,并使用标准的防风栓进行了尝试。锚定工作正常,只是不会被三叉戟调用。
我在v1.2.3和v2.0.0中使用LocalCluster和StormSubmitter复制了此问题。
以下是Storm UI的屏幕截图:对应于地图确认的螺栓将使元组失效,但不会被传播回喷嘴。
我以为三叉戟Mastercoord可能期望某种状态的持久性来实现拓扑,但是用某些persistentAggregate代替peek并没有帮助。通过对map
进行同样的操作,我还排除了each
中的错误。
通过检查来看代码几乎是微不足道的,我可能会误解有关Trident / Storm的一些基本知识。如果批处理完成了,我期望三叉戟会调用出水嘴的ack
方法是否是错误的?我意识到fail
中没有IBatchSpout
方法。 Trident如何处理批次的重放?
三叉嘴不会在单个元组级别确认或失败元组。相反,元组将被批处理。
三叉嘴通常看起来像this interface。
M emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, PartitionT partition, M lastPartitionMeta);
想法是,Trident将设法跟踪批处理元组的ack / fail,然后如果批处理失败,它将要求喷嘴重复该批处理,如果没有,则根本不会。
请注意,这与标准Storm Spout有何不同。对于普通的喷口,框架基本上会告诉喷口“嘿,发出一些东西。由您自己决定发出什么。”,然后使用ack
和fail
方法来告诉喷口是否应该发出特定的元组。再次。
使用Trident时,喷口将被告知“嘿,(重新)发出批次编号x”,然后由喷口决定该批次中有哪些元组。使用此模型,不需要fail
方法。不过,某些Trident喷口将具有ack/succeed
方法,以允许喷口丢弃它可能与特定正在进行的批处理有关的任何状态。
对于包装的IRichSpouts
,有一些bridging code会将它们包装到Trident API中。基本上,包装器将调用nextTuple
直到具有完整批次,然后将ID存储在缓存中。如果要求包装器重新批处理,它将在喷嘴上调用fail
。否则,一旦批处理成功,它将调用ack
。
[我认为您没有在Storm UI中看到与此相关的任何内容的原因是IRichBolt
实际上并未在那里显示。相反,它是包装的,因此ack/fail
调用是在spout-spout
组件内部“在幕后”进行的。如果您想确定是否正在调用确认/失败,请尝试向ack/fail
的IRichSpout
方法中添加一些日志记录。