我正在研究旧的 MySQL 8 数据库和新的 PostgreSQL 数据库之间的复制。
这是双向复制,但我在将 MySQL 中的 LongText 列同步到 PostgreSQL 中的 Text 列时遇到问题。
我已经设置了有效的触发器和转换,数据正在两个数据库之间同步。
我遇到的问题是,当数据从 MySQL 发送到 PostgreSQL 时,数据存储为十六进制。
MySQL 值:
a.b.c.d
PostgreSQL 结果值:612E622E632E64
此外,当我更新 PostgreSQL 中的列时,当 SymmetricDS 尝试更新 MySQL 中的行时,我收到错误:
Incorrect string value: '\xDFM\x84\xDFM\x84...' for column 'foo'
由于我不知道 MySQL 列中到底存储了多少数据,因此我无法将此列更改为其他类型。
如何让 SymmetricDS 复制这些列并在两个数据库上以纯文本形式显示结果?
我的 SymmetricDS 触发器、路由器和转换配置:
insert into sym_trigger
(trigger_id, source_table_name, channel_id, last_update_time, create_time, sync_on_incoming_batch)
values
('foo_table_trigger', 'foo', 'my_channel', current_timestamp, current_timestamp, 1),
('bar_table_trigger', 'bar', 'my_channel', current_timestamp, current_timestamp, 1);
insert into sym_trigger_router
(trigger_id, router_id, initial_load_order, last_update_time, create_time)
values
('foo_table_trigger', 'source-2-target', 100, current_timestamp, current_timestamp),
('bar_table_trigger', 'target-2-source', 100, current_timestamp, current_timestamp);
insert into sym_transform_table
(transform_id, source_node_group_id, target_node_group_id, transform_point,
source_table_name,target_table_name, update_action, delete_action, transform_order,
column_policy, update_first,last_update_by, last_update_time, create_time)
values
('fooTransform', 'Test-source', 'Test-target', 'LOAD', 'foo', 'bar', 'UPD_ROW',
'DEL_ROW', 3, 'IMPLIED', 1, 'testing', current_timestamp, current_timestamp),
('barTransform', 'Test-target', 'Test-source', 'EXTRACT','bar', 'foo', 'UPD_ROW',
'DEL_ROW', 2, 'IMPLIED', 1, 'testing', current_timestamp, current_timestamp);
insert into sym_transform_column
(transform_id, include_on, target_column_name, source_column_name, pk, transform_type,
transform_expression, transform_order, last_update_time, last_update_by, create_time)
values
('fooTransform', '*', 'id', 'GUID', 1, 'copy', '', 1, current_timestamp, 'testing', current_timestamp),
('fooTransform', '*', 'myTextCol', 'myLongTextCol', 0, 'copy', '', 2, current_timestamp,'testing', current_timestamp),
('barTransform', '*', 'GUID', 'id', 1, 'copy', '', 1, current_timestamp, 'testing', current_timestamp),
('barTransform', '*', 'myLongTextCol', 'myTextCol', 0, 'copy', '', 2, current_timestamp,'accountSecurityTransform_column', current_timestamp);
我尝试将 PostgreSQL 中的 TEXT 列类型切换为 varchar,但结果相同。
我期望数据将保存为纯文本值而不是十六进制。
编辑: 以下是示例表,myStringCol 工作正常,因为 sym_transform_table 中的 SymmetricDS 配置设置为 IMPLIED。
这意味着它将自动同步匹配的列。当它不匹配时,需要将列转换添加到 sym_transform_column 中,这是我为 myLongTextCol 和 myTextCol 所做的。
-- MySQL table
CREATE TABLE foo
(
id INTEGER PRIMARY KEY AUTO_INCREMENT,
myStringCol VARCHAR(255) NOT NULL,
myLongTextCol LONGTEXT NOT NULL,
GUID CHAR(36),
INDEX GUID_index (GUID)
)
-- PostgreSQL table
CREATE TABLE bar
(
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
myStringCol VARCHAR(255) NOT NULL,
myTextCol TEXT NOT NULL
);
这些表通常会与默认设置同步,因此您的环境中存在某些问题会导致问题。 这里有一些需要检查的事项。
1. 检查 SymmetricDS 引擎属性,看看是否有
db.read.strings.as.bytes
的设置。 如果有,请继续将其删除。 重新启动 SymmetricDS 并运行“同步触发器”,以便它可以重建触发器以匹配。
2. 如果您可以控制表定义,请确保您在 MySQL 上使用 LONGTEXT。 如果服务器或数据库的默认字符集是“
binary
”,那么它会默默地将字符类型转换为二进制类型。 描述 MySQL 上的表以确认是否发生这种情况:
mysql> desc foo;
+---------------+----------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+----------------+------+-----+---------+----------------+
| id | int | NO | PRI | NULL | auto_increment |
| myStringCol | varbinary(255) | NO | | NULL | |
| myLongTextCol | longblob | NO | | NULL | |
| GUID | binary(36) | YES | MUL | NULL | |
+---------------+----------------+------+-----+---------+----------------+
如果它给了你LONGBLOB,那么你可以指定使用LONGTEXT所需的字符集。 当您创建或更改表时,请指定字符集,如下所示:
alter table foo modify column myLongTextCol longtext character set utf8mb4;
重新创建或更改表后,您需要在 SymmetricDS 上运行“同步触发器”,以便它可以重建触发器以匹配。
3. 如果您无法控制表定义,那么您将需要在字符和二进制之间转换数据。 将
transform_type
设置为“bsh
”(BeanShell)并使用以下命令从 MySQL(以十六进制捕获)转换为 Postgres:
import org.apache.commons.codec.binary.Hex;
return new String(Hex.decodeHex(currentValue));
使用以下命令从 Postgres(以 Base64 捕获)转换为 MySQL:
import org.apache.commons.codec.binary.Base64;
new String(Base64.encodeBase64(currentValue.getBytes()));