我已将 Flink 升级到 1.9 版本,并且 RichParallelSourceFunction 现已弃用。如何迁移并保持并行性?

问题描述 投票:0回答:1

我有这段代码,我使用 RichParallelSourceFunction 创建一个实现 SourceFuntion 接口的类。

class SourceWithSleep(private val input: List<ByteArray>, private val sleep: Long) :
        RichParallelSourceFunction<ByteArray>() {

        @Volatile
        private var stop = false

        override fun run(ctx: SourceFunction.SourceContext<ByteArray>) {
            input.forEach { ctx.collect(it) }
            Thread.sleep(sleep)
        }

        override fun cancel() {
            stop = true
        }
    }

如何根据 Flink 文档迁移到使用 Source 接口,但保持并行性?我相信 Source 没有考虑到这一点。

我试过这个:

class SourceWithSleep(
        private val input: List<ByteArray>,
        private val sleep: Long
    ) : Source<ByteArray, SourceSplit, Void> {

        override fun getBoundedness(): Boundedness = Boundedness.BOUNDED

        override fun createReader(
            readerContext: SourceReaderContext
        ): SourceReader<ByteArray, SourceSplit> {
            return SourceWithSleepReader(input.iterator(), sleep)
        }

        override fun createEnumerator(
            enumeratorContext: SplitEnumeratorContext<SourceSplit>
        ): SplitEnumerator<SourceSplit, Void> {
            return SimpleSplitEnumerator()
        }

        override fun restoreEnumerator(
            enumeratorContext: SplitEnumeratorContext<SourceSplit>,
            checkpoint: Void?
        ): SplitEnumerator<SourceSplit, Void> {
            return SimpleSplitEnumerator()
        }

        override fun getSplitSerializer(): SimpleVersionedSerializer<SourceSplit> {
            return SplitSerializer()
        }

        override fun getEnumeratorCheckpointSerializer(): SimpleVersionedSerializer<Void> {
            return VoidSerializer()
        }
    }

    class SourceWithSleepReader(
        private val inputIterator: Iterator<ByteArray>,
        private val sleep: Long
    ) : SourceReader<ByteArray, SourceSplit> {
        
        override fun start() {}

        override fun pollNext(output: ReaderOutput<ByteArray>?): InputStatus {
            if (stop) return InputStatus.END_OF_INPUT

            if (inputIterator.hasNext()) {
                output?.collect(inputIterator.next())
                Thread.sleep(sleep)
                return InputStatus.MORE_AVAILABLE
            }

            return InputStatus.END_OF_INPUT
        }

        override fun snapshotState(checkpointId: Long): List<SourceSplit> {
            return emptyList()
        }

        override fun isAvailable(): CompletableFuture<Void> {
            return CompletableFuture.completedFuture(null)

        override fun notifyNoMoreSplits() {}
        override fun addSplits(splits: MutableList<SourceSplit>?) {}
        override fun close() {

        class SimpleSplitEnumerator : SplitEnumerator<SourceSplit, Void> {
        override fun start() {}
        override fun handleSplitRequest(subtaskId: Int, requesterHostname: String?) {}
        override fun addSplitsBack(splits: List<SourceSplit>, subtaskId: Int) {}
        override fun addReader(readerId: Int) {}
        override fun snapshotState(checkpointId: Long): Void? = null
        override fun close() {}
    }

    class SplitSerializer: SimpleVersionedSerializer<SourceSplit> {
        override fun getVersion(): Int = 1
        override fun serialize(split: SourceSplit?): ByteArray {
            return split.toString().toByteArray()
        }
        override fun deserialize(version: Int, serialized: ByteArray?): SourceSplit? {
            throw UnsupportedOperationException("No splits expected.")
        }
    }

    class VoidSerializer: SimpleVersionedSerializer<Void> {
        override fun getVersion(): Int = 1
        override fun serialize(split: Void?): ByteArray {
            return split.toString().toByteArray()
        }
        override fun deserialize(version: Int, serialized: ByteArray?): Void? {
            throw UnsupportedOperationException("No splits expected.")
        }
    }

但是,我在理解此解决方案是否能保证并行性方面遇到问题。我对 Flink 很陌生,刚刚离开文档,我在理解这一点时遇到了问题。

kotlin parallel-processing apache-flink
1个回答
0
投票

当调用新 Source 接口的

createReader()
方法时,它会传递一个
SourceReaderContext
,它提供有关并行性和正在调用的实例(其子任务索引)的详细信息。这应该为您提供实现正确的并行源函数所需的信息。

© www.soinside.com 2019 - 2024. All rights reserved.