Flink job (64 tasks) routing records to 1 task


I have an AWS Managed Flink job that writes to Kafka. I can't figure out why most of my records are being routed to a single task, despite a parallelism of 64.

The downstream Kafka topic has 16 partitions, which I think might be part of the problem. My key cardinality is also very low, and I've read that this can cause specific keys to be assigned to a single task. I've also noticed that checkpoint durations are sometimes quite long (> 2 minutes).

Here is my DataStream code:

public class DataStreamJob {

private static final Logger LOG = LoggerFactory.getLogger(DataStreamJob.class);

public static void main(String[] args) throws Exception {
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    
    // initialize config
    FetchConfigBuilder configBuilder = new FetchConfigBuilder();
    configBuilder.withConfigProvider(new KinesisRuntimeConfigProvider());
    configBuilder.withConfigProvider(new EnvironmentConfigProvider());

    FetchConfigV2 configV2 = configBuilder.build();

    Config jobConfig = new Config(configV2);

    // base properties to consume from event-tracking kafka
    Properties properties = KafkaPropertiesUtil.getBaseProperties();

    // msk properties to consume smart carousel cdc stream from msk kafka
    Properties mskProperties = KafkaPropertiesUtil.getMskProperties();

    // offset initializers
    OffsetsInitializer earliestCommittedOffsets = OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST);
    OffsetsInitializer latestOffsets = OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST);
    OffsetsInitializer earliestOffsets = OffsetsInitializer.earliest();

    // register protobuf types with the Kryo serializer
    env.getConfig().registerTypeWithKryoSerializer(VideoAdCrudEvent.Video.class, ProtobufSerializer.class);
    env.getConfig().registerTypeWithKryoSerializer(Unit.class, ProtobufSerializer.class);
    
    // carousel mobile event source
    KafkaSource<CarouselEvent> carouselSource = createKafkaSource(
        jobConfig.sourceMskServicesExpressBootstrapServer, jobConfig.sourceMobileAnalyticsTopic, jobConfig.mskConsumerGroupId,
        new CarouselEventDeserSchema(), mskProperties, latestOffsets);
    
    // stream of carousel events
    DataStream<CarouselEvent> carouselStream = env
        .fromSource(carouselSource, WatermarkStrategy.noWatermarks(), "Carousel Source")
        .filter(new CarouselImpressionFilter())
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<CarouselEvent>forBoundedOutOfOrderness(Duration.ofMillis(10000))
                .withIdleness(Duration.ofMillis(10000))
                .withTimestampAssigner((event, timestamp) -> event.getOriginTs()))
        .uid("carousel-impressions-stream");
    
    // aggregation 
    DataStream<CarouselImpressionAggResult> runningCountStream = carouselStream
        .keyBy(event -> event.getAdUnitId())
        .process(new RunningTotalImpressionCountFunction())
        .uid("running-total-impression-count-stream");

    // kafka sink
    runningCountStream
        .addSink(new KafkaProtobufSink(
            jobConfig.sourceMskServicesBootstrapServer,
            jobConfig.sinkKafkaTopic,
            jobConfig.schemaRegistryUrl,
            jobConfig.protoSchemaId))
        .name("kafka-running-count-sink");

    // Execute program, beginning computation.
    env.execute("ad-impression-aggregation");
}

    // Create Kafka Source utility method

    private static <T> KafkaSource<T> createKafkaSource(
            String bootstrapServers,
            String topic,
            String groupId,
            DeserializationSchema<T> deserializer,
            Properties properties,
            OffsetsInitializer offsetsInitializer) {
        return KafkaSource.<T>builder()
            .setBootstrapServers(bootstrapServers)
            .setTopics(topic)
            .setGroupId(groupId)
            .setStartingOffsets(offsetsInitializer)
            .setValueOnlyDeserializer(deserializer)
            .setProperties(properties)
            .build();
    }
}

Here's what I'm seeing in the UI

Here is my KafkaSink:

public class KafkaProtobufSink extends RichSinkFunction<CarouselImpressionAggResult> {

private transient Producer<String, UnitImpression> producer;
private final String topic;
private final String bootstrapServers;
private final String schemaRegistryUrl;
private final String protoSchemaId;

private static final Logger LOG = LoggerFactory.getLogger(KafkaProtobufSink.class);

public KafkaProtobufSink(String bootstrapServers, String topic, String schemaRegistryUrl, String protoSchemaId){

    this.topic = topic;
    this.bootstrapServers = bootstrapServers;
    this.schemaRegistryUrl = schemaRegistryUrl;
    this.protoSchemaId = protoSchemaId;
    
}

@Override
public void open(Configuration parameters) throws Exception {

    Properties properties = KafkaPropertiesUtil.getProtoProducerProperties(schemaRegistryUrl, bootstrapServers, protoSchemaId);
    
    this.producer = new KafkaProducer<>(properties);

    LOG.info("initialized Kafka producer");
}

@Override
public void invoke(CarouselImpressionAggResult element, Context context) {
    LOG.info("Invoke method called with element: {}", element);

    try {
        
        com.adsrv.unit.v1.PlacementLocation ad_placement = PlacementLocation.PLACEMENT_LOCATION_UNSPECIFIED;

        String count = String.valueOf(element.getImpressionTotalUserCount());
        String placement = String.valueOf(element.getPlacement());

        String unit_id = element.getAdUnitId();

        // check for specific placement location
        if (placement.equals("collections")) {
            ad_placement = PlacementLocation.PLACEMENT_LOCATION_COLLECTIONS;
        } else if (placement.equals("discover")) {
            ad_placement = PlacementLocation.PLACEMENT_LOCATION_CAROUSEL;
        }

        UnitImpression value = UnitImpression.newBuilder()
            .setCount(count)
            .setPlacement(ad_placement) // for placement location carousel
            .build();

        String key = unit_id;
        ProducerRecord<String, UnitImpression> record = new ProducerRecord<String, UnitImpression>(topic, key, value);
        
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                LOG.error("Error sending record to Kafka: ", exception);
            } else {
                LOG.info("Record sent to topic {} partition {} offset {}", metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
        LOG.info("Record produced: {}", record);
    } catch (Exception e) {
        LOG.error("Exception in invoke method: ", e);
    }
}


@Override
public void close() {
    if (producer != null) {
        producer.close();
    }
    LOG.info("producer closed");
}

}

Does anything stand out? I'm really not sure how to proceed here.

Edit:

I'm also seeing this exception in the AWS Managed Flink console:

locationInformation: org.apache.flink.runtime.rest.handler.AbstractHandler.handleException(AbstractHandler.java:269)
logger: org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
message: Unhandled exception.
messageSchemaVersion: 1
messageType: ERROR
threadName: flink-pekko.actor.default-dispatcher-21
throwableInformation:
org.apache.flink.util.FlinkException: Failed to transfer file from TaskExecutor 172.21.118.149:6122-f1aef7.
at org.apache.flink.runtime.rest.handler.taskmanager.AbstractTaskManagerFileHandler.handleException(AbstractTaskManagerFileHandler.java:224)
at org.apache.flink.runtime.rest.handler.taskmanager.AbstractTaskManagerFileHandler.lambda$respondToRequest$1(AbstractTaskManagerFileHandler.java:159)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:261)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1265)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:45)
at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:309)
at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:629)
at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:35)
at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110)
at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.io.FileNotFoundException: The file LOG does not exist on the TaskExecutor.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:2222)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
apache-kafka apache-flink streaming
1 Answer

Flink doesn't distribute keys across tasks very well when the key cardinality is low. You may simply be unlucky, with most or all of your keys being assigned to the same task.
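If you want to check that concretely, you can run the same key-group assignment Flink applies under keyBy() against a sample of your ad unit IDs offline. A minimal sketch: the IDs below are placeholders, and KeyGroupRangeAssignment is an internal flink-runtime class, so use it only for this kind of diagnostic.

import java.util.List;

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeySkewCheck {

    public static void main(String[] args) {
        // Placeholder sample of the (low-cardinality) ad unit IDs used in keyBy().
        List<String> adUnitIds = List.of("unit-1", "unit-2", "unit-3", "unit-4");

        int parallelism = 64;
        // Default max parallelism Flink derives for parallelism 64, unless you set it explicitly.
        int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);

        for (String id : adUnitIds) {
            // hash(key) -> key group -> subtask index, exactly as the keyed operator sees it.
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(id, maxParallelism, parallelism);
            System.out.printf("key %s -> subtask %d%n", id, subtask);
        }
    }
}

If most of your IDs map to the same subtask index, that matches the skew you're seeing in the UI.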

The number of Kafka partitions also puts an upper bound on the effective parallelism of the sink.
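If you want the topology to reflect that, you can set the sink's parallelism explicitly instead of letting it inherit the job-wide 64. This is a sketch against the addSink(...) call in the question; the 16 is an assumption matching the partition count you mentioned.

    // Pin the sink to the number of downstream Kafka partitions.
    runningCountStream
        .addSink(new KafkaProtobufSink(
            jobConfig.sourceMskServicesBootstrapServer,
            jobConfig.sinkKafkaTopic,
            jobConfig.schemaRegistryUrl,
            jobConfig.protoSchemaId))
        .name("kafka-running-count-sink")
        .setParallelism(16);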

Unrelated comment: this looks like a case where the Table API would make things easier. For one thing, it has built-in protobuf support. If you want to stick with the DataStream API, you'd be better off using KafkaSink instead of implementing your own.
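For reference, here is a minimal sketch of what the built-in KafkaSink could look like, reusing the jobConfig fields from the question. Note that the value schema below writes plain protobuf bytes and omits the placement mapping; if your consumers expect the Confluent schema-registry wire format (which your producer properties currently provide), the value schema has to add that framing, so treat this as a starting point rather than a drop-in replacement.

// Additional imports:
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Inside main(), replacing the addSink(...) call:
// Keep keying by ad unit id, as the custom sink does with its ProducerRecord key.
SerializationSchema<CarouselImpressionAggResult> keySchema =
    element -> element.getAdUnitId().getBytes(StandardCharsets.UTF_8);

// NOTE: toByteArray() writes raw protobuf bytes and skips any schema-registry framing.
SerializationSchema<CarouselImpressionAggResult> valueSchema =
    element -> UnitImpression.newBuilder()
        .setCount(String.valueOf(element.getImpressionTotalUserCount()))
        .build()
        .toByteArray();

KafkaSink<CarouselImpressionAggResult> kafkaSink = KafkaSink.<CarouselImpressionAggResult>builder()
    .setBootstrapServers(jobConfig.sourceMskServicesBootstrapServer)
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic(jobConfig.sinkKafkaTopic)
        .setKeySerializationSchema(keySchema)
        .setValueSerializationSchema(valueSchema)
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();

runningCountStream.sinkTo(kafkaSink).name("kafka-running-count-sink");

Besides removing the hand-rolled producer lifecycle, KafkaSink also participates in Flink's checkpointing, so you get at-least-once (or exactly-once, if configured) delivery instead of fire-and-forget sends.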
