我有一个 flink 表,比方说
CREATE TABLE source(id int, name string) with (...)
,还有一个目标表,比方说 CREATE TABLE destination(id int, unique_name string) with (...)
。 unique_name
是在flink内部流程函数中使用业务逻辑计算的。
所以我们可以安全地假设源模式与目标模式完全相同(名称和数据类型)。 我使用数据流 API 做了一些低级的
process
来获取destination
数据流。它有 outputType
作为 GenericType<org.apache.flink.types.Row>
。当我再次将 destination
数据流转换回表时,出现以下错误。
org.apache.flink.table.api.ValidationException: Column types of query result and sink
for registered table 'default_catalog.default_database.destination' do not match.
Cause: Different number of columns.
Query schema: [f0: RAW('org.apache.flink.types.Row', '...')]
Sink schema: [id: INT, name: STRING]
虽然我可以使用下面的代码解决这个问题,但是我想对此进行泛化并从目的地
RowTypeInformation
获取Table
。有什么办法可以从 flink TypeInformation
获得Table
.
tableEnv.fromDataStream(destionationDataStream.map(x -> x).returns(Types.ROW(Types.Int, Types.String))
桌式系统比
TypeInformation
更丰富。如果您可以使用内部类,则可以使用org.apache.flink.table.runtime.typeutils.ExternalTypeInfo
。 TypeInformation
可以使用 Table API 的DataType
进行配置。
如果你喜欢使用官方支持的API。您可以使用
TypeInformation
声明输入和输出类型,并在调用DataTypes.of(TypeInformation)
时使用
StreamTableEnvironment.toDataStream(..., DataType)
我也遇到了同样的问题。你解决了吗。 流处理后可能导致以下错误:Query schema: [f0: RAW ('org.Apache.Flink.Types.Row', '...')] 接收器架构:[id: INT, name: STRING]