我在谷歌数据流上使用python beam,我的管道看起来像这样:
从文件中读取图像网址>>下载图像>>处理图像
问题是我不能让下载图像按步骤扩展,因为我的应用程序可能会被阻止从图像服务器。
这是一种可以扼杀这一步的方式吗?每分钟输入或输出。
谢谢。
一种可能,也许是幼稚,是在这一步骤中引入睡眠。为此,您需要知道可以同时运行的ParDo的最大实例数。如果autoscalingAlgorithm
设置为NONE
,您可以从numWorkers
和workerMachineType
(DataflowPipelineOptions)获得。准确地说,有效率将除以线程总数:desired_rate/(num_workers*num_threads(per worker))
。睡眠时间与有效率的倒数相反:
Integer desired_rate = 1; // QPS limit
if (options.getNumWorkers() == 0) { num_workers = 1; }
else { num_workers = options.getNumWorkers(); }
if (options.getWorkerMachineType() != null) {
machine_type = options.getWorkerMachineType();
num_threads = Integer.parseInt(machine_type.substring(machine_type.lastIndexOf("-") + 1));
}
else { num_threads = 1; }
Double sleep_time = (double)(num_workers * num_threads) / (double)(desired_rate);
然后你可以在受限制的Fn中使用TimeUnit.SECONDS.sleep(sleep_time.intValue());
或等价物。在我的示例中,作为一个用例,我想从公共文件中读取,解析空行并调用自然语言处理API,最大速率为1 QPS(我之前将desired_rate
初始化为1):
p
.apply("Read Lines", TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
.apply("Omit Empty Lines", ParDo.of(new OmitEmptyLines()))
.apply("NLP requests", ParDo.of(new ThrottledFn()))
.apply("Write Lines", TextIO.write().to(options.getOutput()));
限速Fn是ThrottledFn
,请注意sleep
函数:
static class ThrottledFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
// Instantiates a client
try (LanguageServiceClient language = LanguageServiceClient.create()) {
// The text to analyze
String text = c.element();
Document doc = Document.newBuilder()
.setContent(text).setType(Type.PLAIN_TEXT).build();
// Detects the sentiment of the text
Sentiment sentiment = language.analyzeSentiment(doc).getDocumentSentiment();
String nlp_results = String.format("Sentiment: score %s, magnitude %s", sentiment.getScore(), sentiment.getMagnitude());
TimeUnit.SECONDS.sleep(sleep_time.intValue());
Log.info(nlp_results);
c.output(nlp_results);
}
}
}
有了这个,我得到1元/秒的速率,如下图所示,并避免在使用多个工作人员时达到配额,即使请求没有真正分散(你可能同时获得8个请求,然后8个小时等等)。这只是一个测试,可能更好的实施将使用番石榴的rateLimiter。
如果管道使用自动缩放(THROUGHPUT_BASED
),那么它将更复杂并且应该更新工作器的数量(例如,Stackdriver Monitoring具有job/current_num_vcpus
度量标准)。其他一般考虑因素是使用虚拟GroupByKey控制并行ParDos的数量,或者使用splitIntoBundles将源拆分等等。我想看看是否还有其他更好的解决方案。