Apache Flink Python Datastream API 接收到 Parquet

问题描述 投票:0回答:1

我有一个包含 json 消息的 Kafka 主题。我尝试使用 Flink Python API 处理此消息并将其存储在 GCS 中的 parquet 文件中。

这是清理后的代码片段:

class Extract(MapFunction):
    def map(self, value):
        record = json.loads(value)
        dt_object = datetime.strptime(record['ts'], "%Y-%m-%dT%H:%M:%SZ")
        return Row(dt_object, record['event_id'])

 <...>

events_schema = DataTypes.ROW([
    DataTypes.FIELD("ts", DataTypes.TIMESTAMP()),
    DataTypes.FIELD("event_id", DataTypes.STRING())
])
<...>

# Main job part
kafka_source = KafkaSource.builder() \
        <...>
        .build()

ds: DataStream = env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "Kafka Source")

mapped_data = ds.map(Extract(), Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING()]))

sink = (FileSink
        .for_bulk_format("gs://<my_events_path>",
                         ParquetBulkWriters.for_row_type(row_type=events_schema))
        .with_output_file_config(
            OutputFileConfig.builder()
            .with_part_prefix("bids")
            .with_part_suffix(".parquet")
            .build())
        .build())

mapped_data.sink_to(sink)

问题是当我尝试运行此作业时出现错误:

Java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.time.LocalDateTime (java.sql.Timestamp is in module java.sql of loader 'platform'; java.time.LocalDateTime is in module java.base of loader 'bootstrap')

所以问题是

Types.SQL_TIMESTAMP()
DataTypes.TIMESTAMP()
在翻译成相应的Java类时不兼容。但我没有看到任何其他选项来“典型化”我的映射转换。

如果代替

mapped_data = ds.map(Extract(), Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING()]))

我使用这个选项

mapped_data = ds.map(Extract())

然后我收到另一个错误:

java.lang.ClassCastException: class [B cannot be cast to class org.apache.flink.types.Row ([B is in module java.base of loader 'bootstrap'; org.apache.flink.types.Row is in unnamed module of loader 'app')

我的问题是我可以使用 Flink Python API 以 parquet 格式保存包含时间戳的数据吗?

python apache-flink parquet
1个回答
0
投票

所以我成功地运行了这个。由于某些原因,有一个 LocalTimeTypeInfo 类未在 Types 静态方法下列出。

所以如果我改变

mapped_data = ds.map(Extract(), Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING()]))

mapped_data = ds.map(Extract(), Types.ROW([LocalTimeTypeInfo(LocalTimeTypeInfo.TimeType.LOCAL_DATE_TIME), Types.STRING()]))

然后它将工作并创建带有时间戳列的镶木地板文件。仍然存在一个问题,因为它使用已弃用的

INT96
类型进行物理表示,但它可以工作。

© www.soinside.com 2019 - 2024. All rights reserved.