导入WriteToDatastore时出错(Apache Beam / Google DataFlow)

问题描述 投票:2回答:1

我正在尝试使用Apache Beam管道将实体写入Google Cloud Datastore。为了测试,我在按照Apache Beam instructions搭建的本地Python 2.7虚拟环境中运行,代码在本地的Jupyter notebook中编写。下面是我正在尝试的伪代码:

import apache_beam as beam
#from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud import datastore
from google.cloud.datastore.entity import Entity

# Datastore client created without arguments; the project ID is expected to
# be taken from the environment.
storage = datastore.Client()

# Kind name used by create_entity() when it builds keys for new entities.
gds_entity_kind = 'test_entity'


class PrintFn(beam.DoFn):
    """Debugging DoFn that prints each element it is given."""

    def process(self, element):
        """Print ``element`` to stdout and return ``None``."""
        print(element)
        return


def create_entity(entity_id, name):
    """Build an ``Entity`` of kind ``gds_entity_kind`` carrying a ``name`` property.

    Args:
        entity_id: Value accepted by ``int()``; the converted integer is
            passed to ``storage.key`` together with the kind.
        name: Value stored under the ``'name'`` property.

    Returns:
        The newly created ``Entity``.
    """
    entity = Entity(key=storage.key(gds_entity_kind, int(entity_id)))
    entity.update({'name': name})
    return entity


# Sample input: each string holds two single-quoted fields separated by ';'.
lines = [
    "'0815';'entity A'",
    "'4711';'entity B'",
]

with beam.Pipeline() as p:
    raw_rows = p | 'read lines' >> beam.Create(lines)
    columns = raw_rows | 'rows to columns' >> beam.Map(lambda row: row.split(';'))
    unquoted = columns | 'remove quotes' >> beam.Map(
        lambda cols: [col.strip("'") for col in cols])
    entities = unquoted | 'create entities' >> beam.Map(
        lambda args: create_entity(*args))
    # The Datastore write step is disabled here; entities are only printed.
    #     | 'write to datastore' >> WriteToDatastore()
    entities | 'debug print' >> beam.ParDo(PrintFn())

我发现this posting中有类似的问题,但其中一些答案似乎并不适用于我的情况。我发现问题似乎与以下两条import语句有关:

from google.cloud import datastore

和

from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore

如果我重新启动内核并且只导入WriteToDatastore,就不会出现任何错误;如果同时导入两者,就会出现下面的错误。非常感谢任何帮助!

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-10-45d84b2c60ba> in <module>()
      1 import apache_beam as beam
----> 2 from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
      3 from google.cloud import datastore
      4 from google.cloud.datastore.entity import Entity
      5 

/usr/local/lib/python2.7/site-packages/apache_beam/io/gcp/datastore/v1/datastoreio.py in <module>()
     21 import time
     22 
---> 23 from apache_beam.io.gcp.datastore.v1 import helper
     24 from apache_beam.io.gcp.datastore.v1 import query_splitter
     25 from apache_beam.io.gcp.datastore.v1 import util

/usr/local/lib/python2.7/site-packages/apache_beam/io/gcp/datastore/v1/helper.py in <module>()
     34 # pylint: disable=wrong-import-order, wrong-import-position
     35 try:
---> 36   from google.cloud.proto.datastore.v1 import datastore_pb2
     37   from google.cloud.proto.datastore.v1 import entity_pb2
     38   from google.cloud.proto.datastore.v1 import query_pb2

/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/cloud/proto/datastore/v1/datastore_pb2.py in <module>()
     15 
     16 from google.api import annotations_pb2 as google_dot_api_dot_annotations__pb2
---> 17 from google.cloud.proto.datastore.v1 import entity_pb2 as google_dot_cloud_dot_proto_dot_datastore_dot_v1_dot_entity__pb2
     18 from google.cloud.proto.datastore.v1 import query_pb2 as google_dot_cloud_dot_proto_dot_datastore_dot_v1_dot_query__pb2
     19 

