POJO class passes the test but is reported as invalid at runtime

Problem description

I defined a POJO as follows:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class IdCount {
    private Integer id;
    private String name;
}

I used the following code to test whether it is a valid POJO:

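import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class Test {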
    public static boolean isPojoClass(Class<?> pojoClass) {
        try {
            TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(pojoClass);
            return typeInfo instanceof PojoTypeInfo;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        boolean isPojo = isPojoClass(IdCount.class);

        if (isPojo) {
            System.out.println("This is a POJO class.");
        } else {
            System.out.println("This is not a POJO class.");
        }
    }
}

The printed output is:

This is a POJO class.
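
For context, Flink's documentation describes a POJO as a public, standalone class with a public no-argument constructor whose fields are either public and non-final or reachable through a getter and setter. The following is a minimal, dependency-free sketch of those rules using reflection. It is a simplified approximation, not what `TypeExtractor` actually does: it ignores inherited fields, `is`-style boolean getters, and field type checks. The names `PojoRuleSketch`, `looksLikePojo`, `NotAPojo`, and the hand-written copy of `IdCount` are illustrative assumptions.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoRuleSketch {

    // Hand-written stand-in for the Lombok-annotated IdCount (assumption for this sketch).
    public static class IdCount {
        private Integer id;
        private String name;

        public IdCount() {}

        public IdCount(Integer id, String name) {
            this.id = id;
            this.name = name;
        }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Counter-example: no public no-argument constructor and no setter.
    public static class NotAPojo {
        private final int value;

        public NotAPojo(int value) { this.value = value; }

        public int getValue() { return value; }
    }

    // Simplified approximation of the documented POJO rules; not Flink's own logic.
    public static boolean looksLikePojo(Class<?> clazz) {
        // The class itself must be public.
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        // Non-static inner classes are not standalone.
        if (clazz.isMemberClass() && !Modifier.isStatic(clazz.getModifiers())) {
            return false;
        }
        try {
            // getConstructor() only finds a public no-argument constructor.
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                continue; // static and transient fields are not part of the record
            }
            if (Modifier.isPublic(mods) && !Modifier.isFinal(mods)) {
                continue; // directly accessible field
            }
            if (!hasGetterAndSetter(clazz, field)) {
                return false;
            }
        }
        return true;
    }

    // Looks for public getX() and setX(fieldType) methods following bean naming.
    private static boolean hasGetterAndSetter(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            clazz.getMethod("get" + suffix);
            clazz.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("IdCount:  " + looksLikePojo(IdCount.class));
        System.out.println("NotAPojo: " + looksLikePojo(NotAPojo.class));
    }
}
```

Like the `TypeExtractor` test above, this kind of check only inspects the class. It says nothing about which type information is attached to a particular `DataStream`.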

My actual job is as follows:

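import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TimeWindowExample {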
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        tableEnv.executeSql("CREATE TABLE dataGen (\n" +
                "  id INT,\n" +
                "  name STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second'='1',\n" +
                "  'fields.id.kind'='random',\n" +
                "  'fields.id.min'='1',\n" +
                "  'fields.id.max'='10',\n" +
                "  'fields.name.length'='10'\n" +
                ")");

        Table table = tableEnv.sqlQuery("select * from dataGen");

        DataStream<IdCount> dataStream = tableEnv.toDataStream(table, IdCount.class);

        dataStream
                .countWindowAll(3)
                .max("id")
                .print()
        ;

        env.execute();
    }
}

The error message is:

Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: 
Cannot reference field by field expression on 
*cn.chatdoge.flink117.POJO.IdCount<`id` INT, `name` STRING>*
(cn.chatdoge.flink117.POJO.IdCount, org.apache.flink.table.runtime.typeutils.ExternalSerializer)
Field expressions are only supported on POJO types, tuples, and case classes. 
(See the Flink documentation on what is considered a POJO.)
    at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:256)
    at org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator.<init>(ComparableAggregator.java:78)
    at org.apache.flink.streaming.api.datastream.AllWindowedStream.max(AllWindowedStream.java:1366)
    at cn.chatdoge.flink117.window.TimeWindowExample.main(TimeWindowExample.java:55)

I don't understand why the test passes but an error occurs during actual execution. Can you help me find the problem? I am using flink-1.17.2.

apache-flink
1 answer

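I solved it myself. A map function is needed to re-create the records, so that Flink derives the POJO type information for the resulting stream: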

DataStream<IdCount> dataStream = tableEnv.toDataStream(table, IdCount.class).map(
    t -> new IdCount(t.getId(), t.getName())
);
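
A likely reason the extra `map` helps: the exception message mentions `ExternalSerializer`, which suggests that the stream returned by `toDataStream` carries the Table API's own type information instead of a `PojoTypeInfo`, whereas the output type of `map` is extracted again from the function's return type. One way to check this is to print the type information before and after the `map`. The sketch below assumes Flink 1.17 with the Table API bridge on the classpath; the class name `TypeInfoInspection` and the hand-written `IdCount` (standing in for the Lombok version) are illustrative assumptions.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TypeInfoInspection {

    // Hand-written stand-in for the question's Lombok-based IdCount (assumption).
    public static class IdCount {
        private Integer id;
        private String name;

        public IdCount() {}

        public IdCount(Integer id, String name) {
            this.id = id;
            this.name = name;
        }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(
                "CREATE TABLE dataGen (id INT, name STRING) WITH ('connector' = 'datagen')");
        Table table = tableEnv.sqlQuery("select * from dataGen");

        // Type information attached by the Table-to-DataStream conversion.
        DataStream<IdCount> converted = tableEnv.toDataStream(table, IdCount.class);
        System.out.println("after toDataStream: " + converted.getType().getClass().getName());

        // Type information extracted again from the map function's return type.
        DataStream<IdCount> remapped =
                converted.map(t -> new IdCount(t.getId(), t.getName()));
        System.out.println("after map: " + remapped.getType().getClass().getName());
    }
}
```

The job is never executed here; only the declared type information is inspected. If the explanation holds, the two lines should report different `TypeInformation` classes, and only the second stream should accept field expressions such as `max("id")`.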