I have an AWS Managed Flink job that writes to Kafka. I can't figure out why most of my records are being routed to a single task, even though the job runs with a parallelism of 64.
The downstream Kafka topic has 16 partitions, which I suspect may be part of the problem. My key cardinality is also very low, and I've read that this can cause specific keys to pile up on a single task. I've also noticed that checkpoint durations are sometimes quite long (> 2 minutes).
Here is my DataStream code:
public class DataStreamJob {

    private static final Logger LOG = LoggerFactory.getLogger(DataStreamJob.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // initialize config
        FetchConfigBuilder configBuilder = new FetchConfigBuilder();
        configBuilder.withConfigProvider(new KinesisRuntimeConfigProvider());
        configBuilder.withConfigProvider(new EnvironmentConfigProvider());
        FetchConfigV2 configV2 = configBuilder.build();
        Config jobConfig = new Config(configV2);

        // base properties to consume from the event-tracking Kafka cluster
        Properties properties = KafkaPropertiesUtil.getBaseProperties();
        // MSK properties to consume the smart carousel CDC stream from MSK Kafka
        Properties mskProperties = KafkaPropertiesUtil.getMskProperties();

        // offset initializers
        OffsetsInitializer earliestCommitedOffsets = OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST);
        OffsetsInitializer latestOffsets = OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST);
        OffsetsInitializer earliestOffsets = OffsetsInitializer.earliest();

        // register protobuf types with the Kryo serializer
        env.getConfig().registerTypeWithKryoSerializer(VideoAdCrudEvent.Video.class, ProtobufSerializer.class);
        env.getConfig().registerTypeWithKryoSerializer(Unit.class, ProtobufSerializer.class);

        // carousel mobile event source
        KafkaSource<CarouselEvent> carouselSource = createKafkaSource(
                jobConfig.sourceMskServicesExpressBootstrapServer, jobConfig.sourceMobileAnalyticsTopic, jobConfig.mskConsumerGroupId,
                new CarouselEventDeserSchema(), mskProperties, latestOffsets);

        // stream of carousel events
        DataStream<CarouselEvent> carouselStream = env
                .fromSource(carouselSource, WatermarkStrategy.noWatermarks(), "Carousel Source")
                .filter(new CarouselImpressionFilter())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<CarouselEvent>forBoundedOutOfOrderness(Duration.ofMillis(10000))
                                .withIdleness(Duration.ofMillis(10000))
                                .withTimestampAssigner((event, timestamp) -> event.getOriginTs()))
                .uid("carousel-impressions-stream");

        // aggregation
        DataStream<CarouselImpressionAggResult> runningCountStream = carouselStream
                .keyBy(event -> event.getAdUnitId())
                .process(new RunningTotalImpressionCountFunction())
                .uid("running-total-impression-count-stream");

        // kafka sink
        runningCountStream
                .addSink(new KafkaProtobufSink(jobConfig.sourceMskServicesBootstrapServer, jobConfig.sinkKafkaTopic,
                        jobConfig.schemaRegistryUrl, jobConfig.protoSchemaId))
                .name("kafka-running-count-sink");

        // Execute program, beginning computation.
        env.execute("ad-impression-aggregation");
    }

    // Create Kafka Source utility method
    private static <T> KafkaSource<T> createKafkaSource(String bootstrapServers, String topic, String groupId,
            DeserializationSchema<T> deserializer, Properties properties, OffsetsInitializer offsetsInitializer) {
        return KafkaSource.<T>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(offsetsInitializer)
                .setValueOnlyDeserializer(deserializer)
                .setProperties(properties)
                .build();
    }
}
Here is my KafkaSink:
public class KafkaProtobufSink extends RichSinkFunction<CarouselImpressionAggResult> {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaProtobufSink.class);

    private transient Producer<String, UnitImpression> producer;
    private final String topic;
    private final String bootstrapServers;
    private final String schemaRegistryUrl;
    private final String protoSchemaId;

    public KafkaProtobufSink(String bootstrapServers, String topic, String schemaRegistryUrl, String protoSchemaId) {
        this.topic = topic;
        this.bootstrapServers = bootstrapServers;
        this.schemaRegistryUrl = schemaRegistryUrl;
        this.protoSchemaId = protoSchemaId;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Properties properties = KafkaPropertiesUtil.getProtoProducerProperties(schemaRegistryUrl, bootstrapServers, protoSchemaId);
        Producer<String, UnitImpression> producer = new KafkaProducer<String, UnitImpression>(properties);
        this.producer = producer;
        LOG.info("initializing producer");
    }

    @Override
    public void invoke(CarouselImpressionAggResult element, Context context) {
        LOG.info("Invoke method called with element: {}", element);
        try {
            com.adsrv.unit.v1.PlacementLocation ad_placement = PlacementLocation.PLACEMENT_LOCATION_UNSPECIFIED;
            String count = String.valueOf(element.getImpressionTotalUserCount());
            String placement = String.valueOf(element.getPlacement());
            String unit_id = element.getAdUnitId();

            // check for specific placement location
            if (placement.equals("collections")) {
                ad_placement = PlacementLocation.PLACEMENT_LOCATION_COLLECTIONS;
            } else if (placement.equals("discover")) {
                ad_placement = PlacementLocation.PLACEMENT_LOCATION_CAROUSEL;
            }

            UnitImpression value = UnitImpression.newBuilder()
                    .setCount(count)
                    .setPlacement(ad_placement) // for placement location carousel
                    .build();

            String key = unit_id;
            ProducerRecord<String, UnitImpression> record = new ProducerRecord<String, UnitImpression>(topic, key, value);
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    LOG.error("Error sending record to Kafka: ", exception);
                } else {
                    LOG.info("Record sent to topic {} partition {} offset {}", metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
            LOG.info("Record produced: {}", record);
        } catch (Exception e) {
            LOG.error("Exception in invoke method: ", e);
        }
    }

    @Override
    public void close() {
        LOG.info("producer closed");
        producer.close();
    }
}
Does anything stand out? I'm really not sure how to proceed here.
Edit:
I'm also seeing this exception in the AWS Managed Flink console:
locationInformation: org.apache.flink.runtime.rest.handler.AbstractHandler.handleException(AbstractHandler.java:269)
logger: org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
message: Unhandled exception.
messageSchemaVersion: 1
messageType: ERROR
threadName: flink-pekko.actor.default-dispatcher-21
throwableInformation:
org.apache.flink.util.FlinkException: Failed to transfer file from TaskExecutor 172.21.118.149:6122-f1aef7.
at org.apache.flink.runtime.rest.handler.taskmanager.AbstractTaskManagerFileHandler.handleException(AbstractTaskManagerFileHandler.java:224)
at org.apache.flink.runtime.rest.handler.taskmanager.AbstractTaskManagerFileHandler.lambda$respondToRequest$1(AbstractTaskManagerFileHandler.java:159)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:261)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1265)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:45)
at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:309)
at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:629)
at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:35)
at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110)
at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.io.FileNotFoundException: The file LOG does not exist on the TaskExecutor.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:2222)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Flink doesn't distribute keys across tasks well when the key cardinality is low. You may simply be unlucky, with most or all of your keys being assigned to the same task.
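One way to confirm this is to reproduce Flink's key-to-subtask mapping offline. A minimal sketch, assuming the default max parallelism (128 for a parallelism of 64) and a few placeholder ad unit ids in place of your real keys:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeySkewCheck {
    public static void main(String[] args) {
        // Placeholder keys -- substitute the real ad unit ids flowing through the job.
        String[] adUnitIds = {"unit-1", "unit-2", "unit-3"};
        int maxParallelism = 128; // Flink's default for an operator parallelism of 64
        int parallelism = 64;     // the job's parallelism from the question

        for (String key : adUnitIds) {
            // The same mapping Flink applies after keyBy: key -> key group -> subtask index.
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);
            System.out.printf("key %s -> subtask %d%n", key, subtask);
        }
    }
}

If most of the ids map to the same index, the skew comes from the key distribution itself, and adding parallelism won't spread it out.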
The number of Kafka partitions also puts an upper bound on the sink's effective parallelism.
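If you keep a custom producer sink, one option is to cap the sink's parallelism at the topic's partition count instead of inheriting 64. A sketch reusing the names from the question's code:

// Cap the sink at the topic's 16 partitions so the remaining sink subtasks are not created.
runningCountStream
        .addSink(new KafkaProtobufSink(
                jobConfig.sourceMskServicesBootstrapServer,
                jobConfig.sinkKafkaTopic,
                jobConfig.schemaRegistryUrl,
                jobConfig.protoSchemaId))
        .name("kafka-running-count-sink")
        .setParallelism(16);

The other direction is to add partitions to the topic, but that only helps if the record keys are diverse enough to spread across them.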
Unrelated comment: this looks like a case where the Table API would make things easier. For one thing, it has built-in protobuf support. If you want to stick with the DataStream API, you'd be better off using the built-in KafkaSink rather than implementing your own.
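A minimal sketch of what the built-in connector could look like here, assuming Flink 1.14+ and a SerializationSchema for the value; the UnitImpressionSerializationSchema below is hypothetical (e.g. a SerializationSchema<CarouselImpressionAggResult> that builds the UnitImpression protobuf and runs your schema-registry serialization):

// Imports assumed: org.apache.flink.connector.kafka.sink.*,
// org.apache.flink.connector.base.DeliveryGuarantee, java.nio.charset.StandardCharsets.
KafkaSink<CarouselImpressionAggResult> kafkaSink = KafkaSink.<CarouselImpressionAggResult>builder()
        .setBootstrapServers(jobConfig.sourceMskServicesBootstrapServer)
        .setRecordSerializer(
                KafkaRecordSerializationSchema.<CarouselImpressionAggResult>builder()
                        .setTopic(jobConfig.sinkKafkaTopic)
                        // ad unit id as the record key, mirroring the custom sink
                        .setKeySerializationSchema(
                                (CarouselImpressionAggResult agg) -> agg.getAdUnitId().getBytes(StandardCharsets.UTF_8))
                        // hypothetical value schema -- plug in your protobuf / schema-registry serialization here
                        .setValueSerializationSchema(new UnitImpressionSerializationSchema())
                        .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

runningCountStream.sinkTo(kafkaSink).name("kafka-running-count-sink");

Besides removing the hand-rolled producer lifecycle, the KafkaSink flushes on checkpoints, which your RichSinkFunction currently does not (producer.send is fire-and-forget, so in-flight records can be lost on failure).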