我有传感器数据到达 pub/sub (protobuf),它作为 python 字典插入到“pipeline_fstore”中。数据一次到达。 在管道中的“添加元数据...”步骤中,我正在读取 Firestore 数据以增强接收到的数据。 如何优化 Firestore 上的“读取”?有什么建议吗? 我稍后在“添加元数据”上添加的字段上“GroupByKey”,所以我无法提前完成。
问:当我们使用“start_bundle(self):”时,它仅适用于之前对数据进行分组的情况,对吗?在这种情况下,“捆绑”将是单独的行,对吧?
谢谢
fs_results = (
pipeline_fstore
| 'Add Timestamp' >> beam.Map(lambda elem: beam.window.TimestampedValue(elem, int(elem['measurement_time'])))
| 'Window into window_size Seconds' >> beam.WindowInto(beam.window.FixedWindows(window_size), allowed_lateness=beam.window.Duration(seconds=60))
| 'Add Metadata to the original data' >> beam.ParDo(add_metadata())
| 'Add Node Position' >> beam.ParDo(add_position())
| 'Extract siteId as Key' >> beam.Map(lambda elem: (elem.get("siteId", ""), elem))
| 'Group by siteId' >> beam.GroupByKey()
| 'Write to Firestore' >> beam.ParDo(FirestoreUpdateDoFn())
)
我能够使用“GroupByKey”并应用“start_bundle”和“finish_bundle”来优化对 firestore 的写入,以启动捆绑批次并写入最终批次。 现在,在 15 秒的窗口中,我看到批量大小约为 10-20 行,这与数据接收率一致。
关于我关于“start_bundle”的问题,根据我的测试,我只有在应用“GroupByKey”时才能有效地让窗口工作。仅具有“窗口”机制不足以看到当前的行为。
很高兴收到任何可以改进我的代码的反馈,并很高兴帮助其他人实现同样的目标。
class FirestoreUpdateDoFn(beam.DoFn):
def setup(self):
from firebase_admin import firestore
self.db = firestore.Client()
def teardown(self):
self.db.close()
def start_bundle(self):
date = datetime.now()
logging.info(f"Starting bundle - Firestore ({date})")
self.batch = self.db.batch()
self.entries = 0
def finish_bundle(self):
date = datetime.now()
logging.info(f"Finish bundle ({date}). batch size: {self.entries}\n")
try:
self.batch.commit()
except Exception as e:
logging.error(e)