我正在使用 Apache Flink 1.19 和 java 17。
当我们对数据流进行分组时,它会不断更改和更新。因此,当我想将结果发送到 postgres 时,我首先尝试像往常一样将其转换为 DataStream,如下所示:
DataStream<Row> finalDataStream = TableEnv.toDataStream(finalTable);
通常之后我会使用映射函数将其转换为我的数据类型的元组。但问题恰恰发生在所示的代码中。它告诉我这一点:
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink '*anonymous_datastream_sink$3*' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[user_name, ip, city, log_time, end_time], select=[user_name, ip, city, log_time, end_time, SUM(upload) AS upload, SUM(download) AS download])
所以我无法将我的表转换为数据流,没关系,我会尝试不需要转换为数据流的方法:
//create temp view for final result
TableEnv.createTemporaryView("source", finalTable);
//registering the table in postgres
TableEnv.executeSql("""
CREATE TABLE sink_table {
columns ....
PRIMARY KEY (column ...) NOT ENFORCED
""" + ") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = '" + Entity.jdbcUrl + "'," +
" 'table-name' = 'sink_table'," +
" 'username' = '" + Entity.username + "'," +
" 'password' = '" + Entity.password + "'," +
" 'driver' = 'org.postgresql.Driver'" +
")"
);
// Insert into the sink table
TableEnv.executeSql("INSERT INTO sink_table " +
"SELECT columns FROM source"
);
but it tells me This:Caused by: org.postgresql.util.PSQLException: ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification
如果你正在考虑这个:
TableEnv.executeSql("INSERT INTO sink_table (columns ...) " +
"SELECT columns ... " +
"FROM source " +
"ON CONFLICT (column1, column2) " +
"DO UPDATE SET " +
" column3 = EXCLUDED.column3 "
);
我之前已经尝试过,flink 是这样说的:
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Incorrect syntax near the keyword 'ON' at line 4, column 1.
我真的很困惑,还有什么我可以尝试的吗?
问题解决了吗?我也有同样的问题
SQL parse failed. Incorrect syntax near the keyword 'ON'