从多个Kafka主题读取数据(通用列表类设计)

问题描述 投票:0回答:1

我尝试更改Flink运行程序代码,以使其从多个Kafka主题读取数据,并将其相应地写入不同的HDFS文件夹,而无需加入。我在主过程方法和反射内部有很多Java和Scala通用方法以及通用对象初始化。它可以在一个Avro模式下正常工作,但是当我尝试添加未知数量的Avro模式时,我在泛型和反射结构上遇到了问题。

如何解决?什么设计模式可以帮助我?

模型(Avro模式)在Java类中。

    public enum Types implements MessageType {
    RECORD_1("record1", "01", Record1.getClassSchema(), Record1.class),
    RECORD_2("record2", "02", Record2.getClassSchema(), Record2.class);

    private String topicName;
    private String dataType;
    private Schema schema;
    private Class<? extends SpecificRecordBase> clazz;}



public class Record1 extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord 
{
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("???");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
... }

public class Record1 ...

具有主要处理方法的处理特征。

import org.apache.avro.specific.SpecificRecordBase
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.Writer
import tests.{Record1, Record2, Types}
import scala.reflect.ClassTag

trait Converter[T] extends Serializable {
  def convertToModel(message: KafkaSourceType): T
}

trait FlinkRunner extends Serializable {

  val kafkaTopicToModelMapping: Map[String, Class[_ <: SpecificRecordBase]] =
    Map(
      "record_1" -> Types.RECORD_1.getClassType,
      "record_2" -> Types.RECORD_2.getClassType
    )

  def buildAvroSink1(path: String, writer1: Writer[Record1]): BucketingSink[Record1] = ???
  def buildAvroSink2(path: String, writer2: Writer[Record2]): BucketingSink[Record2] = ???

  def process(topicList: List[String], env: StreamExecutionEnvironment): Unit = {
    // producer kafka source building
    val clazz1: Class[Record1] = ClassTag(kafkaTopicToModelMapping(topicList.head)).runtimeClass.asInstanceOf[Class[Record1]]
    val clazz2: Class[Record2] = ClassTag(kafkaTopicToModelMapping(topicList.tail.head)).runtimeClass.asInstanceOf[Class[Record2]]
    // How to makes clazzes list from that val clazzes: List[Class[???]] = ???

    val avroTypeInfo1: TypeInformation[Record1] = TypeInformation.of(clazz1)
    val avroTypeInfo2: TypeInformation[Record2] = TypeInformation.of(clazz2)
    // How to makes clazzes list from that val avroTypeInfos = ???

    val stream: DataStream[KafkaSourceType] = ???

    // consumer avro paths building, it
    val converter1: Converter[Record1] = new Converter[Record1] {
      override def convertToModel(message: KafkaSourceType): Record1 = deserializeAvro[Record1](message.value)
    }
    val converter2: Converter[Record2] = new Converter[Record2] {
      override def convertToModel(message: KafkaSourceType): Record2 = deserializeAvro[Record2](message.value)
    }
      // How to makes converters list from that

    val outputResultStream1 = stream
      .filter(_.topic == topicList.head)
      .map(record => converter1.convertToModel(record))(avroTypeInfo1)

    val outputResultStream2 = stream
      .filter(_.topic == topicList.tail.head)
      .map(record => converter2.convertToModel(record))(avroTypeInfo2)

    val writer1 = new AvroSinkWriter[Record1](???)
    val writer2 = new AvroSinkWriter[Record2](???)

    // add sink and start process
  }
}

原样卡夫卡有几个不同的主题。 Kafka版本是10.2,没有Confluent。每个Kafka主题都只能使用一个用Java编写的Avro模式类。唯一的一项Flink作业(用Scala编写)读取唯一的主题,使用一种Avro模式进行转换,并将数据仅写入HDFS中的一个文件夹。名称,路径和输出文件夹名称位于config中。例如,有3个带有参数的作业流:

第一作业流程

--brokersAdress … 
--topic record1
--folderName  folder1
-- avroClassName Record1
--output C:/….
--jobName SingleTopic1 
--number_of_parallel 2
--number_of_task 1
--mainClass Runner 
….

第二作业流程

--brokersAdress … 
--topic record1
--folderName  folder1
-- avroClassName Record1
--output C:/….
--jobName SingleTopic2 
--number_of_parallel 2
--number_of_task 1
--mainClass Runner 
….

第三工作流程

待做一个Flink作业可以读取多个Kafka主题,可以使用不同的Avro架构对其进行转换,并且可以将数据写入不同的文件夹而无需加入。例如,我只能启动一个将执行相同工作的工作流程

--brokersAdress … 
--topic record1, record2, record3
--folderName  folder1, folder2, 
-- avroClassName Record1, Record2
--output C:/….
--jobName MultipleTopics 
--number_of_parallel 3
--number_of_task 3
--mainClass Runner
...

好,谢谢。关于代码组织,存在几个问题:1)如何在方法和方法(称为过程)参数中泛化变量,以启动从SpecificRecordBase类继承的几个分类列表?如果可能,请确定。

val clazz1: Class[Record1] = ClassTag(kafkaTopicToModelMapping(topicList.head)).runtimeClass.asInstanceOf[Class[Record1]]
val clazz2: Class[Record2] = ClassTag(kafkaTopicToModelMapping(topicList.tail.head)).runtimeClass.asInstanceOf[Class[Record2]]

2)对于avroTypeInfo1, avroTypeInfo2 ..., converter1, converter2, ..., buildAvroSink1, buildAvroSink2, ... .,同样的问题>

[我也有关于建筑的问题。我尝试执行此代码,并且Flink在Avro模式类的不同主题下正常工作。哪些Flink代码工具可以帮助我将不同的Avro架构类放入几个outputStrems并为其添加接收器?你有代码示例吗?

而且我还可以使用Flink解决来自不同Kafka主题的多个Avro文件的问题?也许会合。

我尝试更改Flink运行程序代码,以使其从多个Kafka主题读取数据,并将其相应地写入不同的HDFS文件夹,而无需加入。我有很多Java和Scala通用方法,而且...

scala generics apache-kafka apache-flink class-design
1个回答
2
投票

我对你的动力有点迷茫。一般的想法是,如果要使用通用方法,请使用GenericRecord。如果您有针对不同类型的特定代码,请访问SpecificRecord,但不要在其周围使用通用代码。

© www.soinside.com 2019 - 2024. All rights reserved.