Flink - 如何将表结果转换为Datastream

问题描述 投票:0回答:1

我试图查询 postgresql 表,并将其转换为 dataStream:

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
                .
                .
                .
                .
            catalog config and register
                .
                .
                .
Table tb = tEnv.sqlQuery("SELECT * FROM mytable);
DataStream<Row> dataStream = tEnv.toDataStream(tb)  
dataStream.print();

但是数据流似乎是空的.. 查询很好,当我打印 TableResult 时,我可以看到我的表格。

TableResult tb1 = tEnv.sqlQuery("SELECT * FROM mytable).execute();
tb1.print();

                     

这是正确的做法吗? 我想知道因为根据表文档: “Table 对象描述了数据转换的管道。它不以任何方式包含数据本身。” 那么如果它不包含数据,那么将其转换为数据流有什么意义呢? 或者也许数据流应该保存数据? 我还尝试将 TableResult 转换为 DataStream 但没有找到正确的方法。

谢谢你!

apache-flink etl flink-streaming flink-sql
1个回答
0
投票

你的代码中有

env.execute();
吗? 如果您不执行 StreamExecutionEnvironment,您将不会有任何结果。

// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

...

// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();

Flink 文档中的示例

© www.soinside.com 2019 - 2024. All rights reserved.