Google Pubsub 云存储订阅,可将消息合并到同一个 avro 文件中

问题描述 投票:0回答:1
  • 我有一个没有架构强制的 Google Pubsub 主题(我也不希望有架构强制)
  • 我有一个 Google Pubsub 云存储订阅设置,可以:
    • 将消息作为 Avro 文件刷新到 GCS(Google 云存储)
    • 文件名/路径模式
    • 对文件大小或时间要求的一些限制,我认为将多个消息合并到同一个 Avro 文件中

问题是,无论我如何设置此云存储订阅,我都会看到发布到 Pubsub 主题的每条消息都有 1 个 Avro 文件。我想以某种方式改变这一点,将消息合并到 1 个 Avro 文件中,或者可能根据时间/空间限制继续附加到同一个 Avro 文件。

# cloud storage subscription
gcloud pubsub subscriptions create projects/my-project/subscriptions/my-subscription \
  --topic=projects/my-project/topics/my-topic \
  --cloud-storage-bucket=my-bucket \
  --cloud-storage-file-prefix=my-prefix/ \
  --cloud-storage-file-suffix=_my-suffix.avro \
  --cloud-storage-max-bytes=2GB \
  --cloud-storage-max-duration=1m \
  --cloud-storage-output-format=avro \
  --cloud-storage-write-metadata \
  --dead-letter-topic=projects/my-project/topics/my-dlt \
  --max-delivery-attempts=5 \
  --project=my-project

然后我向 Pubsub 主题发送 2 条消息。

然后我等待 1 分钟(参见上面的

--cloud-storage-max-duration=1m
)。

然后我检查GCS存储桶的内容:

$ gsutil ls gs://my-bucket/my-prefix/
gs://my-bucket/my-prefix/2024-06-14T14:22:45+00:00_6db102_my-suffix.avro
gs://my-bucket/my-prefix/2024-06-14T14:22:46+00:00_0bc008_my-suffix.avro

但在这里我尝试将 Pubsub 消息合并到 1 个 avro 文件中。我原以为

--cloud-storage-max-bytes=2GB
--cloud-storage-max-duration=1m
会这样做,但他们没有。

我还尝试使用

--cloud-storage-file-datetime-format=YYYY-MM-DD
--cloud-storage-file-datetime-format=YYYY-MM-DD_hh
删除文件名的日期时间敏感性,但当我尝试创建云存储订阅时,它失败了,因为它需要完整的日期时间定义格式。

请参阅此处描述上述内容的文档:https://cloud.google.com/pubsub/docs/create-cloudstorage-subscription#file_names

我想要一个 GCS 存储桶结构,例如:

gs://my-bucket/my-prefix/YYYY-MM-DD_<uuid_1>_my-suffix.avro
gs://my-bucket/my-prefix/YYYY-MM-DD_<uuid_2>_my-suffix.avro
...
gs://my-bucket/my-prefix/YYYY-MM-DD_<uuid_N>_my-suffix.avro

或者也许将

./YYYY-DD-MM/
作为子文件夹。

理想情况下,我希望避免使用 Dataflow 或 Dataproc (Spark)。看起来云存储订阅可以很好地将数据刷新到 GCS 中。

这可能吗?我怎样才能做到这一点?

google-cloud-platform google-cloud-storage partitioning google-cloud-pubsub
1个回答
0
投票

不幸的是,由于 Pub/Sub 规模在后端的工作方式,您的消息分布在多个文件中是预期的行为。这在故障排除文档中进行了讨论。如果您需要云存储订阅将消息放入较少的并发文件中,您可以提交描述您的用例的功能请求,我们会考虑它。

同时,如果您需要将消息放入同一个文件中,则可能需要设置一个单独的进程来在写入文件后合并文件。实现此目的的一种方法是运行一个定期进程,该进程组成 Cloud Storage 对象

© www.soinside.com 2019 - 2024. All rights reserved.