我正在创建一个拓扑,它读取示例视频,对其进行一些转换并将其保存为输出,并且我正在使用 Apache Storm 同时应用不同的过滤器。想象一下,在读取视频后,我将其帧发送到 2 个 bolt(螺栓)。一个应用高斯模糊效果,另一个锐化每个接收到的帧。锐化器和高斯模糊都成功地将它们的元组发送到同一目的地。现在我想合并从这两个 bolt 发出的结果帧,但聚合器 bolt 一次只接收到一个元组(只包含其中一个 bolt 的值)。我该如何修复它?
Topology.java:
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import java.io.*;
/**
 * Entry point that wires the video-processing topology together and runs it on an
 * in-process {@link LocalCluster}.
 */
public class Topology {

    /**
     * Redirects stdout/stderr to {@code topology.log}, builds the topology, submits it to a
     * local cluster and lets it run for 100 seconds before the cluster is closed.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the log file cannot be opened or the topology cannot be run
     */
    public static void main(String[] args) throws Exception {
        File logFile = new File("topology.log");

        // The stream is closed by try-with-resources once the cluster has shut down.
        try (PrintStream logStream = new PrintStream(new FileOutputStream(logFile))) {
            System.setOut(logStream);
            System.setErr(logStream);

            Config clusterConfig = new Config();
            clusterConfig.setDebug(true);

            TopologyBuilder wiring = assembleTopology();

            // LocalCluster is closed automatically when this inner block exits.
            try (LocalCluster localCluster = new LocalCluster()) {
                localCluster.submitTopology("Topology", clusterConfig, wiring.createTopology());
                Thread.sleep(100000); // Adjust sleep time as needed
            }
        }
    }

