我有一个包含 json 消息的 Kafka 主题。我尝试使用 Flink Python API 处理此消息并将其存储在 GCS 中的 parquet 文件中。
这是清理后的代码片段:
class Extract(MapFunction):
def map(self, value):
record = json.loads(value)
dt_object = datetime.strptime(record['ts'], "%Y-%m-%dT%H:%M:%SZ")
return Row(dt_object, record['event_id'])
<...>
events_schema = DataTypes.ROW([
DataTypes.FIELD("ts", DataTypes.TIMESTAMP()),
DataTypes.FIELD("event_id", DataTypes.STRING())
])
<...>
# Main job part
kafka_source = KafkaSource.builder() \
<...>
.build()
ds: DataStream = env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "Kafka Source")
mapped_data = ds.map(Extract(), Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING()]))
sink = (FileSink
.for_bulk_format("gs://<my_events_path>",
ParquetBulkWriters.for_row_type(row_type=events_schema))
.with_output_file_config(
OutputFileConfig.builder()
.with_part_prefix("bids")
.with_part_suffix(".parquet")
.build())
.build())
mapped_data.sink_to(sink)
问题是当我尝试运行此作业时出现错误:
Java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.time.LocalDateTime (java.sql.Timestamp is in module java.sql of loader 'platform'; java.time.LocalDateTime is in module java.base of loader 'bootstrap')
所以问题是
Types.SQL_TIMESTAMP()
和DataTypes.TIMESTAMP()
在翻译成相应的Java类时不兼容。但我没有看到任何其他选项来“典型化”我的映射转换。
如果代替
mapped_data = ds.map(Extract(), Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING()]))
我使用这个选项
mapped_data = ds.map(Extract())
然后我收到另一个错误:
java.lang.ClassCastException: class [B cannot be cast to class org.apache.flink.types.Row ([B is in module java.base of loader 'bootstrap'; org.apache.flink.types.Row is in unnamed module of loader 'app')
我的问题是我可以使用 Flink Python API 以 parquet 格式保存包含时间戳的数据吗?
所以我成功地运行了这个。由于某些原因,有一个 LocalTimeTypeInfo 类未在 Types 静态方法下列出。
所以如果我改变
mapped_data = ds.map(Extract(), Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING()]))
到
mapped_data = ds.map(Extract(), Types.ROW([LocalTimeTypeInfo(LocalTimeTypeInfo.TimeType.LOCAL_DATE_TIME), Types.STRING()]))
然后它将工作并创建带有时间戳列的镶木地板文件。仍然存在一个问题,因为它使用已弃用的
INT96
类型进行物理表示,但它可以工作。