My Customer class was generated with the avro-maven-plugin. When I try to run the program, I get this error:
Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.example.Customer does not contain a setter for field first_name
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class com.example.Customer cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
I am using Java 8. The Customer class generated by the avro-maven-plugin is a specific record type.
Please help; I have spent the last 5 days on this problem.
I tried 3 different approaches, which I refer to below as Method 1, Method 2, and Method 3.
package com.example

import java.util.Properties

import org.apache.avro.reflect.ReflectData
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.formats.avro.AvroDeserializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object flink_kafka_avro extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val properties = new Properties
  properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
  properties.put("group.id", "customer-consumer-group-v1")
  properties.put("auto.commit.enable", "false")
  properties.put("auto.offset.reset", "earliest")

  val schema = ReflectData.get.getSchema(classOf[Customer])

  // Method 1 -- not working
  //val ss = new FlinkKafkaConsumer[Customer]("customer-avro", AvroDeserializationSchema.forSpecific(classOf[Customer]), properties)

  val schemaRegistryUrl = "http://localhost:8081"

  // Method 2
  val userKafkaReaderResult = env.addSource(new FlinkKafkaConsumer[Customer]("customer-avro",
    ConfluentRegistryAvroDeserializationSchema.forSpecific(classOf[Customer], schemaRegistryUrl), properties).setStartFromEarliest())
  userKafkaReaderResult.print()

  // Method 3 -- not working either
  //val strenew = FlinkKafkaConsumer[Customer]("test_topic", AvroDeserializationSchema.forSpecific(classOf[Customer]), properties).setStartFromEarliest
  //env.addSource(ss).print()

  env.execute()
}
My POM file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>kafkaavrov1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <avro.version>1.8.2</avro.version>
        <kafka.version>0.11.0.1</kafka.version>
        <confluent.version>3.3.1</confluent.version>
    </properties>

    <!-- necessary to resolve confluent dependencies -->
    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-confluent-registry -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro-confluent-registry</artifactId>
            <version>1.13.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>1.13.2</version>
        </dependency>
        <!-- Only dependency needed for the avro part -->
        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- dependencies needed for the kafka part -->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.2.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-runner</artifactId>
            <version>1.2.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-params</artifactId>
            <version>5.2.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <stringType>String</stringType>
                            <createSetters>false</createSetters>
                            <enableDecimalLogicalType>true</enableDecimalLogicalType>
                            <fieldVisibility>private</fieldVisibility>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>target/generated-sources/avro</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Unless Customer extends org.apache.avro.specific.SpecificRecordBase, Flink will not treat it as an Avro type and will try to serialize it with its POJO serializer. When that fails (as it does here), it falls back to treating it as a generic type and serializes it with Kryo.
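A quick way to see which serializer Flink has chosen is to print the TypeInformation it derives for the class. This is a minimal sketch (the object name is illustrative; it assumes the generated com.example.Customer is on the classpath): a class that passes the Avro/POJO checks yields an Avro-backed type, while a Kryo fallback shows up as GenericType<com.example.Customer>, matching the INFO log above.

import com.example.Customer
import org.apache.flink.api.scala.createTypeInformation

object CheckCustomerTypeInfo extends App {
  // Prints the TypeInformation Flink derives for Customer. An Avro-backed
  // type means the SpecificRecordBase path is used; "GenericType<...>"
  // means Flink fell back to Kryo.
  println(createTypeInformation[Customer])
}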
It is this line in the POM that caused it: <createSetters>false</createSetters>. The Flink Avro util checks each field of the class for get/set methods; if either is not found, it throws an error. I fixed my problem by setting createSetters=true.
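For reference, Flink's POJO rules require a public class, a public no-argument constructor, and, for every non-public field, a matching getter and setter. A hand-written sketch of a shape that passes the check follows; the class and names are illustrative only, since the real Customer is generated by the avro-maven-plugin:

// Illustrative only -- the real class comes from the avro-maven-plugin.
class CustomerPojoSketch {
  private var first_name: String = _

  // Flink matches accessor names to field names ignoring case and
  // underscores, so getFirstName/setFirstName cover "first_name". With
  // createSetters=false the setter below would be missing, which is
  // exactly what the INFO log complains about.
  def getFirstName: String = first_name
  def setFirstName(value: String): Unit = { first_name = value }
}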
The auto-generated class does not meet Flink's POJO requirements. You can try setting the field visibility to PUBLIC by updating the POM (see the Flink documentation on POJO requirements). For example:
<plugins>
    <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>${avro.version}</version>
        <configuration>
            <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
            <fieldVisibility>PUBLIC</fieldVisibility>
            <createSetters>true</createSetters>
            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
        </configuration>
        <executions>
            <execution>
                <phase>generate-sources</phase>
                <goals>
                    <goal>schema</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
</plugins>
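After changing the plugin configuration, regenerate the sources (for example with mvn clean generate-sources). As an extra guard, you can disable Flink's Kryo fallback so that any type still treated as a GenericType fails fast instead of silently hurting performance. A minimal sketch (the object name is illustrative):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object NoKryoFallback extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // With generic types disabled, Flink raises an exception whenever a type
  // would be serialized with Kryo, so a Customer that still fails the
  // POJO/Avro checks is reported immediately rather than slowing the job.
  env.getConfig.disableGenericTypes()
}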