Python 脚本中的 COPY 命令返回“AssertionError:尝试序列化缓冲区超出最大值”

问题描述 投票:0回答:1

当我想将 CSV 文件中的数据添加为分隔的“ ”时,我在 Python 脚本中使用了

COPY
Insert into
,但出现了一些错误,例如:

Java.lang.RuntimeException: java.lang.AssertionError: \
   Attempted serializing to buffer exceeded maximum of 65535 bytes: 1700053

代码:

def pd_to_cassandra_type(pd_series):
    if pd.api.types.is_integer_dtype(pd_series):
        return 'bigint'
    elif pd.api.types.is_float_dtype(pd_series):
        return 'double'
    else:
        return 'text'
    

def clean_column_name(name):
    name = re.sub(r'\W+', '_', name)
    if name[0].isdigit():
        name = '_' + name
    return name



def create_table(session, keyspace_name, table_name, column_definition):
    session.execute(f"""
    CREATE KEYSPACE IF NOT EXISTS {keyspace_name}
    WITH REPLICATION = {{ 'class': 'SimpleStrategy', 'replication_factor': 1 }}
    """)

    session.set_keyspace(keyspace_name)

    session.execute(f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        {column_definition},
        id UUID PRIMARY KEY
    )
    """)

def split_and_load_data():
    file_path = 'path/to/your/large_file.tab'
    keyspace_name = 'my_keyspace'
    table_name = "my_table"

    data = pd.read_csv(file_path, delimiter='\t')

    # Clean and reassign column names
    data.columns = [clean_column_name(col) for col in data.columns]

    user = "user_name"
    password = "1234"

    auth_provider = PlainTextAuthProvider(username=user, password=password)
    cluster       = Cluster(["192.12.4.5"], auth_provider=auth_provider, connect_timeout=1000, control_connection_timeout=1000)
    session       = cluster.connect()
    column_definitions = ", ".join([f"{col}" text for col in data.columns]

    # İlk sütun grubuna göre tabloyu oluşturun
    create_table(session, keyspace_name, table_name, column_definitions)
    copy_command = f"cqlsh {"192.12.4.5"} -e \"COPY {keyspace}.{table} ({', '.join(data.columns)}) FROM '{file_path}' WITH DELIMITER='\t' AND HEADER=TRUE;\""
    subprocess.run(copy_command, shell=True, check=True)

CSV 数据大小约为 500 MB。通常 Cassandra 应该支持加载数据,但我遇到了一些错误。

cassandra datastax-python-driver
1个回答
0
投票

如果没有 (1) 完整的错误消息和 (2) 完整的堆栈跟踪,则无法确定

AssertionError
是否与您正在执行的批量加载有关。

无论如何,请务必注意,cqlsh 实用程序本身是一个 Python 脚本,它在底层使用 Cassandra Python 驱动程序。它不是为在另一个 Python 脚本或应用程序中运行而设计的。

如果您想以编程方式导入 CSV 文件的内容,则不建议使用 cqlsh

COPY
命令来执行此操作。相反,您应该迭代 CSV 中的各个行,然后使用字段值作为绑定变量构建 CQL
INSERT
语句。干杯!

© www.soinside.com 2019 - 2024. All rights reserved.