I defined a POJO as follows:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class IdCount {
    private Integer id;
    private String name;
}
I used the following code to test whether it is a valid POJO:
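import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class Test {
    // Returns true if Flink's TypeExtractor classifies the class as a POJO.
    public static boolean isPojoClass(Class<?> pojoClass) {
        try {
            TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(pojoClass);
            return typeInfo instanceof PojoTypeInfo;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        boolean isPojo = isPojoClass(IdCount.class);
        if (isPojo) {
            System.out.println("This is a POJO class.");
        } else {
            System.out.println("This is not a POJO class.");
        }
    }
}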
The printed output is:
This is a POJO class.
My actual job is as follows:
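import cn.chatdoge.flink117.POJO.IdCount;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TimeWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // Source table that generates one random row per second.
        tableEnv.executeSql("CREATE TABLE dataGen (\n" +
                " id INT,\n" +
                " name STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='1',\n" +
                " 'fields.id.kind'='random',\n" +
                " 'fields.id.min'='1',\n" +
                " 'fields.id.max'='10',\n" +
                " 'fields.name.length'='10'\n" +
                ")");

        Table table = tableEnv.sqlQuery("select * from dataGen");
        DataStream<IdCount> dataStream = tableEnv.toDataStream(table, IdCount.class);

        // The exception below is thrown at max("id").
        dataStream
                .countWindowAll(3)
                .max("id")
                .print();

        env.execute();
    }
}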
The error message is:
Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException:
Cannot reference field by field expression on
*cn.chatdoge.flink117.POJO.IdCount<`id` INT, `name` STRING>*
(cn.chatdoge.flink117.POJO.IdCount, org.apache.flink.table.runtime.typeutils.ExternalSerializer)
Field expressions are only supported on POJO types, tuples, and case classes.
(See the Flink documentation on what is considered a POJO.)
at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:256)
at org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator.<init>(ComparableAggregator.java:78)
at org.apache.flink.streaming.api.datastream.AllWindowedStream.max(AllWindowedStream.java:1366)
at cn.chatdoge.flink117.window.TimeWindowExample.main(TimeWindowExample.java:55)
I don't understand why the test passes but the actual job fails at runtime. Can you help me find the problem? I am using Flink 1.17.2.
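I solved it myself: a map function is needed. Judging from the error message, the stream returned by toDataStream carries the Table API's external type information (note the ExternalSerializer) rather than a PojoTypeInfo, so field expressions such as max("id") are rejected. Mapping each record to a new IdCount appears to let the DataStream API extract the type again as a POJO.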
DataStream<IdCount> dataStream = tableEnv.toDataStream(table, IdCount.class).map(
        t -> new IdCount(t.getId(), t.getName())
);
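
For readers who want to sanity-check what the fixed pipeline should print, here is a rough, Flink-free sketch of the windowing logic: it cuts the input into consecutive groups of three, as countWindowAll(3) does, and picks the record with the largest id in each full group. The class CountWindowMaxSketch, the method maxIdPerCountWindow, and the hand-written IdCount stand-in are all hypothetical names for illustration, not part of Flink or of the original job. Note one assumption: the sketch returns the whole record with the largest id, which is closer to maxBy("id"); Flink documents max("id") as returning only the maximum value of that field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CountWindowMaxSketch {

    // Hypothetical stand-in for the IdCount POJO from the question (no Lombok).
    public static final class IdCount {
        private final Integer id;
        private final String name;

        public IdCount(Integer id, String name) {
            this.id = id;
            this.name = name;
        }

        public Integer getId() {
            return id;
        }

        public String getName() {
            return name;
        }

        @Override
        public String toString() {
            return "IdCount(id=" + id + ", name=" + name + ")";
        }
    }

    // Splits the input into consecutive groups of `size` records and returns,
    // for each complete group, the record with the largest id. A trailing
    // incomplete group produces no result, like a count window that never fills.
    public static List<IdCount> maxIdPerCountWindow(List<IdCount> input, int size) {
        List<IdCount> result = new ArrayList<>();
        for (int start = 0; start + size <= input.size(); start += size) {
            List<IdCount> window = input.subList(start, start + size);
            IdCount best = window.get(0);
            for (IdCount candidate : window) {
                if (candidate.getId() > best.getId()) {
                    best = candidate;
                }
            }
            result.add(best);
        }
        return result;
    }

    public static void main(String[] args) {
        List<IdCount> input = Arrays.asList(
                new IdCount(3, "a"), new IdCount(7, "b"), new IdCount(5, "c"),
                new IdCount(2, "d"), new IdCount(9, "e"), new IdCount(1, "f"),
                new IdCount(4, "g")); // seventh record: its window never fills
        for (IdCount max : maxIdPerCountWindow(input, 3)) {
            System.out.println(max);
        }
    }
}
```

With seven input records and a window size of three, only the first two groups are complete, so two results are emitted and the seventh record is left waiting.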