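I am copying the contents of several containers in Azure Storage Explorer into a set of new containers, and I would like to know the most efficient way to do it.
The existing containers are named cycling-input-1, cycling-input-2, ... and their contents are written to new containers named cycling-output-1, cycling-output-2, and so on. The containers all hold the same type of content (JPEGs).
The for loop below creates a new container with the required suffix (cycling-output) and then copies the blobs from the corresponding cycling-input container into it. I have about 30 containers with 1,000 images each, so I am not sure this is the best approach (it is slow). Is there a better way?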
from azure.storage.blob.baseblobservice import BaseBlobService

account_name = 'name'
account_key = 'key'

# connect to the storage account
blob_service = BaseBlobService(account_name = account_name, account_key = account_key)

# get a list of the containers that need to be processed
cycling_containers = blob_service.list_containers(prefix = 'cycling-input')

# check the list of containers
for c in cycling_containers:
    print(c.name)

# copy across the blobs from existing containers to new containers with a prefix cycling-output
prefix_of_new_container = 'cycling-output-'

for c in cycling_containers:
    contname = c.name
    generator = blob_service.list_blobs(contname)
    container_index = ''.join(filter(str.isdigit, contname))

    for blob in generator:
        flag_of_new_container = blob_service.create_container("%s%s" % (prefix_of_new_container, container_index))
        blob_service.copy_blob("%s%s" % (prefix_of_new_container, container_index), blob.name, "https://%s.blob.core.windows.net/%s/%s" % (account_name, contname, blob.name))
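A simple approach is to use the multiprocessing module to copy the blobs of all containers in parallel into new containers whose names replace input with output.
Here is my sample code for reference.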
from azure.storage.blob.baseblobservice import BaseBlobService
import multiprocessing

account_name = '<your account name>'
account_key = '<your account key>'

blob_service = BaseBlobService(
    account_name=account_name,
    account_key=account_key
)

def putBlobCopyTriples(queue, num_of_workers):
    # producer: list the input containers here so that only this process does the listing
    cycling_containers = blob_service.list_containers(prefix='cycling-input')
    for c in cycling_containers:
        container_name = c.name
        new_container_name = container_name.replace('input', 'output')
        # create each output container once, before queuing its blobs
        blob_service.create_container(new_container_name)
        for blob in blob_service.list_blobs(container_name):
            blob_url = "https://%s.blob.core.windows.net/%s/%s" % (account_name, container_name, blob.name)
            queue.put((new_container_name, blob.name, blob_url))
    # one (None, None, None) sentinel per worker tells that worker to stop
    for i in range(num_of_workers):
        queue.put((None, None, None))

def copyWorker(lock, queue, sn):
    while True:
        # take the next (container, blob name, source URL) triple off the queue
        with lock:
            (new_container_name, blob_name, new_blob_url) = queue.get()
        if new_container_name is None:
            break
        print(sn, new_container_name, blob_name, new_blob_url)
        blob_service.copy_blob(new_container_name, blob_name, new_blob_url)

if __name__ == '__main__':
    num_of_workers = 4 # number of worker processes; 4 is my CPU core count
    lock = multiprocessing.Lock()
    queue = multiprocessing.Queue()
    producer = multiprocessing.Process(target=putBlobCopyTriples, args=(queue, num_of_workers))
    producer.start()
    workers = [multiprocessing.Process(target=copyWorker, args=(lock, queue, i)) for i in range(num_of_workers)]
    for p in workers:
        p.start()
    # wait for the producer and all workers to finish
    producer.join()
    for p in workers:
        p.join()
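Since each copy call in the sample above is mostly a network request, a thread pool may serve as well as separate processes. The following is only a sketch of that alternative, not a tested drop-in replacement: output_name and copy_all are hypothetical helper names, and copy_fn stands in for blob_service.copy_blob. It assumes the client you pass in can safely be shared across threads, which I have not verified for BaseBlobService; if it cannot, create one client per thread instead.

```python
from concurrent.futures import ThreadPoolExecutor


def output_name(container_name):
    """Map e.g. 'cycling-input-3' to 'cycling-output-3' (hypothetical helper)."""
    return container_name.replace('input', 'output', 1)


def copy_all(tasks, copy_fn, max_workers=4):
    """Call copy_fn(container, blob_name, source_url) for every task on a thread pool.

    copy_fn is a stand-in for blob_service.copy_blob (an assumption of this sketch).
    Results come back in the same order as the tasks; an exception raised by any
    copy_fn call is re-raised here when its result is collected.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(copy_fn, *task) for task in tasks]
        return [future.result() for future in futures]
```

With the real service, tasks would be the same (new_container_name, blob.name, blob_url) triples that putBlobCopyTriples builds, and the call would look like copy_all(tasks, blob_service.copy_blob, max_workers=4).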
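Note: besides the number of CPU cores in your environment, the copy speed is limited by your IO bandwidth. More workers is not always better. I recommend a number equal to or less than your CPU count or hyper-threading count.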
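One way to apply that recommendation is to clamp the requested worker count to what the machine reports. This is a minimal sketch; pick_num_workers is a hypothetical helper name, not part of any library. It relies on multiprocessing.cpu_count(), which returns the number of logical CPUs, so hyper-threads are included in the count.

```python
import multiprocessing


def pick_num_workers(requested, cpu_count=None):
    """Clamp the requested worker count to the range [1, cpu_count].

    cpu_count defaults to the logical CPU count reported by the system;
    passing it explicitly is only meant to make the function easy to test.
    """
    if cpu_count is None:
        cpu_count = multiprocessing.cpu_count()
    return max(1, min(requested, cpu_count))
```

In the sample code, num_of_workers = pick_num_workers(4) would keep the value at 4 on a machine with four or more logical CPUs and lower it on a smaller one.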