使用ROW()进行嵌套数据结构

问题描述 投票:1回答:1

我已经成功地使用flink-json工件中的JsonRowSerializationSchema来创建一个TableSink<Row>并使用ROW从SQL输出json。它适用于发送平面数据:

INSERT INTO outputTable 
SELECT 
  ROW(col1, col1)
FROM inputTable
>>>> OK:
{"outCol1":"dasdasdas","outCol2":"dasdasdas"}

现在,我正在尝试嵌套模式,它以一种奇怪的方式分解:

INSERT INTO outputTable 
SELECT 
  ROW('ttt', ROW('ppp'))
FROM inputTable
>>>> OK:
{"outCol1":"ttt","outCol2":{"outCol3":"ppp"}}

INSERT INTO outputTable 
SELECT 
  ROW('ttt', ROW(col1))
FROM inputTable
>>>> OK:
{"outCol1":"ttt","outCol2":{"outCol3":"dasdasdas"}}

INSERT INTO outputTable 
SELECT 
  ROW(col1, ROW(col1))
FROM inputTable
>>>> KO

这是一个解析问题,但我很困惑为什么会发生这种情况。 col1和'ttt'是String类型表达式,应该是可替换的;但不知何故,解析器受到以下ROW的干扰,因为stacktrace说:

Caused by: org.apache.calcite.sql.parser.impl.ParseException: Encountered ", ROW" at line 3, column 11.
Was expecting one of:
    ")" ...
    "," <IDENTIFIER> ...
    "," <QUOTED_IDENTIFIER> ...
    "," <BACK_QUOTED_IDENTIFIER> ...
    "," <BRACKET_QUOTED_IDENTIFIER> ...
    "," <UNICODE_QUOTED_IDENTIFIER> ...

    at org.apache.calcite.sql.parser.impl.SqlParserImpl.generateParseException(SqlParserImpl.java:23019)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.jj_consume_token(SqlParserImpl.java:22836)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.ParenthesizedSimpleIdentifierList(SqlParserImpl.java:4466)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression3(SqlParserImpl.java:3328)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression2b(SqlParserImpl.java:3066)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression2(SqlParserImpl.java:3092)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression(SqlParserImpl.java:3045)
    at ...

我错过了一些关于语法的东西吗?解析器试图做什么?我应该以另一种方式使用ROW()吗?

这是一个错误吗?

apache-flink flink-streaming apache-calcite flink-sql
1个回答
0
投票

在进一步挖掘之后,我得出了以下结果:你只需要很好地与ROW()交谈。

这将有效:

INSERT INTO outputTable
SELECT ROW(col1, col2) 
FROM (
  SELECT 
    col1, 
    ROW(col1, col1) as col2 
  FROM inputTable
) tbl2

注意:

  • 嵌套:也许SQL只允许一个嵌套级别。但是你可以使用几个表表达式。我对它的看法是,Flink现在很少在将其推送到执行引擎之前转换SQL语义。执行计划将在一个单元中创建一个融合的ROW(col1,ROW(col1,col1)),因此这不会产生影响。
  • ROW(col1,col1):辅助表中的ROW(col1)不起作用。 (它可以在第一个表中独立工作)。不知道为什么。但是,嘿,当我只有一个值时,我真的需要吗?我可以崩溃那一个值。如果您在输出模式中有一些余地,这将不是问题。

我在这里提交了一个JIRA问题:

https://issues.apache.org/jira/projects/FLINK/issues/FLINK-11399

将相应更新此帖子

© www.soinside.com 2019 - 2024. All rights reserved.