Beam GCSFileSystem / GcsBufferedWriter performance


I am trying to download large files over HTTP and upload them to GCS, using the Apache Beam Python SDK (2.6.0) on Dataflow.

    import logging

    from apache_beam.io.filesystem import CompressionTypes
    from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

    # `options`, `filename` and `response` (a streaming HTTP response) are defined elsewhere.
    gcs = GCSFileSystem(options)
    logging.info("Writing to file {}".format(filename))
    f = gcs.create(filename, compression_type=CompressionTypes.UNCOMPRESSED)
    chunk_size = 512 * 1024
    for i, chunk in enumerate(response.iter_content(chunk_size=chunk_size)):
        if chunk:
            f.write(chunk)
        else:
            break
        if i % 10 == 0:
            # Approximate progress: (i + 1) chunks written so far; the last chunk may be shorter.
            logging.info("Written ~{kib} KiB into {filename}".format(
                kib=((i + 1) * chunk_size) // 1024, filename=filename))
    logging.info("Closing file {}".format(filename))
    f.close()
    logging.info("Closed file {}".format(filename))

This approach works for small files (~KB), but I am struggling to make it work for larger files (~GB).

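The logs show that it gets stuck in f.close(), and no file has been written to GCS at that point. I dug into the code, and it seems that GCSFileSystem instantiates a GcsBufferedWriter, which itself writes into a multiprocessing pipe that is read by transfer.Upload.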
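To make the suspected mechanism concrete, here is a toy model of a writer that pushes chunks into a multiprocessing pipe while a background thread drains it. This is only a sketch of the pattern described above, not Beam's actual code: the class name `PipeBackedWriter`, the `consume_delay` parameter and the `uploaded` list are all made up for illustration. It shows why a slow consumer would surface as a long wait in close(): close() has to join the draining thread.

```python
import multiprocessing
import threading
import time


class PipeBackedWriter(object):
    """Toy model (hypothetical, not Beam code): write() feeds a pipe, a thread drains it."""

    def __init__(self, consume_delay=0.0):
        # duplex=False returns (receive-only end, send-only end).
        self._recv_conn, self._send_conn = multiprocessing.Pipe(duplex=False)
        self._consume_delay = consume_delay
        self.uploaded = []  # stands in for chunks that reached remote storage
        self._thread = threading.Thread(target=self._drain)
        self._thread.daemon = True
        self._thread.start()

    def _drain(self):
        while True:
            try:
                chunk = self._recv_conn.recv_bytes()
            except EOFError:
                break  # the send end was closed and the pipe is empty
            time.sleep(self._consume_delay)  # simulate a slow upload
            self.uploaded.append(chunk)

    def write(self, data):
        self._send_conn.send_bytes(data)

    def close(self):
        self._send_conn.close()
        self._thread.join()  # blocks until every buffered chunk is consumed


writer = PipeBackedWriter(consume_delay=0.05)
for chunk in (b"a" * 10, b"b" * 10, b"c" * 10):
    writer.write(chunk)
start = time.time()
writer.close()
print("close() took {:.2f}s, {} chunks uploaded".format(
    time.time() - start, len(writer.uploaded)))
```

In this model, if the draining thread stalls or dies silently, close() never returns. That would match the symptom in the logs, assuming the real writer behaves similarly.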

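I don't have many clues about what could be causing this. I suspect that connections/pipes are being reset or silently broken along the way (the HTTP server I am downloading from has very low throughput, and I am parallelizing the calls with gevent), or simply that transfer.Upload has performance issues.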

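When I check the machine stats, I see much more incoming traffic (20 MB/s) than outgoing traffic (200 kB/s), and basically no disk writes, which makes me wonder where all the piped data goes.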

Thanks!

google-cloud-dataflow apache-beam
1 Answer

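In the meantime, I switched to the Google Cloud Storage Python client, which does seem to perform better and works as expected, although I am observing some transient errors: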
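For reference, a streaming upload with that client might look roughly like the sketch below. The helper name `upload_http_response`, the bucket name, the URL and the chunk size are assumptions for illustration, not the code I actually run. `Bucket.blob` and `Blob.upload_from_file` are part of google-cloud-storage, and passing a `chunk_size` (which must be a multiple of 256 KiB) makes the upload read the stream piece by piece rather than loading it all into memory.

```python
def upload_http_response(bucket, blob_name, response, chunk_size=5 * 1024 * 1024):
    """Stream the body of a requests-style response into a GCS blob.

    `bucket` is expected to be a google.cloud.storage Bucket and `response`
    a response obtained with stream=True (both supplied by the caller).
    """
    blob = bucket.blob(blob_name, chunk_size=chunk_size)
    # Ask urllib3 to undo any gzip/deflate transfer encoding while reading.
    response.raw.decode_content = True
    blob.upload_from_file(response.raw)
    return blob


def example():
    # Hypothetical usage; the imports are local so the helper above has no
    # hard dependency. The bucket name and URL are placeholders.
    import requests
    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("my-bucket")
    response = requests.get("https://example.com/big-file", stream=True)
    response.raise_for_status()
    return upload_http_response(bucket, "path/in/bucket/big-file", response)
```

Reading from `response.raw` avoids a Python-level loop over `iter_content`, which is a design choice rather than a requirement.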

File "dataflow/make_training_chips.py", line 93, in process
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/client.py", line 71, in __init__
_http=_http)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/client.py", line 215, in __init__
_ClientProjectMixin.__init__(self, project=project)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/client.py", line 169, in __init__
project = self._determine_default(project)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/client.py", line 182, in _determine_default
return _determine_default_project(project)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/_helpers.py", line 179, in _determine_default_project
_, project = google.auth.default()
File "/usr/local/lib/python2.7/dist-packages/google/auth/_default.py", line 306, in default
  raise exceptions.DefaultCredentialsError(_HELP_MESSAGE)
  DefaultCredentialsError: Could not automatically determine credentials. 
  Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials 
  and re-run the application. For more information, please see 
  https://developers.google.com/accounts/docs/application-default-credentials. 
  [while running 'ParDo(GenerateTraingChips)']

This happens fairly often on my workers (even though I retry the task with exponential backoff...).
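The traceback shows the error being raised while the storage client is constructed inside process(), so one option worth trying is to build the client once per worker, with retries, and reuse it afterwards. The sketch below uses only the standard library; `call_with_backoff` and `LazyClient` are hypothetical helper names, and I have not verified that this removes the transient errors.

```python
import random
import time


def call_with_backoff(fn, retry_on=(Exception,), max_attempts=5,
                      base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Call fn(), retrying on `retry_on` with jittered exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay * random.uniform(0.5, 1.0))  # jitter spreads out retries


class LazyClient(object):
    """Build an expensive client on first use, then reuse the same instance."""

    def __init__(self, factory, **backoff_kwargs):
        self._factory = factory
        self._backoff_kwargs = backoff_kwargs
        self._client = None

    def get(self):
        if self._client is None:
            self._client = call_with_backoff(self._factory, **self._backoff_kwargs)
        return self._client
```

In a DoFn this could be used as `self._gcs = LazyClient(storage.Client)` in the constructor, with `self._gcs.get()` called inside process(), so the credential lookup runs once per worker instead of once per element.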
