Flink 表获取类型信息

问题描述 投票:0回答:2

我有一个 flink 表,比方说

CREATE TABLE source(id int, name string) with (...)
,还有一个目标表,比方说
CREATE TABLE destination(id int, unique_name string) with (...)
unique_name
是在flink内部流程函数中使用业务逻辑计算的。

所以我们可以安全地假设源模式与目标模式完全相同(名称和数据类型)。 我使用数据流 API 做了一些低级的

process
来获取
destination
数据流。它有
outputType
作为
GenericType<org.apache.flink.types.Row>
。当我再次将
destination
数据流转换回表时,出现以下错误。

org.apache.flink.table.api.ValidationException: Column types of query result and sink 
for registered table 'default_catalog.default_database.destination' do not match.
Cause: Different number of columns.

Query schema: [f0: RAW('org.apache.flink.types.Row', '...')]
Sink schema:  [id: INT, name: STRING]

虽然我可以使用下面的代码解决这个问题,但是我想对此进行泛化并从目的地

RowTypeInformation
获取
Table
。有什么办法可以从 flink
TypeInformation
获得
Table
.

tableEnv.fromDataStream(destionationDataStream.map(x -> x).returns(Types.ROW(Types.Int, Types.String))
apache-flink flink-streaming
2个回答
3
投票

桌式系统比

TypeInformation
更丰富。如果您可以使用内部类,则可以使用
org.apache.flink.table.runtime.typeutils.ExternalTypeInfo
TypeInformation
可以使用 Table API 的
DataType
进行配置。

如果你喜欢使用官方支持的API。您可以使用

TypeInformation
声明输入和输出类型,并在调用
DataTypes.of(TypeInformation)
时使用
StreamTableEnvironment.toDataStream(..., DataType)


0
投票

我也遇到了同样的问题。你解决了吗。 流处理后可能导致以下错误:Query schema: [f0: RAW ('org.Apache.Flink.Types.Row', '...')] 接收器架构:[id: INT, name: STRING]

© www.soinside.com 2019 - 2024. All rights reserved.