这是我当前的场景。
当发生某些事件时,我将事件详细信息记录在 aws rds mysql 数据库中。 事件类有 3 个字段。
私有字符串事件; 私有长EVENT_ID; 私有 LocalDateTime TS;
每天一次,我想从 mysql 数据库获取所有数据并将其写入 parquet 文件中,然后将其上传到 aws s3 存储桶中。
我的要求是这样的 - 我需要 parquet 文件中字段 TS 的数据类型为 Timestamp。我尝试过的所有其他方法最终都会将时间戳数据保存为字符串或长整数。
我使用 AvroParquetWriter 编写镶木地板文件,并使用 JSON 字符串构建架构。
这是 json 模式字符串
private final String JSON_SCHEMA = "{"
+ "\"type\": \"record\","
+ "\"name\": \"activity\","
+ "\"fields\": ["
+ "{\"name\": \"EVENT\", \"type\": \"string\"},"
+ "{\"name\": \"EVENT_ID\", \"type\": \"long\"},"
+ "{\"name\": \"TS\", \"type\": \"long\", \"LogicalType\" : { \"Name\": \"TIMESTAMP\", \"IsAdjustedToUTC\": true, \"Unit\": \"MICROS\" }, \"ConvertedType\": \"TIMESTAMP_MICROS\"},"
+ "]"
+ "}";
这就是我在镶木地板文件中写入的方式
List<ActivityBean> allData = activityRepository.findAll();Schema schema = new Schema.Parser().parse(JSON_SCHEMA);
try(ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new org.apache.hadoop.fs.Path(FILE_LOCATION+FILE_NAME+count+PARQUET_EXTENSION))
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
.withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
.build()) {
for(ActivityBean bean: allData) {
GenericRecord record = getGenericRecord(bean, schema);
writer.write(record);
}
}
private static GenericRecord getGenericRecord(ActivityBean bean, Schema schema) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MMM dd yyyy HH:mm:ss a").withZone(ZoneId.of(ZoneOffset.UTC.getId()));
GenericRecord record = new GenericData.Record(schema);
record.put("EVENT", bean.getCUSTOMER());
record.put("EVENT_ID", bean.getACTIVITY());
record.put("TS", DateTimeUtil.getEpochMicroseconds(bean.getTS()));
return record;
}
我无法弄清楚 json 模式中时间戳的字段类型应该是什么。我有一些示例 parquet 文件,它们告诉我我的 parquet 文件应该是什么样子,并且在一些 VS Code 扩展的帮助下,我可以看到预期的 ts 列数据类型是
Timestamp<MICROSECOND>
。使用上面的 json,我在 ts 字段中获得一个 int64 数据类型,其值类似于 1723535755205000,这是正确的纪元时间,但示例镶木地板文件的 ts 列的数据类型为 Timestamp<MICROSECOND>
,其值例如 2024-07-04 08 :00:27.418474.
对任何错误表示歉意,如果需要提供任何其他信息,请告诉我。
配置
AvroParquetWriter
,您需要配置一个“DataModel”,它知道如何将 Instant 类型转换为内部 Parquet 时间戳表示。
类似这样的:
GenericData model = new GenericData();
model.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
try(ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new org.apache.hadoop.fs.Path(FILE_LOCATION+FILE_NAME+count+PARQUET_EXTENSION))
.withSchema(schema)
.withDataModel(model)
其他选择是使用Carpet(免责声明,我是该库的创建者),它简化了使用java记录的过程:
record Activity(String event, long eventId, LocalDateTime ts) {
public Activity(ActivityBean bean) {
this(bean.getCUSTOMER(), bean.getACTIVITY(), bean.getTS());
}
}
File file = new File(FILE_LOCATION + FILE_NAME + count + PARQUET_EXTENSION);
try (CarpetWriter<Activity> writer = new CarpetWriter.Builder<>(new FileSystemOutputFile(file), Activity.class)
.withColumnNamingStrategy(ColumnNamingStrategy.SNAKE_CASE)
.withDefaultTimeUnit(TimeUnit.MICROS)
.build()) {
writer.write(allData.stream().map(Activity::new));
}