我上周开始使用 Pyflink,发现自己陷入了困境。 基本上我尝试从源 A 导入数据并将其下沉到 Elastic,这很好用,但是有一个特殊的字段不能正常工作。
该字段是一个 10 个字符的字符串,由我的 PyFlink 作业解析并通过加密例程运行并转换为十六进制,这使得该字符串现在为 128 个字符。
虽然下沉到弹性,不知何故,系统似乎将我的弦视为“长”类型。
尝试导入时抛出以下错误:
Caused by: ElasticsearchException[Elasticsearch exception [type=mapper_parsing_exception, reason=failed to parse field [some_encrypted_id] of type [long] in document with id '10'. Preview of field's value: 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx']]; nested: ElasticsearchException[Elasticsearch exception [type=illegal_argument_exception, reason=For input string: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"]];
我的水槽定义:
CREATE TABLE es_sink(
some_encrypted_id VARCHAR
) with (
'connector' = 'elasticsearch-7',
'hosts' = 'x', //normally not x
'index' = 'x',//normally not x
'document-id.key-delimiter' = '$',
'sink.bulk-flush.max-size' = '42mb',
'sink.bulk-flush.max-actions' = '32',
'sink.bulk-flush.interval' = '1000',
'sink.bulk-flush.backoff.delay' = '1000',
'format' = 'json'
)
我尝试用 Text 替换 Varchar,但是在创建作业时出现以下错误:
java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: TEXT
老实说,我在这里没有想法。我尝试了多个不同的领域,一切似乎都按预期工作,只是这个例子不是。
我也不明白为什么系统会尝试将其下沉为“长”类型。我从来没有将任何东西定义为“长”。
希望有人能弄清楚我在这里做错了什么并指出正确的方向。如果需要更多信息,请告诉我!
您是否在弹性搜索索引中为您的字段指定了数据类型? ES 会根据您插入的值猜测该字段的类型,有时它可能不是您期望的类型。
例如如果你有一个索引 AA 和一个字段 aa,它还没有类型映射。然后您的程序依次插入“57”、“abc”……。当 ES 第一次看到 57 时,它会猜测这可能是一个数字类型并使用诸如整数或长整型之类的东西,并且您后续的插入可能会失败。
您可以尝试在写入之前为索引放置一个映射。 PUT 映射