The following query works fine when run from Python:
sql =[f"""LOAD DATA FROM S3 's3://{s3_bucket_name}/{s3_key}'
REPLACE
INTO TABLE {schema_name}.stg_tickets
CHARACTER SET utf8mb4
FIELDS
TERMINATED BY ','
ENCLOSED BY '"'
IGNORE 1 LINES
(id, url, @created_at, @updated_at, @external_id, @type, subject,
description, priority, status, recipient, requester_id,
submitter_id, assignee_id, @organization_id, @group_id,
collaborator_ids, follower_ids, @forum_topic_id, @problem_id,
@has_incidents, @is_public, @due_at, tags, sharing_agreement_ids,
fields, followup_ids, ticket_form_id, @allow_channelback,
generated_timestamp, via_channel, @via_source_to_name,
@via_source_to_address, @via_source_rel, satisfaction_rating_score,
@metric_set_id, @metric_set_url, @metric_set_created_at,
@metric_set_updated_at, metric_set_group_stations,
metric_set_assignee_stations, metric_set_reopens, metric_set_replies,
@metric_set_assignee_updated_at, @metric_set_requester_updated_at,
@metric_set_status_updated_at, @metric_set_initially_assigned_at,
@metric_set_assigned_at, @metric_set_solved_at, @metric_set_latest_comment_added_at,
@metric_set_reply_time_in_minutes_calendar, @metric_set_reply_time_in_minutes_business,
@metric_set_first_resolution_time_in_minutes_calendar,
@metric_set_first_resolution_time_in_minutes_business,
@metric_set_full_resolution_time_in_minutes_calendar,
@metric_set_full_resolution_time_in_minutes_business,
@metric_set_agent_wait_time_in_minutes_calendar,
@metric_set_agent_wait_time_in_minutes_business,
@metric_set_requester_wait_time_in_minutes_calendar,
@metric_set_requester_wait_time_in_minutes_business,
@metric_set_on_hold_time_in_minutes_calendar,
@metric_set_on_hold_time_in_minutes_business)
SET db_creation_date = current_timestamp, db_modify_date = current_timestamp,
has_incidents = case when @has_incidents = 'True' then 1 else 0 end,
is_public = case when @is_public = 'True' then 1 else 0 end,
allow_channelback = case when @allow_channelback = 'True' then 1 else 0 end,
type = NULLIF(@type, ''),
external_id = NULLIF(@external_id, ''),
forum_topic_id = NULLIF(@forum_topic_id, ''),
problem_id = NULLIF(@problem_id, ''),
organization_id = NULLIF(@organization_id, ''),
metric_set_reply_time_in_minutes_calendar = NULLIF(@metric_set_reply_time_in_minutes_calendar, ''),
metric_set_reply_time_in_minutes_business = NULLIF(@metric_set_reply_time_in_minutes_business, ''),
metric_set_first_resolution_time_in_minutes_calendar = NULLIF(@metric_set_first_resolution_time_in_minutes_calendar, ''),
metric_set_first_resolution_time_in_minutes_business = NULLIF(@metric_set_first_resolution_time_in_minutes_business, ''),
metric_set_full_resolution_time_in_minutes_calendar = NULLIF(@metric_set_full_resolution_time_in_minutes_calendar, ''),
metric_set_full_resolution_time_in_minutes_business = NULLIF(@metric_set_full_resolution_time_in_minutes_business, ''),
metric_set_agent_wait_time_in_minutes_calendar = NULLIF(@metric_set_agent_wait_time_in_minutes_calendar, ''),
metric_set_agent_wait_time_in_minutes_business = NULLIF(@metric_set_agent_wait_time_in_minutes_business, ''),
metric_set_requester_wait_time_in_minutes_calendar = NULLIF(@metric_set_requester_wait_time_in_minutes_calendar, ''),
metric_set_requester_wait_time_in_minutes_business = NULLIF(@metric_set_requester_wait_time_in_minutes_business, ''),
metric_set_on_hold_time_in_minutes_calendar = NULLIF(@metric_set_on_hold_time_in_minutes_calendar, ''),
metric_set_on_hold_time_in_minutes_business = NULLIF(@metric_set_on_hold_time_in_minutes_business, ''),
via_source_to_name = NULLIF(@via_source_to_name, ''),
via_source_to_address = NULLIF(@via_source_to_address, ''),
via_source_rel = NULLIF(@via_source_rel, ''),
metric_set_id = NULLIF(@metric_set_id, ''),
metric_set_url = NULLIF(@metric_set_url, ''),
problem_id = NULLIF(@problem_id, ''),
created_at = NULLIF(@created_at, ''),
updated_at = NULLIF(@updated_at, ''),
due_at = NULLIF(@due_at, ''),
metric_set_created_at = NULLIF(@metric_set_created_at, ''),
metric_set_updated_at = NULLIF(@metric_set_updated_at, ''),
metric_set_assignee_updated_at = NULLIF(@metric_set_assignee_updated_at, ''),
metric_set_requester_updated_at = NULLIF(@metric_set_requester_updated_at, ''),
metric_set_status_updated_at = NULLIF(@metric_set_status_updated_at, ''),
metric_set_initially_assigned_at = NULLIF(@metric_set_initially_assigned_at, ''),
metric_set_assigned_at = NULLIF(@metric_set_assigned_at, ''),
metric_set_solved_at = NULLIF(@metric_set_solved_at, ''),
metric_set_latest_comment_added_at = NULLIF(@metric_set_latest_comment_added_at, '')
"""]
But as soon as I add "ESCAPED BY",
FIELDS
TERMINATED BY ','
ENCLOSED BY '"'
ESCAPED BY '\\'
IGNORE 1 LINES
it gives me this error:
mysql.connector.errors.ProgrammingError: 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'True' then 1 else 0 end,
is_public = case when @is_public = ' at line 36
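even though I did not change anything in that part of the code. What am I missing? I need to handle special characters when loading the data into MySQL.
Even if I remove all the SET assignments, I still get this error: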
mysql.connector.errors.ProgrammingError: 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ''\'
IGNORE 1 LINES
(id, url, created_at,' at line 8
How the query is run:
def execute_statement(self, context: OpExecutionContext|AssetExecutionContext, statement: str, db_conn: mc.connection = None,
                      commit: bool = True, close_connection_afterwards: bool = True):
    conn = None  # keeps the finally block safe if set_connection raises
    try:
        conn = self.set_connection(context=context, db_conn=db_conn)
        context.log.debug(f"Executing statement {statement}")
        cursor = conn.cursor()
        cursor.execute(statement)
        if commit:
            conn.commit()
    except mc.Error as e:
        context.log.error(e)
        raise Exception(e) from e
    finally:
        if conn and close_connection_afterwards:
            conn.close()
I cannot reproduce this; see below (should I vote to close now? 😉)
I created a (small!) table as follows:
CREATE TABLE stg_tickets(
id INT,
url VARCHAR(200),
created_at TIMESTAMP,
updated_at TIMESTAMP,
external_id INT,
subject VARCHAR(200)
);
and changed the code to:
import mysql.connector
cnx = mysql.connector.connect(user='root', password='******',
host='******',
database='test')
filename = "/var/lib/mysql/mysql-files/stg_tickets.csv"
tablename = "test.stg_tickets"
sql = fr'''LOAD DATA FROM INFILE '{filename}'
REPLACE
INTO TABLE {tablename}
CHARACTER SET utf8mb4
FIELDS
TERMINATED BY ','
ENCLOSED BY '"'
ESCAPED BY '\\'
IGNORE 1 LINES
(id, url, created_at, updated_at, external_id, @type, subject
)
'''
print(sql)
cursor = cnx.cursor()
try:
    cursor.execute(sql)
except mysql.connector.Error as err:
    print("Error: {}".format(err))
cnx.commit()
cursor.close()
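One likely reason the code above does not reproduce the error is its fr''' prefix. The question builds the statement with a plain (non-raw) f-string, in which Python turns \\ into a single backslash before anything is sent. Under MySQL's default SQL mode, the server would then see ESCAPED BY '\', where \' is an escaped quote, so the string literal runs on until the next ' in the statement; that would match both error messages. A minimal sketch of the difference, using a shortened statement fragment and a hypothetical helper escaped_by_line (neither is from the original code):

```python
def escaped_by_line(sql: str) -> str:
    """Return the ESCAPED BY line exactly as the server would receive it."""
    return next(line for line in sql.splitlines() if line.startswith("ESCAPED BY"))


table = "test.stg_tickets"  # placeholder table name for the sketch

# Plain f-string, as in the question: Python collapses \\ to one backslash.
plain_sql = f"""INTO TABLE {table}
FIELDS
TERMINATED BY ','
ENCLOSED BY '"'
ESCAPED BY '\\'
IGNORE 1 LINES"""

# Raw f-string, as in the answer: both backslashes are kept.
raw_sql = fr"""INTO TABLE {table}
FIELDS
TERMINATED BY ','
ENCLOSED BY '"'
ESCAPED BY '\\'
IGNORE 1 LINES"""

# Plain f-string with the backslashes doubled once more: same text as raw_sql.
doubled_sql = f"""INTO TABLE {table}
FIELDS
TERMINATED BY ','
ENCLOSED BY '"'
ESCAPED BY '\\\\'
IGNORE 1 LINES"""

print(escaped_by_line(plain_sql))    # ESCAPED BY '\'
print(escaped_by_line(raw_sql))      # ESCAPED BY '\\'
print(escaped_by_line(doubled_sql))  # ESCAPED BY '\\'
```

If that is the cause, either adding the r prefix or writing '\\\\' in the question's f-string should let the statement parse.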
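Testing with this file, stg_tickets.csv, shows that it makes no difference whether the input fields are enclosed in " or not (even though OPTIONALLY is not specified, as is the case here!):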
1,"url","2024-06-07 10:01:00","2024-06-07 10:01:00",1,"x",test1
2,"url","2024-06-07 10:02:00","2024-06-07 10:02:00",2,"x",test2
3,"url","2024-06-07 10:03:00","2024-06-07 10:03:00",3,"x",test3
4,"url","2024-06-07 10:04:00","2024-06-07 10:04:00",4,"x",test4
MySQL result:
[test]> select * from stg_tickets;
+------+------+---------------------+---------------------+-------------+---------+
| id | url | created_at | updated_at | external_id | subject |
+------+------+---------------------+---------------------+-------------+---------+
| 1 | url | 2024-06-07 10:01:00 | 2024-06-07 10:01:00 | 1 | test1 |
| 2 | url | 2024-06-07 10:02:00 | 2024-06-07 10:02:00 | 2 | test2 |
| 3 | url | 2024-06-07 10:03:00 | 2024-06-07 10:03:00 | 3 | test3 |
| 4 | url | 2024-06-07 10:04:00 | 2024-06-07 10:04:00 | 4 | test4 |
+------+------+---------------------+---------------------+-------------+---------+
4 rows in set (0.00 sec)
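The result has no column for the "x" values because the sixth field in the column list is assigned to the user variable @type rather than to a table column, and no SET clause uses it. The rough Python approximation below mimics that mapping with the standard csv module; it is not MySQL's own parser, and load_rows, SAMPLE and TARGETS are names made up for this sketch:

```python
import csv

# Two of the sample rows from stg_tickets.csv above.
SAMPLE = '''1,"url","2024-06-07 10:01:00","2024-06-07 10:01:00",1,"x",test1
2,"url","2024-06-07 10:02:00","2024-06-07 10:02:00",2,"x",test2
'''

# Column list from the LOAD DATA statement; "@type" is a user variable.
TARGETS = ["id", "url", "created_at", "updated_at", "external_id", "@type", "subject"]


def load_rows(text):
    """Parse the CSV text and keep only fields that map to table columns."""
    reader = csv.reader(text.splitlines(), delimiter=",", quotechar='"', escapechar="\\")
    rows = []
    for fields in reader:
        # Fields assigned to @variables are read but not stored in the table.
        rows.append({name: value
                     for name, value in zip(TARGETS, fields)
                     if not name.startswith("@")})
    return rows


for row in load_rows(SAMPLE):
    print(row)
```

Quoted and unquoted fields come out the same way here, which mirrors the observation that the enclosing quotes make no difference on input.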