我有后台进程 - 读取 Excel 文件并保存该文件中的数据。我需要在后台进程中读取文件。但我有错误
InMemoryUploadedFile
。
我的代码
def create(self, validated_data):
company = ''
file_type = ''
email = ''
file = validated_data['file']
import_data.delay(file=file,
company=company,
file_type=file_type,
email=email)
我的方法看起来像
@app.task
def import_data(
file,
company,
file_type,
email):
// some code
但是我有错误
InMemoryUploadedFile
。
如何将文件发送到 cellery 而不会出现错误?
当您延迟任务时,Celery 将尝试序列化参数,在您的情况下包含文件。
文件,尤其是内存中的文件无法序列化。
因此,要解决这个问题,您必须保存文件并将文件路径传递给延迟函数,然后读取那里的文件并进行计算。
Celery 不知道如何序列化复杂对象,例如文件对象。然而,这个问题可以很容易地解决。我所做的是将文件编码/解码为其 Base64 字符串表示形式。这允许我直接通过 Celery 发送文件。
下面的示例展示了如何操作(我故意将每个转换分开放置,尽管这可以以更Pythonic的方式安排):
import base64
import tempfile
# (Django, HTTP server)
file = request.FILES['files'].file
file_bytes = file.read()
file_bytes_base64 = base64.b64encode(file_bytes)
file_bytes_base64_str = file_bytes_base64.decode('utf-8') # this is a str
# (...send string through Celery...)
# (Celery worker task)
file_bytes_base64 = file_bytes_base64_str.encode('utf-8')
file_bytes = base64.b64decode(file_bytes_base64)
# Write the file to a temporary location, deletion is guaranteed
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_file = os.path.join(tmp_dir, 'something.zip')
with open(tmp_file, 'wb') as f:
f.write(file_bytes)
# Process the file
这对于大文件来说可能效率低下,但对于中小型临时文件来说却变得非常方便。
Msgpack 可以用作序列化器,将二进制数据发送到 Celery 任务:
# celery.py
app.conf.accept_content = ('json', 'msgpack')
# tasks.py
@app.task(serializer="msgpack", compression="gzip")
def import_data(file_content: bytes, company: str, file_type: str, email: str):
buffer = io.BytesIO(file_content)
...
# foo.py
import_data.delay(
file_content=file.read(),
company=company,
file_type=file_type,
email=email,
)