当我想将 CSV 文件中的数据添加为分隔的“ ”时,我在 Python 脚本中使用了
COPY
和 Insert into
,但出现了一些错误,例如:
Java.lang.RuntimeException: java.lang.AssertionError: \
Attempted serializing to buffer exceeded maximum of 65535 bytes: 1700053
代码:
def pd_to_cassandra_type(pd_series):
if pd.api.types.is_integer_dtype(pd_series):
return 'bigint'
elif pd.api.types.is_float_dtype(pd_series):
return 'double'
else:
return 'text'
def clean_column_name(name):
name = re.sub(r'\W+', '_', name)
if name[0].isdigit():
name = '_' + name
return name
def create_table(session, keyspace_name, table_name, column_definition):
session.execute(f"""
CREATE KEYSPACE IF NOT EXISTS {keyspace_name}
WITH REPLICATION = {{ 'class': 'SimpleStrategy', 'replication_factor': 1 }}
""")
session.set_keyspace(keyspace_name)
session.execute(f"""
CREATE TABLE IF NOT EXISTS {table_name} (
{column_definition},
id UUID PRIMARY KEY
)
""")
def split_and_load_data():
file_path = 'path/to/your/large_file.tab'
keyspace_name = 'my_keyspace'
table_name = "my_table"
data = pd.read_csv(file_path, delimiter='\t')
# Clean and reassign column names
data.columns = [clean_column_name(col) for col in data.columns]
user = "user_name"
password = "1234"
auth_provider = PlainTextAuthProvider(username=user, password=password)
cluster = Cluster(["192.12.4.5"], auth_provider=auth_provider, connect_timeout=1000, control_connection_timeout=1000)
session = cluster.connect()
column_definitions = ", ".join([f"{col}" text for col in data.columns]
# İlk sütun grubuna göre tabloyu oluşturun
create_table(session, keyspace_name, table_name, column_definitions)
copy_command = f"cqlsh {"192.12.4.5"} -e \"COPY {keyspace}.{table} ({', '.join(data.columns)}) FROM '{file_path}' WITH DELIMITER='\t' AND HEADER=TRUE;\""
subprocess.run(copy_command, shell=True, check=True)
CSV 数据大小约为 500 MB。通常 Cassandra 应该支持加载数据,但我遇到了一些错误。
如果没有 (1) 完整的错误消息和 (2) 完整的堆栈跟踪,则无法确定
AssertionError
是否与您正在执行的批量加载有关。
无论如何,请务必注意,cqlsh 实用程序本身是一个 Python 脚本,它在底层使用 Cassandra Python 驱动程序。它不是为在另一个 Python 脚本或应用程序中运行而设计的。
如果您想以编程方式导入 CSV 文件的内容,则不建议使用 cqlsh
COPY
命令来执行此操作。相反,您应该迭代 CSV 中的各个行,然后使用字段值作为绑定变量构建 CQL INSERT
语句。干杯!