In my simple Maven application I have 3 Avro files:
ReportDetails.avsc
{
"type": "record",
"name": "ReportDetails",
"namespace": "com.vl.model.avro",
"fields": [
{"name": "detailId", "type": "string"},
{"name": "detailName", "type": "string"}
]
}
Employee.avsc
{
"fields": [
{ "name": "employeeId", "type": "string"},
{ "name": "position", "type": "string" },
{ "name": "department", "type": "int" },
{"name": "employeeName", "type": "string"}
],
"name": "Employee",
"namespace": "com.vl.model.avro",
"type": "record"
}
Report.avsc
{
"type": "record",
"name": "Report",
"namespace": "com.vl.model.avro",
"fields": [
{"name": "reportId", "type": "string"}
, {"name": "employee", "type": ["null", "com.vl.model.avro.Employee"], "default": null}
, {"name": "details", "type": {"type": "array", "items": "com.vl.model.avro.ReportDetails"}}
]
}
The plugin configuration is:
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
        <enableDecimalLogicalType>true</enableDecimalLogicalType>
        <stringType>String</stringType>
        <fieldVisibility>PRIVATE</fieldVisibility>
        <includes>
          <include>ReportDetails.avsc</include>
          <include>Employee.avsc</include>
          <include>Report.avsc</include>
        </includes>
      </configuration>
    </execution>
  </executions>
</plugin>
The build fails with:
[INFO] --- avro:1.11.3:schema (default) @ spring-cloud-stream-kafka-streaming-example ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.693 s
[INFO] Finished at: 2024-02-01T14:43:20+02:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.avro:avro-maven-plugin:1.11.3:schema (default) on project spring-cloud-stream-kafka-streaming-example: Execution default of goal org.apache.avro:avro-maven-plugin:1.11.3:schema failed: Undefined name: "com.vl.model.avro.Employee" -> [Help 1]
To make it work, I updated the Report schema (Employee and ReportDetails are unchanged):
{
"type": "record",
"name": "Report",
"namespace": "com.vl.model.avro",
"fields": [
{"name": "reportId", "type": "string"}
, {"name": "employee", "type": ["null", {"type": "record", "name": "Employee", "fields": []}], "default": null}
, {"name": "details", "type": {"type": "array", "items": {"type": "record", "name": "ReportDetails", "fields": []}}}
]
}
This appears to fix the avro:1.11.3:schema generation problem: the build works and generates what I need.
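The Avro schemas must be registered in a Schema Registry so they can be shared between microservices. For that I configured kafka-schema-registry-maven-plugin (7.5.1), which can upload and download the schemas.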
The plugin configuration is:
<plugin>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-maven-plugin</artifactId>
  <version>7.5.1</version>
  <executions>
    <execution>
      <id>avro-resources</id>
      <phase>generate-sources</phase>
      <goals>
        <goal>download</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <schemaRegistryUrls>
      <param>http://localhost:8081</param>
    </schemaRegistryUrls>
    <outputDirectory>src/main/avro</outputDirectory>
    <subjectPatterns>
      <param>^com.vl.model.*$</param>
    </subjectPatterns>
    <versions>
      <param>latest</param>
    </versions>
    <subjects>
      <com.vl.model.ReportDetails>src/main/resources/avro/ReportDetails.avsc</com.vl.model.ReportDetails>
      <com.vl.model.Employee>src/main/resources/avro/Employee.avsc</com.vl.model.Employee>
      <com.vl.model.Report>src/main/resources/avro/Report.avsc</com.vl.model.Report>
    </subjects>
    <schemaTypes>
      <com.vl.model.ReportDetails>AVRO</com.vl.model.ReportDetails>
      <com.vl.model.Employee>AVRO</com.vl.model.Employee>
      <com.vl.model.Report>AVRO</com.vl.model.Report>
    </schemaTypes>
    <references>
      <com.vl.model.Report>
        <reference>
          <name>details</name>
          <subject>com.vl.model.ReportDetails</subject>
        </reference>
        <reference>
          <name>employee</name>
          <subject>com.vl.model.Employee</subject>
        </reference>
      </com.vl.model.Report>
    </references>
  </configuration>
</plugin>
Registering the schemas with
mvn schema-registry:register
leads to a different problem:
[INFO] --- schema-registry:7.5.1:register (default-cli) @ spring-cloud-stream-kafka-streaming-example ---
[INFO] Registered subject(com.vl.model.Overtime) with id 3 version 1
[INFO] Registered subject(com.vl.model.Absence) with id 4 version 1
[INFO] Registered subject(com.vl.model.Employee) with id 5 version 1
[INFO] Registered subject(com.vl.model.ReportDetails) with id 6 version 1
[INFO] Registered subject(com.vl.model.Attendance) with id 7 version 1
[ERROR] Could not parse Avro schema
org.apache.avro.SchemaParseException: Can't redefine: com.vl.model.avro.Employee
at org.apache.avro.Schema$Names.put (Schema.java:1550)
at org.apache.avro.Schema$Names.add (Schema.java:1544)
at org.apache.avro.Schema.parse (Schema.java:1665)
at org.apache.avro.Schema.parse (Schema.java:1765)
at org.apache.avro.Schema.parse (Schema.java:1678)
at org.apache.avro.Schema$Parser.parse (Schema.java:1433)
at org.apache.avro.Schema$Parser.parse (Schema.java:1421)
at io.confluent.kafka.schemaregistry.avro.AvroSchema.<init> (AvroSchema.java:120)
at io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider.parseSchemaOrElseThrow (AvroSchemaProvider.java:54)
at io.confluent.kafka.schemaregistry.SchemaProvider.parseSchema (SchemaProvider.java:114)
at io.confluent.kafka.schemaregistry.SchemaProvider.parseSchema (SchemaProvider.java:123)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.parseSchema (CachedSchemaRegistryClient.java:286)
at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.parseSchema (SchemaRegistryClient.java:61)
at io.confluent.kafka.schemaregistry.maven.UploadSchemaRegistryMojo.processSubject (UploadSchemaRegistryMojo.java:120)
at io.confluent.kafka.schemaregistry.maven.UploadSchemaRegistryMojo.execute (UploadSchemaRegistryMojo.java:92)
at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:126)
at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute2 (MojoExecutor.java:328)
at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:316)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:212)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:174)
at org.apache.maven.lifecycle.internal.MojoExecutor.access$000 (MojoExecutor.java:75)
at org.apache.maven.lifecycle.internal.MojoExecutor$1.run (MojoExecutor.java:162)
at org.apache.maven.plugin.DefaultMojosExecutionStrategy.execute (DefaultMojosExecutionStrategy.java:39)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:159)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:105)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:73)
at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:53)
at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:118)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:261)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:173)
at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:101)
at org.apache.maven.cli.MavenCli.execute (MavenCli.java:906)
at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:283)
at org.apache.maven.cli.MavenCli.main (MavenCli.java:206)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:77)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:568)
at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:283)
at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:226)
at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:407)
at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:348)
[ERROR] Schema for com.vl.model.Report could not be parsed.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
because "type": "record", "name": "Employee" (or "ReportDetails") cannot be declared twice.
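Confluent's documentation on schema references describes the reference name for Avro as the fully qualified name of the referenced type (the name used inside the referencing schema), whereas the configuration above uses the field names details and employee. Below is a sketch of the references block following that convention, assuming the types stay in the com.vl.model.avro namespace; the subjects are unchanged. As far as I understand, the registry parses the referenced schemas before Report itself, so with references in place Report.avsc should refer to Employee and ReportDetails by name (as in the first version of Report.avsc) rather than redefining them inline, since an inline definition on top of the referenced one is what produces "Can't redefine".

```xml
<references>
  <com.vl.model.Report>
    <!-- For Avro, the reference name is assumed to be the fully qualified
         type name, not the name of the field that uses the type -->
    <reference>
      <name>com.vl.model.avro.ReportDetails</name>
      <subject>com.vl.model.ReportDetails</subject>
    </reference>
    <reference>
      <name>com.vl.model.avro.Employee</name>
      <subject>com.vl.model.Employee</subject>
    </reference>
  </com.vl.model.Report>
</references>
```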
To work around this last problem, I updated the Report definition:
{
"type": "record",
"name": "Report",
"namespace": "com.vl.model.avro",
"fields": [
{"name": "reportId", "type": "string"}
, {"name": "employee", "type": {"type": "Employee", "java-class": "com.vl.model.avro.Employee"}}
, {"name": "details", "type": {"type": "array", "items": "com.vl.model.avro.ReportDetails"}}
]
}
It is now in the Schema Registry.
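The Avro schemas are definitely published to the Schema Registry, but neither the local nor the remote schemas can be used to generate the Java classes. If you pull the .avsc files from the Schema Registry, you can see that they are slightly different. The problem with the remote schemas: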
[INFO] --- avro:1.11.3:schema (default) @ spring-cloud-stream-kafka-streaming-example ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.753 s
[INFO] Finished at: 2024-02-01T16:59:40+02:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.avro:avro-maven-plugin:1.11.3:schema (default) on project spring-cloud-stream-kafka-streaming-example: Execution default of goal org.apache.avro:avro-maven-plugin:1.11.3:schema failed: "Employee" is not a defined name. The type of the "employee" field must be a defined name or a {"type": ...} expression. -> [Help 1]
For the local schemas:
Execution default of goal org.apache.avro:avro-maven-plugin:1.11.3:schema failed: Type not supported: Employee
The Report.avsc downloaded from the Schema Registry (which I formatted for readability) looks like this:
{
  "type": "record",
  "name": "Report",
  "namespace": "com.vl.model.avro",
  "fields": [
    {
      "name": "reportId",
      "type": "string"
    },
    {
      "name": "employee",
      "type": "Employee"
    },
    {
      "name": "details",
      "type": {
        "type": "array",
        "items": "ReportDetails"
      }
    }
  ]
}
(the com.vl.model.avro namespace is missing in front of Employee, ReportDetails and the other types)
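As you can see, fixing one problem just leads to another. I would be happy
PS1. I did not remove the .avsc references, because I am sure that would cause serialization problems in the Kafka communication.
PS2. The solution should generate the Java data classes with the Maven build. The Java classes will be used to publish Kafka messages.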
It seems that the schema pulled from the Schema Registry (the Report.avsc shown above) is correct; logically, otherwise nobody could use it in their projects.
So the problem was probably in the plugin. Going through the plugin's configuration options, I realized I had missed something important:
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
        <enableDecimalLogicalType>true</enableDecimalLogicalType>
        <stringType>String</stringType>
        <fieldVisibility>PRIVATE</fieldVisibility>
        <imports>
          <import>${project.basedir}/src/main/resources/avro/Employee.avsc</import>
          <import>${project.basedir}/src/main/resources/avro/ReportDetails.avsc</import>
          <import>${project.basedir}/src/main/resources/avro/Report.avsc</import>
        </imports>
        <includes>
          <include>ReportDetails.avsc</include>
          <include>Employee.avsc</include>
          <include>Report.avsc</include>
        </includes>
      </configuration>
    </execution>
  </executions>
</plugin>
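So the imports section was missing. As far as I can tell from the plugin source, without imports each .avsc file is parsed in isolation, so a type defined in one file is not visible from another; with imports, the listed files are parsed first and their types can be referenced by name.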
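P.S. To generate the classes from the schemas pulled from the registry, the imports section (and the includes) should point at the files in the folder they are downloaded to.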
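A sketch of that wiring for generating the classes from the downloaded schemas. It assumes the download goal writes the schemas into src/main/avro (the outputDirectory configured for kafka-schema-registry-maven-plugin above) and names each file after its subject; those file names are hypothetical, so check the directory after running mvn schema-registry:download. Both plugins are bound to generate-sources, and Maven runs executions bound to the same phase in the order the plugins are declared in the POM, so kafka-schema-registry-maven-plugin has to be declared before avro-maven-plugin.

```xml
<!-- Declared after kafka-schema-registry-maven-plugin, so that its download
     execution (also bound to generate-sources) runs first -->
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <!-- Read the schemas pulled from the registry -->
        <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
        <enableDecimalLogicalType>true</enableDecimalLogicalType>
        <stringType>String</stringType>
        <fieldVisibility>PRIVATE</fieldVisibility>
        <!-- Referenced types are parsed first, so Report can use them by name.
             Hypothetical file names: assumed to match the subject names -->
        <imports>
          <import>${project.basedir}/src/main/avro/com.vl.model.Employee.avsc</import>
          <import>${project.basedir}/src/main/avro/com.vl.model.ReportDetails.avsc</import>
        </imports>
        <includes>
          <include>*.avsc</include>
        </includes>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Only the referenced schemas need to be listed in imports here; Report is picked up through includes. Because Report's namespace is com.vl.model.avro, the unqualified "Employee" and "ReportDetails" in the downloaded file resolve to the types in that same namespace.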