我是新手,我对转换和动作的工作原理有基本了解(guide)。我正在文本文件的每行(基本上是段落)上尝试一些NLP操作。处理后,应将结果发送到服务器(REST Api)进行存储。该程序在yarn
模式下在10个节点的群集上作为Spark作业(使用spark-submit提交)运行。这是我到目前为止所做的。
...
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<String> processedLines = lines
.map(line -> {
// processed here
return result;
});
processedLines.foreach(line -> {
// Send to server
});
这有效,但是foreach
循环似乎是顺序的,似乎它不在工作节点上以分布式模式运行。我正确吗?
我尝试了以下代码,但是它不起作用。错误:java: incompatible types: inferred type does not conform to upper bound(s)
。显然是错误的,因为map
是转换,而不是动作。
lines.map(line -> { /* processing */ })
.map(line -> { /* Send to server */ });
我也尝试过take()
,但是它需要int
,并且processedLines.count()
的类型为long
。
processedLines.take(processedLines.count()).forEach(pl -> { /* Send to server */ });
数据巨大(大于100gb)。我想要的是处理和将其发送到服务器均应在工作程序节点上完成。 map
中的处理部分严格地在工作程序节点上进行。但是我如何将处理后的数据从工作节点发送到服务器,因为foreach
似乎在驱动程序中发生了顺序循环(如果我正确的话)。简而言之,如何在工作程序节点而非驱动程序中执行action
。
任何帮助将不胜感激。
foreach
是火花动作。它基本上采用了RDD的每个元素,并对该元素应用了功能。
foreach
在执行程序节点或工作程序节点上执行。它没有应用于驱动程序节点。请注意,在运行spark的本地执行模式下,驱动程序和执行程序节点都可以驻留在同一JVM上。
检查此内容以供参考foreach explanation
您的方法看起来不错,您尝试映射RDD的每个元素,然后将foreach
应用于每个元素。我之所以会花时间,是因为您要处理的数据大小(〜100GB)。
对此进行优化的一种方法是repartition
输入数据集。理想情况下,每个分区的大小应为128MB,以获得更好的性能结果。您将找到许多有关进行数据重新分区的最佳实践的文章。我建议您遵循它们,它将带来一些性能上的好处。
您可以想到的第二个优化是分配给每个执行者节点的内存。在进行火花调整时,它起着非常重要的作用。
您可以想到的第三个优化是,将网络呼叫批处理到服务器。当前,您正在为RDD的每个元素对服务器进行网络调用。如果您的设计允许您批处理这些网络调用,则可以在一个网络调用中发送多个元素。如果产生的延迟主要归因于这些网络调用,这也可能有所帮助。
我希望这会有所帮助。
您可以将代码替换为
processedLines.foreach(line -> {
使用任一解决方案
processedLines.foreachAsync(line -> {
// Send to server
}).get();
//To iterate batch wise I would go for this
processedLines.foreachPartitionAsync(lineIterator -> {
// Create your ouput client connection here
while (lineIterator.hasNext()){
String line = lineIterator.next();
}
}).get();
这两个函数都将创建一个Future对象或提交一个新线程或一个取消阻止的调用,这将自动向您的代码添加并行性。