我正在编写一个处理电子邮件的Dataflow流管道(用Python)。这个想法是,当电子邮件到达时,会发布一个发布/订阅消息,触发检索电子邮件并对其进行处理的管道。发布/订阅消息的内容是无用的,因为我只是用它来触发管道。
我在最后一部分遇到了一些麻烦。我设法部署管道并将其连接到Pub / Sub主题,但是当我尝试测试它(发布消息)时,没有任何反应。
我想我必须设置一个“收集”消息并在某些时候发出它们的窗口,但我该怎么做呢?有没有办法说“每次收到新的发布/订阅消息时启动管道,忽略其内容”?
提前致谢!
您能否分享有关管道和电子邮件存储位置的更多信息?
我建议你看一下Beam中可用的一些样本管道。
如果您分享有关管道/代码的更多信息,我可以尝试与您进行迭代。
我终于设法解决了我的问题。问题是由于我为此目的定义的类导入了自定义管道选项。此导入阻止了管道被触发。删除它我终于设法触发管道。
对于那些可能需要它的人来说,有罪的进口是
from engine.user_options import UserOptions
而导入的课程是
import apache_beam as beam
class UserOptions(beam.options.pipeline_options.PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--env', type=str)