    /**
     * Creates the spout and all bolts, then connects them.
     *
     * <p>Data flow: the spout feeds both the frame analyzer (followed by the analysis saver)
     * and the image processor. The image processor feeds the Gaussian-blur and sharpener
     * bolts, both of which feed the frame aggregator, which feeds the output generator.
     */
    private static TopologyBuilder assembleTopology() {
        TopologyBuilder wiring = new TopologyBuilder();

        // All components are instantiated up front, before any of them is registered.
        Spout source = new Spout();
        BoltFrameAnalyzer frameAnalyzer = new BoltFrameAnalyzer();
        BoltAnalysisSaver analysisSaver = new BoltAnalysisSaver();
        BoltImageProcessor imageProcessor = new BoltImageProcessor();
        BoltGaussianBlur gaussianBlur = new BoltGaussianBlur();
        BoltSharpener sharpener = new BoltSharpener();
        BoltFrameAggregator frameAggregator = new BoltFrameAggregator();
        BoltOutputGenerator outputGenerator = new BoltOutputGenerator();

        wiring.setSpout("spout", source, 1);

        // Analysis branch.
        wiring.setBolt("bolt-frame-analyzer", frameAnalyzer, 1)
                .shuffleGrouping("spout");
        wiring.setBolt("bolt-analysis-saver", analysisSaver, 1)
                .shuffleGrouping("bolt-frame-analyzer");

        // Image-processing branch: one processor fanning out to two filters.
        wiring.setBolt("bolt-image-processor", imageProcessor, 1)
                .shuffleGrouping("spout");
        wiring.setBolt("bolt-gaussian-blur", gaussianBlur, 1)
                .shuffleGrouping("bolt-image-processor");
        wiring.setBolt("bolt-sharpener", sharpener, 1)
                .shuffleGrouping("bolt-image-processor");

        // Both filters feed the aggregator, whose result goes to the output generator.
        wiring.setBolt("bolt-frame-aggregator", frameAggregator, 1)
                .shuffleGrouping("bolt-sharpener")
                .shuffleGrouping("bolt-gaussian-blur");
        wiring.setBolt("bolt-output-generator", outputGenerator, 1)
                .shuffleGrouping("bolt-frame-aggregator");

        return wiring;
    }
}
BoltGaussianBlur.java:
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.opencv.core.Mat;
import org.opencv.core.Size;
import org.opencv.imgcodecs.Imgcodecs;
import org.opencv.imgproc.Imgproc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
public class BoltGaussianBlur extends BaseBasicBolt {
private final String framesGaussianBlurFilePath;
public BoltGaussianBlur() {
try {
Properties prop = new Properties();
prop.load(new FileInputStream("config.ini"));
framesGaussianBlurFilePath = prop.getProperty("framesGaussianBlurFilePath");
} catch (IOException e) {
LOG.error("BoltImageProcessor: Error occurred while reading config.ini file", e);
throw new RuntimeException(e);
}
}
private static final Logger LOG = LoggerFactory.getLogger(BoltGaussianBlur.class);
public void execute(Tuple input, BasicOutputCollector collector) {
int gaussianBlurFrameNumber = input.getIntegerByField("frameNumber");
LOG.info("BoltGaussianBlur: Frame #" + gaussianBlurFrameNumber + " has been received successfully.");
Mat receivedFrame = (Mat) input.getValueByField("resizedFrame");
Mat gaussianBlurFrame = new Mat();
Imgproc.GaussianBlur(receivedFrame, gaussianBlurFrame, new Size(9, 9), 2, 2);
String gaussianBlurFileName = framesGaussianBlurFilePath + "/frame_" + gaussianBlurFrameNumber + "_gaussian_blur.png";
Imgcodecs.imwrite(gaussianBlurFileName, gaussianBlurFrame);
LOG.info("BoltGaussianBlur: Frame #" + gaussianBlurFrameNumber + " has been converted to Gaussian blur successfully.");
collector.emit(new Values(gaussianBlurFrame, gaussianBlurFrameNumber));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("gaussianBlurFrame", "gaussianBlurFrameNumber"));
}
}
BoltSharpener.java:
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.opencv.core.Core;
import org.opencv.core.Mat;
import org.opencv.core.Size;
import org.opencv.imgproc.Imgproc;
import org.opencv.imgcodecs.Imgcodecs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
public class BoltSharpener extends BaseBasicBolt {
private final String framesSharpenedFilePath;
public BoltSharpener() {
try {
Properties prop = new Properties();
prop.load(new FileInputStream("config.ini"));
framesSharpenedFilePath = prop.getProperty("framesSharpenedFilePath");
} catch (IOException e) {
LOG.error("BoltSharpener: Error occurred while reading config.ini file", e);
throw new RuntimeException(e);
}
}
private static final Logger LOG = LoggerFactory.getLogger(BoltSharpener.class);
public void execute(Tuple input, BasicOutputCollector collector) {
int sharpenedFrameNumber = input.getIntegerByField("frameNumber");
LOG.info("BoltSharpener: Frame #" + sharpenedFrameNumber + " has been received successfully.");
Mat receivedFrame = (Mat) input.getValueByField("resizedFrame");
Mat sharpenedFrame = new Mat();
Imgproc.GaussianBlur(receivedFrame, sharpenedFrame, new Size(0, 0), 10);
Core.addWeighted(receivedFrame, 1.5, sharpenedFrame, -0.5, 0, sharpenedFrame);
String sharpenedFileName = framesSharpenedFilePath + "/frame_" + sharpenedFrameNumber + "_sharpened.png";
Imgcodecs.imwrite(sharpenedFileName, sharpenedFrame);
LOG.info("BoltSharpener: Frame #" + sharpenedFrameNumber + " has been sharpened successfully.");
collector.emit(new Values(sharpenedFrame, sharpenedFrameNumber));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sharpenedFrame", "sharpenedFrameNumber"));
}
}
BoltFrameAggregator.java:
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.opencv.core.Core;
import org.opencv.core.Mat;
import org.opencv.imgcodecs.Imgcodecs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class BoltFrameAggregator extends BaseBasicBolt {
private final String framesAggregated;
public BoltFrameAggregator() {
try {
Properties prop = new Properties();
prop.load(new FileInputStream("config.ini"));
framesAggregated = prop.getProperty("framesAggregatedFilePath");
} catch (IOException e) {
LOG.error("BoltAggregator: Error occurred while reading config.ini file", e);
throw new RuntimeException(e);
}
}
private static final Logger LOG = LoggerFactory.getLogger(BoltFrameAggregator.class);
public void execute(Tuple input, BasicOutputCollector collector) {
Mat receivedGaussianBlurFrame = (Mat) input.getValueByField("gaussianBlurFrame");
int gaussianBlurFrameNumber = input.getIntegerByField("gaussianBlurFrameNumber");
Mat receivedSharpenedFrame = (Mat) input.getValueByField("sharpenedFrame");
int sharpenedFrameNumber = input.getIntegerByField("sharpenedFrameNumber");
int aggregatedFrameNumber = input.getIntegerByField("sharpenedFrameNumber");
Mat aggregatedFrame = new Mat();
if (sharpenedFrameNumber == gaussianBlurFrameNumber) {
Core.addWeighted(receivedSharpenedFrame, 1, receivedGaussianBlurFrame, 1, 0, aggregatedFrame);
String aggregatedFileName = framesAggregated + "/frame_" + sharpenedFrameNumber + "_aggregated.png";
Imgcodecs.imwrite(aggregatedFileName, aggregatedFrame);
LOG.info("BoltAggregator: Frame #" + sharpenedFrameNumber + " has been aggregated successfully.");
collector.emit(new Values(aggregatedFrame, aggregatedFrameNumber));
} else {
LOG.warn("BoltAggregator: Frame numbers for Gaussian blur and sharpened frames do not match.");
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("aggregatedFrame", "aggregatedFrameNumber"));
}
}
我检查了所有可能的拼写错误,并尝试为高斯模糊和锐化器的输出流指定名称。我甚至尝试把接收到的元组存储起来,看看在高斯模糊的前 4 个或 5 个元组之后,锐化器的元组是否会到达,但仍然不起作用。
错误日志:
[Thread-42-bolt-frame-aggregator-executor[3, 3]] INFO o.a.s.e.Executor - Processing received TUPLE: source: bolt-gaussian-blur:5, stream: default, id: {}, [Mat [ 720*1280*CV_8UC1, isCont=true, isSubmat=false, nativeObj=0x1fc231bd4a0, dataAddr=0x1fc258f0f60 ], 0] PROC_START_TIME(sampled): null EXEC_START_TIME(sampled): null for TASK: 3
...
[Thread-42-bolt-frame-aggregator-executor[3, 3]] ERROR o.a.s.u.Utils - Async loop died!
java.lang.RuntimeException: java.lang.IllegalArgumentException: sharpenedFrame does not exist
at org.apache.storm.executor.Executor.accept(Executor.java:301) ~[storm-client-2.6.0.jar:2.6.0]
at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.6.0.jar:2.6.0]
at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.6.0.jar:2.6.0]
at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.6.0.jar:2.6.0]
at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.6.0.jar:2.6.0]
at org.apache.storm.utils.Utils$1.run(Utils.java:398) ~[storm-client-2.6.0.jar:2.6.0]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: java.lang.IllegalArgumentException: sharpenedFrame does not exist
at org.apache.storm.tuple.Fields.fieldIndex(Fields.java:98) ~[storm-client-2.6.0.jar:2.6.0]
at org.apache.storm.tuple.TupleImpl.fieldIndex(TupleImpl.java:101) ~[storm-client-2.6.0.jar:2.6.0]
at org.apache.storm.tuple.TupleImpl.getValueByField(TupleImpl.java:161) ~[storm-client-2.6.0.jar:2.6.0]
at BoltFrameAggregator.execute(BoltFrameAggregator.java:33) ~[classes/:?]
at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:48) ~[storm-client-2.6.0.jar:2.6.0]
at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.6.0.jar:2.6.0]
at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.6.0.jar:2.6.0]
... 6 more
如错误消息所示,您的变量(字段)`sharpenedFrame` 似乎不存在:日志中出错的元组来自 bolt-gaussian-blur,而该 bolt 只声明了 gaussianBlurFrame 和 gaussianBlurFrameNumber 两个字段。那么您可以仔细检查它是否正确初始化吗?您可能需要应用广泛的日志记录 - 或者在本地计算机上使用伪集群,这使得调试变得更加容易。