限制光束应用中的一个步骤

问题描述 投票:2回答:1

我在谷歌数据流上使用python beam,我的管道看起来像这样:

从文件中读取图像网址>>下载图像>>处理图像

问题是我不能让下载图像按步骤扩展,因为我的应用程序可能会被阻止从图像服务器。

这是一种可以扼杀这一步的方式吗?每分钟输入或输出。

谢谢。

python google-cloud-dataflow apache-beam dataflow
1个回答
0
投票

一种可能,也许是幼稚,是在这一步骤中引入睡眠。为此,您需要知道可以同时运行的ParDo的最大实例数。如果autoscalingAlgorithm设置为NONE,您可以从numWorkersworkerMachineType(DataflowPipelineOptions)获得。准确地说,有效率将除以线程总数:desired_rate/(num_workers*num_threads(per worker))。睡眠时间与有效率的倒数相反:

Integer desired_rate = 1; // QPS limit

if (options.getNumWorkers() == 0) { num_workers = 1; }
else { num_workers = options.getNumWorkers(); }

if (options.getWorkerMachineType() != null) { 
    machine_type = options.getWorkerMachineType();
    num_threads = Integer.parseInt(machine_type.substring(machine_type.lastIndexOf("-") + 1));
}
else { num_threads = 1; }

Double sleep_time = (double)(num_workers * num_threads) / (double)(desired_rate);

然后你可以在受限制的Fn中使用TimeUnit.SECONDS.sleep(sleep_time.intValue());或等价物。在我的示例中,作为一个用例,我想从公共文件中读取,解析空行并调用自然语言处理API,最大速率为1 QPS(我之前将desired_rate初始化为1):

p
    .apply("Read Lines", TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
    .apply("Omit Empty Lines", ParDo.of(new OmitEmptyLines()))
    .apply("NLP requests", ParDo.of(new ThrottledFn()))
    .apply("Write Lines", TextIO.write().to(options.getOutput()));

限速Fn是ThrottledFn,请注意sleep函数:

static class ThrottledFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {

        // Instantiates a client
        try (LanguageServiceClient language = LanguageServiceClient.create()) {

          // The text to analyze
          String text = c.element();
          Document doc = Document.newBuilder()
              .setContent(text).setType(Type.PLAIN_TEXT).build();

          // Detects the sentiment of the text
          Sentiment sentiment = language.analyzeSentiment(doc).getDocumentSentiment();                 
          String nlp_results = String.format("Sentiment: score %s, magnitude %s", sentiment.getScore(), sentiment.getMagnitude());

          TimeUnit.SECONDS.sleep(sleep_time.intValue());

          Log.info(nlp_results);
          c.output(nlp_results);
        }
    }
}

有了这个,我得到1元/秒的速率,如下图所示,并避免在使用多个工作人员时达到配额,即使请求没有真正分散(你可能同时获得8个请求,然后8个小时等等)。这只是一个测试,可能更好的实施将使用番石榴的rateLimiter

enter image description here

如果管道使用自动缩放(THROUGHPUT_BASED),那么它将更复杂并且应该更新工作器的数量(例如,Stackdriver Monitoring具有job/current_num_vcpus度量标准)。其他一般考虑因素是使用虚拟GroupByKey控制并行ParDos的数量,或者使用splitIntoBundles将源拆分等等。我想看看是否还有其他更好的解决方案。

© www.soinside.com 2019 - 2024. All rights reserved.