我正在尝试在 Apache Flink 中处理后从 1 个 KDS 写入另一个 KDS。我使用 Zeppelin 笔记本通过以下查询创建接收器表:
%flink.ssql
CREATE TABLE seller_revenue (
seller_id VARCHAR,
window_end TIMESTAMP,
sales DOUBLE
)
WITH (
'connector' = 'kinesis',
'stream' = 'seller_stream_window',
'aws.region' = 'ap-south-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json'
)
然后我使用以下方式写入数据
%flink.ssql(parallelism=1)
INSERT INTO seller_revenue
SELECT
seller_id,
TUMBLE_END(proctime, INTERVAL '30' SECONDS) AS window_end,
SUM(product_quantity * product_price) AS sales
FROM seller_sales
GROUP BY
TUMBLE(proctime, INTERVAL '30' SECONDS),
seller_id
但是出现以下错误:
Unable to create a sink for writing table 'hive.flink2.seller_revenue'.
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kinesis'.
Unsupported options:
scan.stream.initpos
有人可以帮忙解决吗?
我尝试删除不受支持的选项
scan.stream.initpos
但是此后没有数据被写入。
如果您将 Zeppelin 笔记本部署为流应用程序,那么代码将可以工作。
在 Zeppelin 笔记本本身中,您无法执行这些步骤,我也遇到了类似的问题。