I'm trying to download large files over HTTP and upload them to GCS using the Apache Beam Python SDK (2.6.0) on Dataflow.
import logging

from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

# `options`, `filename` and `response` (presumably a streaming requests
# response) are defined elsewhere in the pipeline code.
gcs = GCSFileSystem(options)
logging.info("Writing to file {}".format(filename))
f = gcs.create(filename, compression_type=CompressionTypes.UNCOMPRESSED)
chunk_size = 512 * 1024
for i, chunk in enumerate(response.iter_content(chunk_size=chunk_size)):
    if chunk:
        f.write(chunk)
    else:
        break
    if i % 10 == 0:
        # Approximate: assumes every chunk so far was exactly chunk_size bytes.
        logging.info("Written {by} KB into {filename}".format(
            by=(((i + 1) * chunk_size) / 1000), filename=filename))
logging.info("Closing file {}".format(filename))
f.close()
logging.info("Closed file {}".format(filename))
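For reference, the write loop can be factored into a small helper that counts the bytes actually written instead of estimating them from the chunk index. This is only a sketch: `copy_in_chunks` is a hypothetical name, the `io.BytesIO` sink stands in for the file object returned by `gcs.create(...)`, and the list of byte strings stands in for `response.iter_content(...)`. Unlike the loop above, it skips empty chunks rather than stopping at the first one.

```python
import io
import logging


def copy_in_chunks(chunks, sink, log_every=10):
    """Write an iterable of byte chunks to `sink`; return total bytes written."""
    total = 0
    for i, chunk in enumerate(chunks):
        if not chunk:
            continue  # skip empty chunks instead of aborting the copy
        sink.write(chunk)
        total += len(chunk)
        if i % log_every == 0:
            logging.info("Written %.1f KB so far", total / 1000.0)
    return total


# Stand-in data: in the post, `chunks` would come from
# response.iter_content(...) and `sink` from gcs.create(...).
sink = io.BytesIO()
written = copy_in_chunks([b"a" * 1024, b"", b"b" * 100], sink)
```

Counting `len(chunk)` keeps the progress log accurate even when the last chunk is shorter than `chunk_size`.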
This approach works for small files (~KB), but I'm struggling to make it work for larger files (~GB).
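The logs show that it gets stuck in f.close(), and no file has been written to GCS at that point. I dug into the code, and it seems that GCSFileSystem instantiates a GcsBufferedWriter, which itself writes into a multiprocessing pipe that is consumed by transfer.Upload.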
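To make that pattern concrete, here is a simplified model of a writer that pushes data into a pipe drained by a background thread. It is not Beam's actual implementation: `PipeWriter` and the `consume` callback are hypothetical names, and the callback stands in for whatever performs the upload. The point it illustrates is that `close()` has to wait for the consumer to drain everything, so a slow consumer shows up as a hang in `close()`.

```python
import multiprocessing
import threading


class PipeWriter(object):
    """Toy writer: write() feeds a pipe, a thread drains it into `consume`."""

    def __init__(self, consume):
        # duplex=False returns (receive-only end, send-only end).
        self._recv, self._send = multiprocessing.Pipe(duplex=False)
        self._thread = threading.Thread(target=self._drain, args=(consume,))
        self._thread.daemon = True
        self._thread.start()

    def _drain(self, consume):
        while True:
            try:
                data = self._recv.recv_bytes()
            except EOFError:
                break  # the send end was closed and nothing is left to read
            consume(data)

    def write(self, data):
        self._send.send_bytes(data)

    def close(self):
        self._send.close()   # signals end of data to the consumer
        self._thread.join()  # blocks until the consumer has drained the pipe


collected = []
writer = PipeWriter(collected.append)
writer.write(b"abc")
writer.write(b"def")
writer.close()
```

If `consume` is slow, or stalls without raising, every `write()` may still return quickly while `close()` blocks in `join()`, which would match the symptom described above.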
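I don't have many clues about what could be causing this. I suspect that connections or pipes are being reset or silently broken along the way (the HTTP server I'm downloading from has very low throughput, and I'm parallelizing the calls with gevent), or simply that transfer.Upload has some performance issue.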
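When I check the machine stats, I see far more incoming traffic (20 MB/s) than outgoing traffic (200 KB/s), and basically no disk writes, which makes me wonder where all the piped data is going.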
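As a rough sanity check on those numbers, the arithmetic below treats both rates as sustained and as if they applied to a single file. The 1 GB file size is a hypothetical example rather than a measured value, and decimal units are assumed.

```python
# Observed rates from the machine stats (decimal units). The 1 GB file
# size is a hypothetical example, not a measured value.
incoming_bytes_per_s = 20 * 10**6
outgoing_bytes_per_s = 200 * 10**3
file_size_bytes = 1 * 10**9

# Data arriving faster than it leaves has to be held somewhere.
backlog_bytes_per_s = incoming_bytes_per_s - outgoing_bytes_per_s
download_seconds = file_size_bytes // incoming_bytes_per_s
upload_seconds = file_size_bytes // outgoing_bytes_per_s

print("backlog grows by %.1f MB/s" % (backlog_bytes_per_s / 1e6))
print("download: %d s, upload: %d s" % (download_seconds, upload_seconds))
```

Under these assumptions the upload side would need about 100 times longer than the download side, so a close() that waits for the upload to finish could look stuck for well over an hour per gigabyte.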
Thanks!
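In the meantime, I switched to the google-cloud-storage Python client, which does seem to perform better and works as expected. However, I'm observing some transient errors: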
  File "dataflow/make_training_chips.py", line 93, in process
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/client.py", line 71, in __init__
    _http=_http)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/client.py", line 215, in __init__
    _ClientProjectMixin.__init__(self, project=project)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/client.py", line 169, in __init__
    project = self._determine_default(project)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/client.py", line 182, in _determine_default
    return _determine_default_project(project)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/_helpers.py", line 179, in _determine_default_project
    _, project = google.auth.default()
  File "/usr/local/lib/python2.7/dist-packages/google/auth/_default.py", line 306, in default
    raise exceptions.DefaultCredentialsError(_HELP_MESSAGE)
DefaultCredentialsError: Could not automatically determine credentials.
Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials
and re-run the application. For more information, please see
https://developers.google.com/accounts/docs/application-default-credentials.
[while running 'ParDo(GenerateTraingChips)']
This happens quite often on my workers (even though I retry the task with exponential backoff...).
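For context, the kind of retry meant here can be sketched as below. This is a generic helper, not the code from the pipeline: `retry_with_backoff` and `flaky` are hypothetical names, and `flaky` stands in for the step that fails in the traceback (constructing the storage client). The `sleep` parameter is injectable only so the example runs instantly.

```python
import random
import time


def retry_with_backoff(fn, attempts=5, base_delay=1.0, max_delay=30.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call fn(); on failure wait base_delay * 2**attempt (with jitter) and retry."""
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay * random.uniform(0.5, 1.0))  # jitter spreads out retries


# Stand-in for a call that fails transiently, e.g. creating the client.
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "client"


delays = []
result = retry_with_backoff(flaky, sleep=delays.append)
```

Since the traceback shows the error being raised while the client is constructed inside process, one option worth trying (an assumption, not something verified here) is to create the client once per worker and reuse it, so the credential lookup runs far less often.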