我尝试更改Flink运行程序代码,以使其从多个Kafka主题读取数据,并将其相应地写入不同的HDFS文件夹,而无需加入。我在主过程方法和反射内部有很多Java和Scala通用方法以及通用对象初始化。它可以在一个Avro模式下正常工作,但是当我尝试添加未知数量的Avro模式时,我在泛型和反射结构上遇到了问题。
如何解决?什么设计模式可以帮助我?
模型(Avro模式)在Java类中。
public enum Types implements MessageType {
RECORD_1("record1", "01", Record1.getClassSchema(), Record1.class),
RECORD_2("record2", "02", Record2.getClassSchema(), Record2.class);
private String topicName;
private String dataType;
private Schema schema;
private Class<? extends SpecificRecordBase> clazz;}
public class Record1 extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord
{
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("???");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
... }
public class Record1 ...
具有主要处理方法的处理特征。
import org.apache.avro.specific.SpecificRecordBase
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.Writer
import tests.{Record1, Record2, Types}
import scala.reflect.ClassTag
trait Converter[T] extends Serializable {
def convertToModel(message: KafkaSourceType): T
}
trait FlinkRunner extends Serializable {
val kafkaTopicToModelMapping: Map[String, Class[_ <: SpecificRecordBase]] =
Map(
"record_1" -> Types.RECORD_1.getClassType,
"record_2" -> Types.RECORD_2.getClassType
)
def buildAvroSink1(path: String, writer1: Writer[Record1]): BucketingSink[Record1] = ???
def buildAvroSink2(path: String, writer2: Writer[Record2]): BucketingSink[Record2] = ???
def process(topicList: List[String], env: StreamExecutionEnvironment): Unit = {
// producer kafka source building
val clazz1: Class[Record1] = ClassTag(kafkaTopicToModelMapping(topicList.head)).runtimeClass.asInstanceOf[Class[Record1]]
val clazz2: Class[Record2] = ClassTag(kafkaTopicToModelMapping(topicList.tail.head)).runtimeClass.asInstanceOf[Class[Record2]]
// How to makes clazzes list from that val clazzes: List[Class[???]] = ???
val avroTypeInfo1: TypeInformation[Record1] = TypeInformation.of(clazz1)
val avroTypeInfo2: TypeInformation[Record2] = TypeInformation.of(clazz2)
// How to makes clazzes list from that val avroTypeInfos = ???
val stream: DataStream[KafkaSourceType] = ???
// consumer avro paths building, it
val converter1: Converter[Record1] = new Converter[Record1] {
override def convertToModel(message: KafkaSourceType): Record1 = deserializeAvro[Record1](message.value)
}
val converter2: Converter[Record2] = new Converter[Record2] {
override def convertToModel(message: KafkaSourceType): Record2 = deserializeAvro[Record2](message.value)
}
// How to makes converters list from that
val outputResultStream1 = stream
.filter(_.topic == topicList.head)
.map(record => converter1.convertToModel(record))(avroTypeInfo1)
val outputResultStream2 = stream
.filter(_.topic == topicList.tail.head)
.map(record => converter2.convertToModel(record))(avroTypeInfo2)
val writer1 = new AvroSinkWriter[Record1](???)
val writer2 = new AvroSinkWriter[Record2](???)
// add sink and start process
}
}
原样卡夫卡有几个不同的主题。 Kafka版本是10.2,没有Confluent。每个Kafka主题都只能使用一个用Java编写的Avro模式类。唯一的一项Flink作业(用Scala编写)读取唯一的主题,使用一种Avro模式进行转换,并将数据仅写入HDFS中的一个文件夹。名称,路径和输出文件夹名称位于config中。例如,有3个带有参数的作业流:
第一作业流程
--brokersAdress …
--topic record1
--folderName folder1
-- avroClassName Record1
--output C:/….
--jobName SingleTopic1
--number_of_parallel 2
--number_of_task 1
--mainClass Runner
….
第二作业流程
--brokersAdress …
--topic record1
--folderName folder1
-- avroClassName Record1
--output C:/….
--jobName SingleTopic2
--number_of_parallel 2
--number_of_task 1
--mainClass Runner
….
第三工作流程
…
待做一个Flink作业可以读取多个Kafka主题,可以使用不同的Avro架构对其进行转换,并且可以将数据写入不同的文件夹而无需加入。例如,我只能启动一个将执行相同工作的工作流程
--brokersAdress …
--topic record1, record2, record3
--folderName folder1, folder2,
-- avroClassName Record1, Record2
--output C:/….
--jobName MultipleTopics
--number_of_parallel 3
--number_of_task 3
--mainClass Runner
...
好,谢谢。关于代码组织,存在几个问题:1)如何在方法和方法(称为过程)参数中泛化变量,以启动从SpecificRecordBase类继承的几个分类列表?如果可能,请确定。
val clazz1: Class[Record1] = ClassTag(kafkaTopicToModelMapping(topicList.head)).runtimeClass.asInstanceOf[Class[Record1]]
val clazz2: Class[Record2] = ClassTag(kafkaTopicToModelMapping(topicList.tail.head)).runtimeClass.asInstanceOf[Class[Record2]]
2)对于avroTypeInfo1, avroTypeInfo2 ..., converter1, converter2, ..., buildAvroSink1, buildAvroSink2, ... .
,同样的问题>
[我也有关于建筑的问题。我尝试执行此代码,并且Flink在Avro模式类的不同主题下正常工作。哪些Flink代码工具可以帮助我将不同的Avro架构类放入几个outputStrems并为其添加接收器?你有代码示例吗?
而且我还可以使用Flink解决来自不同Kafka主题的多个Avro文件的问题?也许会合。
我尝试更改Flink运行程序代码,以使其从多个Kafka主题读取数据,并将其相应地写入不同的HDFS文件夹,而无需加入。我有很多Java和Scala通用方法,而且...
我对你的动力有点迷茫。一般的想法是,如果要使用通用方法,请使用GenericRecord。如果您有针对不同类型的特定代码,请访问SpecificRecord,但不要在其周围使用通用代码。