我正在使用 Google pubsublite。具有单个分区和一些消息的小型虚拟主题。 Python 客户端库。使用回调执行标准
SubscriberCluent.subscribe
。回调将消息放入队列中。当msg从队列中取出来消费时,它的ack
就会被调用。当我想停止时,我调用 subscribe_future.cancel(); subscriber_future.result()
并丢弃队列中未使用的消息。
假设我知道该主题有 30 条消息。我在停止之前吃了 10 个。然后我在同一订阅中重新启动一个新的
SubscriberClient
并接收消息。我希望从第 11 条消息开始,但我从第一条消息开始。所以珍贵的订阅者已经确认了前 10 个,但就好像服务器没有收到确认一样。
我想也许ack需要一些时间才能到达服务器。所以我等了2分钟才开始第二次订阅。没有帮助。
然后你认为也许订阅者对象管理 ack 调用,我需要在取消之前“刷新”它们,但我发现了另一个关于此的信息。
我错过了什么?
这是代码。如果您有 pubsublite 帐户,则在填写凭据后即可执行该代码。代码显示了两个问题,一是这个问题的主题;另一个是在这里
询问的# Using python 3.8
from __future__ import annotations
import logging
import pickle
import queue
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Union, Optional
from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub_v1.types import BatchSettings
from google.cloud.pubsublite import AdminClient, PubSubMessage
from google.cloud.pubsublite import Reservation as GCPReservation
from google.cloud.pubsublite import Subscription as GCPSubscription
from google.cloud.pubsublite import Topic as GCPTopic
from google.cloud.pubsublite.cloudpubsub import (PublisherClient,
SubscriberClient)
from google.cloud.pubsublite.types import (BacklogLocation, CloudZone,
LocationPath,
ReservationPath, SubscriptionPath,
TopicPath,
)
from google.cloud.pubsublite.types import FlowControlSettings
from google.oauth2.service_account import Credentials
logging.getLogger('google.cloud').setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
FORMAT = '[%(asctime)s.%(msecs)03d %(name)s] %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')
class Account:
def __init__(self,
project_id: str,
region: str,
zone: str,
credentials: Credentials,
):
self.project_id = project_id
self.region = region
self.zone = CloudZone.parse(zone)
self.credentials = credentials
self.client = AdminClient(region=region, credentials=credentials)
def location_path(self) -> LocationPath:
return LocationPath(self.project_id, self.zone)
def reservation_path(self, name: str) -> ReservationPath:
return ReservationPath(self.project_id, self.region, name)
def topic_path(self, name: str) -> TopicPath:
return TopicPath(self.project_id, self.zone, name)
def subscription_path(self, name: str) -> SubscriptionPath:
return SubscriptionPath(self.project_id, self.zone, name)
def create_reservation(self, name: str, *, capacity: int = 32) -> None:
path = self.reservation_path(name)
reservation = GCPReservation(name=str(path),
throughput_capacity=capacity)
self.client.create_reservation(reservation)
# logger.info('reservation %s created', name)
def create_topic(self,
name: str,
*,
partition_count: int = 1,
partition_size_gib: int = 30,
reservation_name: str = 'default') -> Topic:
# A topic name can not be reused within one hour of deletion.
top_path = self.topic_path(name)
res_path = self.reservation_path(reservation_name)
topic = GCPTopic(
name=str(top_path),
partition_config=GCPTopic.PartitionConfig(count=partition_count),
retention_config=GCPTopic.RetentionConfig(
per_partition_bytes=partition_size_gib * 1024 * 1024 * 1024),
reservation_config=GCPTopic.ReservationConfig(
throughput_reservation=str(res_path)))
self.client.create_topic(topic)
# logger.info('topic %s created', name)
return Topic(name, self)
def delete_topic(self, name: str) -> None:
path = self.topic_path(name)
self.client.delete_topic(path)
# logger.info('topic %s deleted', name)
def get_topic(self, name: str) -> Topic:
return Topic(name, self)
class Topic:
def __init__(self, name: str, account: Account):
self.account = account
self.name = name
self._path = self.account.topic_path(name)
def create_subscription(self,
name: str,
*,
pos: str = None) -> Subscription:
path = self.account.subscription_path(name)
if pos is None or pos == 'beginning':
starting_offset = BacklogLocation.BEGINNING
elif pos == 'end':
starting_offset = BacklogLocation.END
else:
raise ValueError(
'Argument start only accepts one of two values - "beginning" or "end"'
)
Conf = GCPSubscription.DeliveryConfig
subscription = GCPSubscription(
name=str(path),
topic=str(self._path),
delivery_config=Conf(delivery_requirement=Conf.DeliveryRequirement.DELIVER_IMMEDIATELY))
self.account.client.create_subscription(subscription, starting_offset)
# logger.info('subscription %s created for topic %s', name, self.name)
return Subscription(name, self)
def delete_subscription(self, name: str) -> None:
path = self.account.subscription_path(name)
self.account.client.delete_subscription(path)
# logger.info('subscription %s deleted from topic %s', name, self.name)
def get_subscription(self, name: str):
return Subscription(name, self)
@contextmanager
def get_publisher(self, **kwargs):
with Publisher(self, **kwargs) as pub:
yield pub
class Publisher:
def __init__(self, topic: Topic, *, batch_size: int = 100):
self.topic = topic
self._batch_config = {
'max_bytes': 3 * 1024 * 1024, # 3 Mb; must be < 4 max_bytes
'max_latency': 0.05, # 50 ms
'max_messages': batch_size, # default is 1000
}
self._messages = queue.Queue(1000)
self._nomore = object()
self._pool = None
self._worker = None
def __enter__(self):
def _publish():
path = self.topic._path
dumps = pickle.dumps
nomore = self._nomore
q = self._messages
with PublisherClient(
credentials=self.topic.account.credentials,
per_partition_batching_settings=BatchSettings(**self._batch_config),
) as publisher:
while True:
x = q.get()
if x is nomore:
break
future = publisher.publish(path, dumps(x))
future.result()
self._pool = ThreadPoolExecutor(1)
self._worker = self._pool.submit(_publish)
return self
def __exit__(self, exc_type, exc_value, traceback):
self._messages.put(self._nomore)
self._worker.result()
self._pool.shutdown()
def put(self, data) -> None:
self._messages.put(data)
class Subscription:
def __init__(self, name: str, topic: Topic):
self.topic = topic
self.name = name
self._path = topic.account.subscription_path(name)
@contextmanager
def get_subscriber(self, *, backlog=None):
with Subscriber(self, backlog=backlog) as sub:
yield sub
class Subscriber:
def __init__(self, subscription: Subscription, backlog: int = None):
self.subscription = subscription
self._backlog = backlog or 100
self._cancel_requested: bool = None
self._messages: queue.Queue = None
self._pool: ThreadPoolExecutor = None
self._NOMORE = object()
self._subscribe_task = None
def __enter__(self):
self._pool = ThreadPoolExecutor(1).__enter__()
self._messages = queue.Queue(self._backlog)
messages = self._messages
def callback(msg: PubSubMessage):
logger.info('got %s', pickle.loads(msg.data))
messages.put(msg)
def _subscribe():
flowcontrol = FlowControlSettings(
messages_outstanding=self._backlog,
bytes_outstanding=1024 * 1024 * 10)
subscriber = SubscriberClient(credentials=self.subscription.topic.account.credentials)
with subscriber:
fut = subscriber.subscribe(self.subscription._path, callback, flowcontrol)
logger.info('subscribe sent to gcp')
while True:
if self._cancel_requested:
fut.cancel()
fut.result()
while True:
while not messages.empty():
try:
_ = messages.get_nowait()
except queue.Empty:
break
try:
messages.put_nowait(self._NOMORE)
break
except queue.Full:
continue
break
time.sleep(0.003)
self._subscribe_task = self._pool.submit(_subscribe)
return self
def __exit__(self, *args, **kwargs):
if self._pool is not None:
if self._subscribe_task is not None:
self._cancel_requested = True
while True:
z = self._messages.get()
if z is self._NOMORE:
break
self._subscribe_task.result()
self._subscribe_task = None
self._messages = None
self._pool.__exit__(*args, **kwargs)
self._pool = None
def get(self, timeout=None):
if timeout is not None and timeout == 0:
msg = self._messages.get_nowait()
else:
msg = self._messages.get(block=True, timeout=timeout)
data = pickle.loads(msg.data)
msg.ack()
return data
def get_account() -> Account:
return Account(project_id='--fill-in-proj-id--',
region='us-central1',
zone='us-central1-a',
credentials='--fill-in-creds--')
# This test shows that it takes extremely long to get the first messsage
# in `subscribe`.
def test1(account):
name = 'test-' + str(uuid.uuid4())
topic = account.create_topic(name)
try:
with topic.get_publisher() as p:
p.put(1)
p.put(2)
p.put(3)
sub = topic.create_subscription(name)
try:
with sub.get_subscriber() as s:
t0 = time.time()
logger.info('getting the first message')
z = s.get()
t1 = time.time()
logger.info(' got the first message')
print(z)
print('getting the first msg took', t1 - t0, 'seconds')
finally:
topic.delete_subscription(name)
finally:
account.delete_topic(name)
def test2(account):
name = 'test-' + str(uuid.uuid4())
topic = account.create_topic(name)
N = 30
try:
with topic.get_publisher(batch_size=1) as p:
for i in range(N):
p.put(i)
sub = topic.create_subscription(name)
try:
with sub.get_subscriber() as s:
for i in range(10):
z = s.get()
assert z == i
# The following block shows that the subscriber
# resets to the first message, not as expected
# that it picks up where the last block left.
with sub.get_subscriber() as s:
for i in range(10, 20):
z = s.get()
try:
assert z == i
except AssertionError as e:
print(z, '!=', i)
return
finally:
topic.delete_subscription(name)
finally:
account.delete_topic(name)
if __name__ == '__main__':
a = get_account()
try:
a.create_reservation('default')
except AlreadyExists:
pass
test1(a)
print('')
test2(a)
我找到了解决办法。在取消“订阅”未来之前,我需要睡一会儿以允许刷新(即发送出去)。特别是,
google.cloud.pubsublite.cloudpubsub.internal.make_subscriber._DEFAULT_FLUSH_SECONDS
(值 0.1)似乎是观看时间。需要睡得比这个长一点才能确定。
这是 google 包中的一个错误。 “取消”未来意味着放弃未处理的消息,而应发送已提交的确认。这个错误可能没有被注意到,因为重复的消息传递不是一个错误。
我无法重现您的问题,但我认为您应该检查有关使用 cloud pubsublite 的官方文档中处理该问题的方式。
这是我从接收消息示例中提取和更新的代码,它按预期工作,它将从精简主题获取消息并确认以避免再次获取。如果重新运行,我只会在有数据要提取的情况下获取数据。我添加了代码,以便您可以检查是否有与您的代码不同的内容。
consumer.py
from concurrent.futures._base import TimeoutError
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
FlowControlSettings,
SubscriptionPath,
MessageMetadata,
)
from google.cloud.pubsub_v1.types import PubsubMessage
# TODO(developer):
project_number = project-number
cloud_region = "us-central1"
zone_id = "a"
subscription_id = "sub-id"
timeout = 90
location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)
per_partition_flow_control_settings = FlowControlSettings(
messages_outstanding=1000,
bytes_outstanding=10 * 1024 * 1024,
)
def callback(message: PubsubMessage):
message_data = message.data.decode("utf-8")
metadata = MessageMetadata.decode(message.message_id)
print(f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}.")
message.ack()
# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:
streaming_pull_future = subscriber_client.subscribe(
subscription_path,
callback=callback,
per_partition_flow_control_settings=per_partition_flow_control_settings,
)
print(f"Listening for messages on {str(subscription_path)}...")
try:
streaming_pull_future.result(timeout=timeout)
except TimeoutError or KeyboardInterrupt:
streaming_pull_future.cancel()
assert streaming_pull_future.done()
我遇到你的情况的唯一方法是当我使用不同的订阅时。但在这方面,当不同的订阅从主题获取消息时,每个订阅都将收到相同的存储消息,如从 Lite 订阅接收消息中所述。
考虑一下: