Apache beam在Dataflow中得到与生成器对象不可下标相关的错误。

问题描述 投票:0回答:1

我试图在数据流中创建我的第一条管道,当我使用交互式光束运行器执行时,我有相同的代码运行,但在数据流中,我得到了所有类型的错误,这对我来说没有什么意义。

我从pub sub得到的json格式如下。

{"timestamp":1589992571906,"lastPageVisited":"https://kickassdataprojects.com/simple-and-complete-tutorial-on-simple-linear-regression/","pageUrl":"https://kickassdataprojects.com/","pageTitle":"Helping%20companies%20and%20developers%20create%20awesome%20data%20projects%20%7C%20Data%20Engineering/%20Data%20Science%20Blog","eventType":"Pageview","landingPage":0,"referrer":"direct","uiud":"31af5f22-4cc4-48e0-9478-49787dd5a19f","sessionId":322371}

这是我的流水线的代码。

from __future__ import absolute_import
import apache_beam as beam
#from apache_beam.runners.interactive import interactive_runner
#import apache_beam.runners.interactive.interactive_beam as ib
import google.auth
from datetime import timedelta
import json
from datetime import datetime
from apache_beam import window
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode, AfterCount
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
import argparse
import logging
from time import mktime

def setTimestamp(elem):
     from apache_beam import window
     yield window.TimestampedValue(elem, elem['timestamp'])

def createTuples(elem):
     yield (elem["sessionId"], elem)

class WriteToBigQuery(beam.PTransform):
  """Generate, format, and write BigQuery table row information."""
  def __init__(self, table_name, dataset, schema, project):
    """Initializes the transform.
    Args:
      table_name: Name of the BigQuery table to use.
      dataset: Name of the dataset to use.
      schema: Dictionary in the format {'column_name': 'bigquery_type'}
      project: Name of the Cloud project containing BigQuery table.
    """
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    #super(WriteToBigQuery, self).__init__()
    beam.PTransform.__init__(self)
    self.table_name = table_name
    self.dataset = dataset
    self.schema = schema
    self.project = project

  def get_schema(self):
    """Build the output table schema."""
    return ', '.join('%s:%s' % (col, self.schema[col]) for col in self.schema)

  def expand(self, pcoll):
    return (
        pcoll
        | 'ConvertToRow' >>
        beam.Map(lambda elem: {col: elem[col]
                               for col in self.schema})
        | beam.io.WriteToBigQuery(
            self.table_name, self.dataset, self.project, self.get_schema()))


class ParseSessionEventFn(beam.DoFn):
  """Parses the raw game event info into a Python dictionary.
  Each event line has the following format:
    username,teamname,score,timestamp_in_ms,readable_time
  e.g.:
    user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
  The human-readable time string is not used here.
  """
  def __init__(self):
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    #super(ParseSessionEventFn, self).__init__()
    beam.DoFn.__init__(self)

  def process(self, elem):
          #timestamp = mktime(datetime.strptime(elem["timestamp"], "%Y-%m-%d %H:%M:%S").utctimetuple())
          elem['sessionId'] = int(elem['sessionId'])
          elem['landingPage'] = int(elem['landingPage'])
          yield elem

class AnalyzeSessions(beam.DoFn):
  """Parses the raw game event info into a Python dictionary.
  Each event line has the following format:
    username,teamname,score,timestamp_in_ms,readable_time
  e.g.:
    user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
  The human-readable time string is not used here.
  """
  def __init__(self):
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    #super(AnalyzeSessions, self).__init__()
    beam.DoFn.__init__(self)

  def process(self, elem, window=beam.DoFn.WindowParam):
          sessionId = elem[0]
          uiud = elem[1][0]["uiud"]
          count_of_events = 0
          pageUrl = []
          window_end = window.end.to_utc_datetime()
          window_start = window.start.to_utc_datetime()
          session_duration = window_end - window_start
          for rows in elem[1]:
             if rows["landingPage"] == 1:
                    referrer = rows["refererr"]
             pageUrl.append(rows["pageUrl"])       

          return {
             "pageUrl":pageUrl,
             "eventType":"pageview",
             "uiud":uiud,
             "sessionId":sessionId,
             "session_duration": session_duration,
              "window_start" : window_start
               }

def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument('--topic', type=str, help='Pub/Sub topic to read from')
    parser.add_argument(
          '--subscription', type=str, help='Pub/Sub subscription to read from')
    parser.add_argument(
          '--dataset',
          type=str,
          required=True,
          help='BigQuery Dataset to write tables to. '
          'Must already exist.')
    parser.add_argument(
          '--table_name',
          type=str,
          default='game_stats',
          help='The BigQuery table name. Should not already exist.')
    parser.add_argument(
          '--fixed_window_duration',
          type=int,
          default=60,
          help='Numeric value of fixed window duration for user '
          'analysis, in minutes')
    parser.add_argument(
          '--session_gap',
          type=int,
          default=5,
          help='Numeric value of gap between user sessions, '
          'in minutes')
    parser.add_argument(
          '--user_activity_window_duration',
          type=int,
          default=30,
          help='Numeric value of fixed window for finding mean of '
          'user session duration, in minutes')
    args, pipeline_args = parser.parse_known_args(argv)
    session_gap = args.session_gap * 60
    options = PipelineOptions(pipeline_args)
    # Set the pipeline mode to stream the data from Pub/Sub.
    options.view_as(StandardOptions).streaming = True

    options.view_as( StandardOptions).runner= 'DataflowRunner'
    options.view_as(SetupOptions).save_main_session = save_main_session
    p = beam.Pipeline(options=options)
    lines = (p
                | beam.io.ReadFromPubSub(
              subscription="projects/phrasal-bond-274216/subscriptions/rrrr")
             | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
             | beam.Map(lambda x: json.loads(x))
             | beam.ParDo(ParseSessionEventFn())
             )

    next = ( lines
                | 'AddEventTimestamps' >> beam.Map(setTimestamp)
                | 'Create Tuples' >> beam.Map(createTuples)
                | beam.Map(print) 
                | 'Window' >> beam.WindowInto(window.Sessions(15))
                | 'group by key' >> beam.GroupByKey()          
                | 'analyze sessions' >> beam.ParDo(AnalyzeSessions())         
                | 'WriteTeamScoreSums' >> WriteToBigQuery(
                args.table_name,
               {

               "uiud":'STRING',
               "session_duration": 'INTEGER',
               "window_start" : 'TIMESTAMP'
                          },
                options.view_as(GoogleCloudOptions).project)
             )

    next1 = ( next
             | 'Create Tuples' >> beam.Map(createTuples)
             | beam.Map(print) 

             )

    result = p.run()
#    result.wait_till_termination()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

在下面的代码中,当我试图在管道中创建元组时,我得到了以下错误'生成器'对象不可下标。我得到的是使用yield创建生成器对象,即使是return也没有用,它只是让我的流水线发呆。

apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables File "sessiontest1.py", line 23, in createTuples TypeError: 'generator' object is not subscriptable [while running 'generatedPtransform-148']

这是我用来执行管道的代码。

python3 sessiontest1.py     --project phrasal-bond-xxxxx     --region us-central1     --subscription projects/phrasal-bond-xxxxx/s
ubscriptions/xxxxxx     --dataset sessions_beam     --runner DataflowRunner     --temp_location gs://webevents/sessions --service_account_email-xxxxxxxx-
[email protected]  

任何关于这个问题的帮助将被感激。谢谢大家,又是第一次在数据流上工作,所以不知道我在这里错过了什么。

我之前得到的其他错误,现在已经解决了:-

a) 我从行名 beam.Map(lambda elem: window.TimestampedValue(elem, elem['timestamp']))中得到了widow没有定义的错误。

如果我去beam.window,那么它说beam没有定义,根据我的说法,beam应该由dataflow提供。

NameError: name 'window' is not defined [while running 'generatedPtransform-3820']

你只需要在函数本身导入模块。

python-3.x google-cloud-dataflow apache-beam dataflow beam
1个回答
0
投票

获得一个 'generator' object is not subscriptable 的错误表明,当你试图进行 elem["sessionID"],elem已经是一个生成器。前面的变换是setTimestamp,也是使用了 yield 因此输出一个生成器,作为元素传递给createTuples。

这里的解决方法是用以下方法实现setTimestamp和createTuples return 而不是 yield. 在下面的变换中返回你要接收的元素。


0
投票

你应该在你的代码中设置save_main_session = True。( 试着在你的代码中取消这一行的comment).你应该在你的代码中设置save_main_session = True. 点击这里查看更多关于NameError的信息。https:/cloud.google.comdataflowdocsresourcesfaq)。

© www.soinside.com 2019 - 2024. All rights reserved.