/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/cloud/proto/datastore/v1/entity_pb2.py in <module>()
     26   serialized_pb=_b('\n,google/cloud/proto/datastore/v1/entity.proto\x12\x13google.datastore.v1\x1a\x1cgoogle/api/annotations.proto\x1a\x1cgoogle/protobuf/struct.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x18google/type/latlng.proto\"7\n\x0bPartitionId\x12\x12\n\nproject_id\x18\x02 \x01(\t\x12\x14\n\x0cnamespace_id\x18\x04 \x01(\t\"\xb7\x01\n\x03Key\x12\x36\n\x0cpartition_id\x18\x01 \x01(\x0b\x32 .google.datastore.v1.PartitionId\x12\x32\n\x04path\x18\x02 \x03(\x0b\x32$.google.datastore.v1.Key.PathElement\x1a\x44\n\x0bPathElement\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12\x0c\n\x02id\x18\x02 \x01(\x03H\x00\x12\x0e\n\x04name\x18\x03 \x01(\tH\x00\x42\t\n\x07id_type\"8\n\nArrayValue\x12*\n\x06values\x18\x01 \x03(\x0b\x32\x1a.google.datastore.v1.Value\"\xf1\x03\n\x05Value\x12\x30\n\nnull_value\x18\x0b \x01(\x0e\x32\x1a.google.protobuf.NullValueH\x00\x12\x17\n\rboolean_value\x18\x01 \x01(\x08H\x00\x12\x17\n\rinteger_value\x18\x02 \x01(\x03H\x00\x12\x16\n\x0c\x64ouble_value\x18\x03 \x01(\x01H\x00\x12\x35\n\x0ftimestamp_value\x18\n \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x00\x12-\n\tkey_value\x18\x05 \x01(\x0b\x32\x18.google.datastore.v1.KeyH\x00\x12\x16\n\x0cstring_value\x18\x11 \x01(\tH\x00\x12\x14\n\nblob_value\x18\x12 \x01(\x0cH\x00\x12.\n\x0fgeo_point_value\x18\x08 \x01(\x0b\x32\x13.google.type.LatLngH\x00\x12\x33\n\x0c\x65ntity_value\x18\x06 \x01(\x0b\x32\x1b.google.datastore.v1.EntityH\x00\x12\x36\n\x0b\x61rray_value\x18\t \x01(\x0b\x32\x1f.google.datastore.v1.ArrayValueH\x00\x12\x0f\n\x07meaning\x18\x0e \x01(\x05\x12\x1c\n\x14\x65xclude_from_indexes\x18\x13 \x01(\x08\x42\x0c\n\nvalue_type\"\xbf\x01\n\x06\x45ntity\x12%\n\x03key\x18\x01 \x01(\x0b\x32\x18.google.datastore.v1.Key\x12?\n\nproperties\x18\x03 \x03(\x0b\x32+.google.datastore.v1.Entity.PropertiesEntry\x1aM\n\x0fPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12)\n\x05value\x18\x02 
\x01(\x0b\x32\x1a.google.datastore.v1.Value:\x02\x38\x01\x42\x82\x01\n\x17\x63om.google.datastore.v1B\x0b\x45ntityProtoP\x01Z<google.golang.org/genproto/googleapis/datastore/v1;datastore\xaa\x02\x19Google.Cloud.Datastore.V1b\x06proto3')
     27   ,
---> 28   dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_protobuf_dot_struct__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,google_dot_type_dot_latlng__pb2.DESCRIPTOR,])
     29 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
     30 

/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/protobuf/descriptor.pyc in __new__(cls, name, package, options, serialized_pb, dependencies, public_dependencies, syntax, pool)
    827         # TODO(amauryfa): use the pool passed as argument. This will work only
    828         # for C++-implemented DescriptorPools.
--> 829         return _message.default_pool.AddSerializedFile(serialized_pb)
    830       else:
    831         return super(FileDescriptor, cls).__new__(cls)

TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "google/cloud/proto/datastore/v1/entity.proto":
  google.datastore.v1.PartitionId.project_id: "google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.PartitionId.namespace_id: "google.datastore.v1.PartitionId.namespace_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.PartitionId: "google.datastore.v1.PartitionId" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.partition_id: "google.datastore.v1.Key.partition_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.path: "google.datastore.v1.Key.path" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement.id_type: "google.datastore.v1.Key.PathElement.id_type" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement.kind: "google.datastore.v1.Key.PathElement.kind" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement.id: "google.datastore.v1.Key.PathElement.id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement.name: "google.datastore.v1.Key.PathElement.name" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement: "google.datastore.v1.Key.PathElement" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key: "google.datastore.v1.Key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.ArrayValue.values: "google.datastore.v1.ArrayValue.values" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.ArrayValue: "google.datastore.v1.ArrayValue" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.value_type: "google.datastore.v1.Value.value_type" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.null_value: "google.datastore.v1.Value.null_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.boolean_value: "google.datastore.v1.Value.boolean_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.integer_value: "google.datastore.v1.Value.integer_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.double_value: "google.datastore.v1.Value.double_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.timestamp_value: "google.datastore.v1.Value.timestamp_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.key_value: "google.datastore.v1.Value.key_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.string_value: "google.datastore.v1.Value.string_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.blob_value: "google.datastore.v1.Value.blob_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.geo_point_value: "google.datastore.v1.Value.geo_point_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.entity_value: "google.datastore.v1.Value.entity_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.array_value: "google.datastore.v1.Value.array_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.meaning: "google.datastore.v1.Value.meaning" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.exclude_from_indexes: "google.datastore.v1.Value.exclude_from_indexes" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value: "google.datastore.v1.Value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.key: "google.datastore.v1.Entity.key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.properties" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry.key: "google.datastore.v1.Entity.PropertiesEntry.key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Entity.PropertiesEntry.value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry: "google.datastore.v1.Entity.PropertiesEntry" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity: "google.datastore.v1.Entity" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.partition_id: "google.datastore.v1.PartitionId" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Key.path: "google.datastore.v1.Key.PathElement" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.ArrayValue.values: "google.datastore.v1.Value" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.key_value: "google.datastore.v1.Key" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.entity_value: "google.datastore.v1.Entity" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.array_value: "google.datastore.v1.ArrayValue" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Value" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.key: "google.datastore.v1.Key" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.PropertiesEntry" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
python google-cloud-platform google-cloud-dataflow python-2.x apache-beam
1个回答
2
投票

我发现,在Apache Beam(Google Cloud Dataflow)中使用Google Cloud Datastore时,其编程方式与默认的Datastore API不同。

您需要使用this example中给出的Datastore辅助模块(datastore helper)。借助它,我修改了代码,现在可以成功运行。请注意其中不同的导入语句以及不同的实体创建方式。

简而言之,它以完全限定的JSON表示法创建实体,也就是您在Google Cloud控制台中浏览Datastore时看到的那种表示法。我原来的代码用更简单的JSON创建实体,这种形式在写入Datastore时通常也能被识别。总之,这样我就避开了原先两个不同导入各自带来的依赖冲突,而正是这种冲突导致了错误。

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper as datastore_helper, PropertyFilter

#from google.cloud import datastore
#from google.cloud.datastore.entity import Entity


def create_entity(_id, name):
    """Build an ``entity_pb2.Entity`` of kind ``test_entity`` with a ``name`` property.

    Args:
        _id: Key path element handed unchanged to
            ``datastore_helper.add_key_path`` after the kind.
        name: Value converted with ``unicode()`` and stored under ``"name"``.

    Returns:
        The populated entity protobuf message.
    """
    proto = entity_pb2.Entity()
    datastore_helper.add_key_path(proto.key, "test_entity", _id)

    properties = {"name": unicode(name)}
    datastore_helper.add_properties(proto, properties)
    return proto

class PrintFn(beam.DoFn):
    """Debugging DoFn that prints each element it is given."""

    def process(self, element):
        """Print ``element`` to stdout and return ``None``."""
        print(element)
        return

# Placeholder project ID handed to WriteToDatastore below.
project_id = 'your-gcp-project-id'

# Sample input: each string holds two single-quoted fields separated by ';'.
lines = [
    "'0815';'entity A'",
    "'4711';'entity B'",
]

with beam.Pipeline() as p:
    raw_rows = p | 'read lines' >> beam.Create(lines)
    columns = raw_rows | 'rows to columns' >> beam.Map(lambda row: row.split(';'))
    unquoted = columns | 'remove quotes' >> beam.Map(
        lambda cols: [col.strip("'") for col in cols])
    entities = unquoted | 'create entities' >> beam.Map(
        lambda args: create_entity(*args))
    entities | 'write to datastore' >> WriteToDatastore(project_id)
    # The debug print step is disabled in this version:
    #     | 'debug print' >> beam.ParDo(PrintFn())
© www.soinside.com 2019 - 2024. All rights reserved.