如何编写Cloud Function来接收,解析和发布PubSub消息?

问题描述 投票:0回答:1

这可以被视为this thread的后续行动,但我需要更多的帮助来推动事情的发展。希望有人可以查看下面我的尝试并提供进一步的指导。

总而言之,我需要一个云功能

  1. 由在主题A中发布的PubSub消息触发(这可以在UI中完成)。
  2. 在“推送”PubSub主题A中读取杂乱的对象更改通知消息
  3. “解析”它
  4. 在PubSub主题B中发布消息,其中原始消息ID作为数据,并且其他元数据(例如,文件名,大小,时间)作为属性。

. 1:

凌乱的对象更改通知的示例:

\ n“kind”:“storage#object”,\ n“id”:“bucketcfpubsub / test.txt / 1544681756538155”,\ n“”selfLink“:”https://www.googleapis.com/storage/v1/b/bucketcfpubsub/o/test.txt“,\ n”name“:”test.txt“, \ n“bucket”:“bucketcfpubsub”,\ n“generation”:“1544681756538155”,\ n“metageneration”:“1”,\ n“contentType”:“text / plain”,\ n“timeCreated”:“2018 -12-13T06:15:56.537Z“,\ n”更新“:”2018-12-13T06:15:56.537Z“,\ n”storageClass“:”STANDARD“,\ n”timeStorageClassUpdated“:”2018-12 -13T06:15:56.537Z“,\ n”size“:”1938“,\ n”md5Hash“:”sDSXIvkR / PBg4mHyIUIvww ==“,\ n”mediaLink“:”https://www.googleapis.com/download/storage/v1/b/bucketcfpubsub/o/test.txt?generation=1544681756538155&alt=media“,\ n”crc32c“:” UDhyzw ==“,\ n”etag“:”CKvqjvuTnN8CEAE =“\ n} \ n

为了澄清,这是一个带有空白“数据”字段的消息,上面的所有信息都在属性对中(如“属性名称”:“属性数据”)?或者它只是一个填充到“数据”字段中的长字符串,没有“属性”?

. 2:

在上面的线程中,使用“拉”订阅。它比使用“推送”订阅更好吗?推送样品如下:

def create_push_subscription(project_id,
                             topic_name,
                             subscription_name,
                             endpoint):
    """Create a new push subscription on the given topic."""
    # [START pubsub_create_push_subscription]
    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"
    # TODO subscription_name = "Your Pub/Sub subscription name"
    # TODO endpoint = "https://my-test-project.appspot.com/push"

    subscriber = pubsub_v1.SubscriberClient()
    topic_path = subscriber.topic_path(project_id, topic_name)
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    push_config = pubsub_v1.types.PushConfig(
        push_endpoint=endpoint)

    subscription = subscriber.create_subscription(
        subscription_path, topic_path, push_config)

    print('Push subscription created: {}'.format(subscription))
    print('Endpoint for subscription is: {}'.format(endpoint))
    # [END pubsub_create_push_subscription]

或者在此之后是否需要更多代码来接收消息?

此外,每次发布pubsub消息触发云功能时,这不会创建新订户吗?我应该在CF的末尾添加订阅删除代码,还是有更有效的方法来执行此操作?

. 3:

接下来,要解析代码,此示例代码执行以下几个属性:

def summarize(message):
    # [START parse_message]
    data = message.data
    attributes = message.attributes

    event_type = attributes['eventType']
    bucket_id = attributes['bucketId']
    object_id = attributes['objectId']

这将与我在1的上述通知一起使用吗?

. 4:

如何分离topic_name?步骤1和2使用主题A,而此步骤是发布到主题B.是否像在下面的代码示例中重写topic_name一样简单?

# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # Add two attributes, origin and username, to the message
    publisher.publish(
        topic_path, data, origin='python-sample', username='gcp')

print('Published messages with custom attributes.')

从我获得大部分示例代码的源(除了上面的线程):python-docs-samples。将上述代码示例调整和串联在一起会产生有用的代码吗?或者我还会遗漏像“import ****”这样的东西吗?

google-cloud-platform google-cloud-storage google-cloud-functions google-cloud-pubsub google-cloud-python
1个回答
2
投票

您不应尝试手动创建在云功能中运行的订阅服务器。相反,请按照文档here设置云功能,通过传递--trigger-topic命令行参数,将调用发送到给定主题的所有消息。

解决您的其他一些问题:

“我应该在CF的末尾添加订阅删除代码” - 订阅是与特定积压消息相对应的长期资源。如果在云功能结束时创建并删除订阅,则不会接收在不存在时发送的消息。

“我如何分离topic_name” - 此示例中的“topic_name”指的是格式化为projects/project_id/topics/topic_name的字符串的最后一部分,它将在创建后出现在云控制台的this page中。

© www.soinside.com 2019 - 2024. All rights reserved.