云数据流流媒体,空闲时停止存钱?

问题描述 投票:2回答:1

我有一个用户可以投票的应用程序。

我希望我的应用程序能够扩展,因此我决定使用Cloud Dataflow聚合存储在Firestore中的计数器。

我已经设置了一个类型流的数据流作业,因此每当用户投票时,它都可以监听pubsub主题。

有时候我每天有数千个用户输入,有时候我有几百个......当有一段时间没有收到pubsub消息时,是否有任何“暂停”工作的解决方案?

目前,我的数据流工作总是在运行,我担心这会花费我很多钱。

如果有人可以帮我理解流媒体工作的结算,那么我们会感激不尽

这是我的Python管道:

def run(argv=None):
    # Config
    parser = argparse.ArgumentParser()
    # Output PubSub Topic
    parser.add_argument(
        '--output_topic', required=True)
    # Input PubSub Topic
    parser.add_argument(
        '--input_topic', required=True)

    known_args, pipeline_args = parser.parse_known_args(argv)

    # Pipeline options
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True

    # Pipeline process
    with beam.Pipeline(options=pipeline_options) as p:

        # Counting votes
        def count_votes(contestant_votes):
            (contestant, votes) = contestant_votes
            return (contestant, sum(votes))

        # Format data to a fake object (used to be parsed by the CF)
        def format_result(contestant_votes):
            (contestant, votes) = contestant_votes
            return '{ "contestant": %s, "votes": %d }' % (contestant, votes)

        transformed = (p
                       | 'Receive PubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
                       .with_output_types(bytes)
                       | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
                       | 'Pair with one' >> beam.Map(lambda x: (x, 1))
                       | 'Apply window of time' >> beam.WindowInto(window.FixedWindows(30, 0))
                       | 'Group by contestant' >> beam.GroupByKey()
                       | 'Count votes' >> beam.Map(count_votes)
                       | 'Format to fake object string' >> beam.Map(format_result)
                       | 'Transform to PubSub base64 string' >> beam.Map(lambda x: x.encode('utf-8'))
                       .with_output_types(bytes))

        # Trigger a the output PubSub topic with the message payload
        transformed | beam.io.WriteToPubSub(known_args.output_topic)

        result = p.run()
        result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Job config

firebase google-cloud-firestore google-cloud-dataflow
1个回答
3
投票

回答您的成本问题:对于您目前使用的工人,它将花费您大约250美元(取决于您在一个月内的PD使用情况)。

目前没有时间迫使数据流“闲置”或扩展为0工人。你可以拥有的最低价格是1。

据说,您可以采取一些途径来尽量降低成本。

如果你的工人负荷不是很大,而你想要最简单的选择,你可以使用功能较弱的工人(n1-standard-1 [~USD $ 77.06]或n1-standard-2 [~USD $ 137.17])。 https://cloud.google.com/products/calculator/#id=3bbedf2f-8bfb-41db-9923-d3a5ef0c0250(如果你看到我添加了所有3种变体,使用我在你的照片中看到的430GB PD)。

如果您需要计算能力,可以切换到使用基于cron的数据流作业,如下所述:https://cloud.google.com/blog/products/gcp/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions。有了这个,您应该从订阅而不是主题中读取,这样您就可以保留消息,直到您开始工作。

© www.soinside.com 2019 - 2024. All rights reserved.