My subscriber looks like this:
import json

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import DeadLetterPolicy

# Create the client first, since it is used to build the resource paths.
subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(PROJECT, TOPIC)
subscription_path = subscriber.subscription_path(PROJECT, SUBSCRIPTION)

dead_letter_policy = DeadLetterPolicy(
    dead_letter_topic='dead_letter_topic',
    max_delivery_attempts=5,
)
subscriber.create_subscription(subscription_path, topic_path, dead_letter_policy=dead_letter_policy)

def callback(message):
    print("Received message: {}".format(message))
    print('Attempted:', message.delivery_attempt, 'times')
    data = message.data.decode('utf-8')
    data_d = json.loads(data)
    # Nack one specific file so that it is redelivered; ack everything else.
    if data_d["name"] == "some_file.json":
        message.nack()
    else:
        message.ack()
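To check the ack/nack branch without a live subscription, the decision can be pulled out into a plain function and run against a stand-in message. This is only a minimal sketch: `should_nack` and `FakeMessage` are hypothetical names that are not part of the Pub/Sub client library, and the sketch uses `.get("name")` rather than `["name"]`, so a payload without that field is acked instead of raising `KeyError`.

```python
import json


def should_nack(data: bytes, blocked_name: str = "some_file.json") -> bool:
    """Return True when the payload's "name" field equals blocked_name."""
    payload = json.loads(data.decode("utf-8"))
    # Assumption: a payload without a "name" field is treated as ackable.
    return payload.get("name") == blocked_name


class FakeMessage:
    """Hypothetical stand-in for a Pub/Sub message; records ack/nack calls."""

    def __init__(self, data: bytes, delivery_attempt: int = 1):
        self.data = data
        self.delivery_attempt = delivery_attempt
        self.result = None  # set to "ack" or "nack" by the callback

    def ack(self):
        self.result = "ack"

    def nack(self):
        self.result = "nack"


def callback(message):
    # Same branching as the subscriber callback, minus the printing.
    if should_nack(message.data):
        message.nack()
    else:
        message.ack()


msg = FakeMessage(b'{"name": "some_file.json"}', delivery_attempt=12)
callback(msg)
print(msg.result)
```

This only confirms that the callback nacks the intended file. It says nothing about whether the service forwards the message to the dead-letter topic.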
The received message looks like this:
Received message: Message {
  data: b'{\n "kind": "storage#object",\n "id": "...'
  attributes: {
    "bucketId": "sample_bucket",
    ...
  }
}
Attempted: 12 times
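Clearly, delivery has been attempted more than 5 times, so why am I still able to pull this message from the Pub/Sub topic? Here is the subscription info: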
ackDeadlineSeconds: 10
deadLetterPolicy:
  deadLetterTopic: projects/someproject/topics/dead_letter
  maxDeliveryAttempts: 5
expirationPolicy:
  ttl: 2678400s
messageRetentionDuration: 604800s
name: projects/someproject/subscriptions/new_sub
pushConfig: {}
topic: projects/someproject/topics/pubsub_sample