如何使用python将确认从后台函数返回到pubsub

问题描述 投票:1回答:2

我正在设置一个新的GCP项目,以便在将CSV文件上传到存储桶后立即读取和解析该文件。在这种程度上,我创建了一个发布到pub / sub的触发器。 Pub / Sub本身将消息发送到后台函数。

一切似乎都很好,例如一旦文件被上传,触发器就会立即向Pubsub发送消息,然后发送到该功能。我还可以看到该功能的消息。

然而,问题是将Ack发送回pub / sub。在某处我读回发任何2xx状态应该完成工作(从队列中删除消息),但事实并非如此。结果,pubsub“认为”消息尚未传递,并一遍又一遍地发送消息。

def parse_data(data, context):


    if 'data' in data:
        args = base64.b64decode(data['data']).decode('utf-8')
        pubsub_message = args.replace('\n', ' ')
        properties = json.loads(pubsub_message)
        myBucket = validate_message(properties, 'bucket')   
        myFileName = validate_message(properties, "name")
        fileLocation = 'gs://'+myBucket+'/'+myFileName
        readAndEnhanceData(fileLocation)
        return 'OK', 200
    else:
        return 'Something went wrong, no data received'   

这是一个日志文件,显示正在连续调用该函数。

D  CSV_Parser_Raw_Data 518626734652287 Function execution took 72855 ms,
 finished with status: 'ok' CSV_Parser_Raw_Data 518626734652287

D  CSV_Parser_Raw_Data 518626708442766 Function execution took 131886 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518626708442766 

D  CSV_Parser_Raw_Data 518624470100006 Function execution took 65412 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518624470100006 

D  CSV_Parser_Raw_Data 518626734629237 Function execution took 68004 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518626734629237

D  CSV_Parser_Raw_Data 518623777839079 Function execution took 131255 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518623777839079 

D  CSV_Parser_Raw_Data 518623548622842 Function execution took 131186 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518623548622842 

D  CSV_Parser_Raw_Data 518623769252453 Function execution took 133981 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518623769252453 

所以我很高兴知道我在这里失踪了!即我怎么能打破这个循环呢?

*关于问题的更新*感谢@kamal强迫我睁开眼睛,在我完成任务时重新审视桶/主题等任务,重新审核所有内容并实现,我在一个子中使用了一个临时文件文件夹但在SAME桶中作为上传文件!那就是问题所在。 Finalize事件适用于任何在桶中创建的对象。所以Kamal是正确的多次上传正在发生!

如果您以相同的方式处理项目,请确保创建一个tmp文件夹并确保不向该文件夹添加任何触发器。


python google-cloud-platform google-cloud-functions google-cloud-pubsub
2个回答
1
投票

通常,Google Cloud Pub / Sub保证at least once delivery消息。这意味着它总是可以获得重复,尽管它们应该是相对罕见的。在您的情况下,不是一遍又一遍地处理相同的消息,它是不同的消息。诸如518626734652287之类的数字是消息ID。由于每次都不同,这意味着发布了多条消息。很可能发生了两件事之一:

  1. 文件将多次上传。
  2. GCS触发器已多次设置。你可以通过运行gsutil notification list gs://<bucket name>来检查这一点。

如果后者是问题,你会看到多个条目,例如:

projects/_/buckets/my-bucket/notificationConfigs/1
    Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic

projects/_/buckets/my-bucket/notificationConfigs/2
    Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic

projects/_/buckets/my-bucket/notificationConfigs/3
    Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic

您可以通过使用配置名称发出删除来删除额外通知,例如gsutil notification delete projects/_/buckets/my-bucket/notificationConfigs/2

值得注意的是,使用Cloud Functions和Pub / Sub,可以设置两种类型的订阅:由用户配置的订阅和由Cloud Functions本身配置的订阅。默认情况下,前者的确认截止日期为10秒。这意味着如果在10秒内未确认消息,则将重新传送该消息。对于后者,默认值为600秒。如果邮件的处理时间比此时间段长,则可能会发生重新传递。

您可以尝试减少处理邮件所需的时间,也可以增加确认截止日期。您可以使用gcloud工具增加确认截止日期:

gcloud pubsub subscriptions update <subscription name> --ack-deadline=180

这会将截止日期增加到3分钟。您也可以在Cloud Console Pub/Sub page中单击订阅,单击“编辑”,然后将“确认截止日期”更改为更大的值。

使用云功能,您无需返回HTTP状态。只有在直接使用push subscription时才需要这样做。


0
投票

你不能只从你的功能返回200。你需要实际“确认”pubsub消息。你没有显示实际从pubsub获取消息的代码,但我认为在该代码的某处,你有类似的东西:

queue = Queue.Queue()
message = queue.get()
parse_data(message.data, context)

这就是你需要回复消息的地方:

queue = Queue.Queue()
message = queue.get()
if parse_data(message.data, context):
    message.ack()
© www.soinside.com 2019 - 2024. All rights reserved.