我有这段代码,我使用 RichParallelSourceFunction 创建了一个实现 SourceFunction 接口的类。
/**
 * Parallel source that emits every element of [input] and then sleeps for
 * [sleep] milliseconds before returning from [run].
 *
 * @property input records to emit, in list order.
 * @property sleep time in milliseconds to block after the last record is emitted.
 */
class SourceWithSleep(private val input: List<ByteArray>, private val sleep: Long) :
    RichParallelSourceFunction<ByteArray>() {

    // Written by cancel() and read by run(); @Volatile makes the write visible across threads.
    @Volatile
    private var stop = false

    /**
     * Emits the records one by one, stopping early once [cancel] has been called.
     *
     * @param ctx context used to emit records downstream.
     */
    override fun run(ctx: SourceFunction.SourceContext<ByteArray>) {
        for (record in input) {
            // Honour cancellation: previously the flag was set but never checked.
            if (stop) return
            ctx.collect(record)
        }
        // No point in lingering if the source has already been cancelled.
        if (!stop) {
            Thread.sleep(sleep)
        }
    }

    /** Requests that [run] stops emitting records. */
    override fun cancel() {
        stop = true
    }
}
如何根据 Flink 文档迁移到使用 Source 接口,但保持并行性?我相信 Source 没有考虑到这一点。
我试过这个:
/**
 * Bounded [Source] that replays a fixed list of byte arrays.
 *
 * Every call to [createReader] hands the new reader a fresh iterator over the
 * complete [input] list, so each reader emits all records. No splits are used:
 * the enumerator is a no-op and carries no checkpoint state.
 *
 * @property input records each reader will emit.
 * @property sleep pause in milliseconds, forwarded to every reader.
 */
class SourceWithSleep(
    private val input: List<ByteArray>,
    private val sleep: Long
) : Source<ByteArray, SourceSplit, Void> {

    /** The input list is finite, so the source is always bounded. */
    override fun getBoundedness(): Boundedness {
        return Boundedness.BOUNDED
    }

    /** Builds a reader over the whole input; [readerContext] is not consulted. */
    override fun createReader(
        readerContext: SourceReaderContext
    ): SourceReader<ByteArray, SourceSplit> =
        SourceWithSleepReader(input.iterator(), sleep)

    /** Creates the no-op enumerator for a fresh start. */
    override fun createEnumerator(
        enumeratorContext: SplitEnumeratorContext<SourceSplit>
    ): SplitEnumerator<SourceSplit, Void> = SimpleSplitEnumerator()

    /** Creates the no-op enumerator on restore; [checkpoint] is ignored. */
    override fun restoreEnumerator(
        enumeratorContext: SplitEnumeratorContext<SourceSplit>,
        checkpoint: Void?
    ): SplitEnumerator<SourceSplit, Void> = SimpleSplitEnumerator()

    /** Serializer for splits. */
    override fun getSplitSerializer(): SimpleVersionedSerializer<SourceSplit> = SplitSerializer()

    /** Serializer for the enumerator checkpoint. */
    override fun getEnumeratorCheckpointSerializer(): SimpleVersionedSerializer<Void> = VoidSerializer()
}
/**
 * [SourceReader] that emits the elements of [inputIterator] one per
 * [pollNext] call, sleeping [sleep] milliseconds after each emitted record.
 *
 * The reader does not use splits: [addSplits] and [notifyNoMoreSplits] are
 * no-ops and [snapshotState] always returns an empty list.
 *
 * @property inputIterator records to emit.
 * @property sleep pause in milliseconds after each emitted record.
 */
class SourceWithSleepReader(
    private val inputIterator: Iterator<ByteArray>,
    private val sleep: Long
) : SourceReader<ByteArray, SourceSplit> {

    // Set by close(); checked by pollNext() so a closed reader reports end of input.
    // (The original referenced `stop` without declaring it.)
    @Volatile
    private var stop = false

    /** Nothing to initialise. */
    override fun start() {}

    /**
     * Emits the next record, if any.
     *
     * @param output collector that receives the record.
     * @return [InputStatus.MORE_AVAILABLE] after emitting a record,
     *   [InputStatus.END_OF_INPUT] once the iterator is exhausted or the reader is closed.
     */
    override fun pollNext(output: ReaderOutput<ByteArray>?): InputStatus {
        if (stop || !inputIterator.hasNext()) {
            return InputStatus.END_OF_INPUT
        }
        output?.collect(inputIterator.next())
        // NOTE(review): the SourceReader contract asks pollNext to be non-blocking;
        // sleeping here stalls the calling thread for `sleep` ms — confirm this is acceptable.
        Thread.sleep(sleep)
        return InputStatus.MORE_AVAILABLE
    }

    /** No splits are held, so there is nothing to snapshot. */
    override fun snapshotState(checkpointId: Long): List<SourceSplit> {
        return emptyList()
    }

    /** Always reports availability with an already-completed future. */
    override fun isAvailable(): CompletableFuture<Void> {
        return CompletableFuture.completedFuture(null)
    }

    /** Ignored: this reader does not depend on splits. */
    override fun notifyNoMoreSplits() {}

    /** Ignored: this reader does not depend on splits. */
    override fun addSplits(splits: MutableList<SourceSplit>?) {}

    /** Marks the reader as stopped so that later [pollNext] calls return end of input. */
    override fun close() {
        stop = true
    }
}
/**
 * [SplitEnumerator] that does nothing: it never assigns splits, ignores every
 * callback, and has no checkpoint state (its snapshot is always `null`).
 */
class SimpleSplitEnumerator : SplitEnumerator<SourceSplit, Void> {

    /** No start-up work. */
    override fun start() = Unit

    /** Split requests are ignored. */
    override fun handleSplitRequest(subtaskId: Int, requesterHostname: String?) = Unit

    /** Returned splits are ignored. */
    override fun addSplitsBack(splits: List<SourceSplit>, subtaskId: Int) = Unit

    /** Reader registration is ignored. */
    override fun addReader(readerId: Int) = Unit

    /** There is no enumerator state, so the snapshot is always `null`. */
    override fun snapshotState(checkpointId: Long): Void? {
        return null
    }

    /** Nothing to release. */
    override fun close() = Unit
}
/**
 * Split serializer for a source that never produces splits.
 *
 * Serialization writes the split's `toString()` text (the text `null` for a
 * null split); deserialization is unsupported and always throws.
 */
class SplitSerializer : SimpleVersionedSerializer<SourceSplit> {

    /** Serialization format version. */
    override fun getVersion(): Int {
        return 1
    }

    /** Encodes the string form of [split] as bytes. */
    override fun serialize(split: SourceSplit?): ByteArray {
        val text = split.toString()
        return text.toByteArray()
    }

    /**
     * Always fails, because no split is ever expected to be read back.
     *
     * @throws UnsupportedOperationException on every call.
     */
    override fun deserialize(version: Int, serialized: ByteArray?): SourceSplit? =
        throw UnsupportedOperationException("No splits expected.")
}
/**
 * Serializer for an enumerator checkpoint of type [Void], i.e. a checkpoint
 * that carries no data and is always `null`.
 *
 * Round-trips cleanly: [serialize] writes an empty payload and [deserialize]
 * returns `null`. The previous implementation threw from [deserialize], which
 * would make a serialized checkpoint impossible to read back.
 */
class VoidSerializer : SimpleVersionedSerializer<Void> {

    /** Serialization format version. */
    override fun getVersion(): Int = 1

    /**
     * Produces an empty payload; there is no state to write.
     *
     * @param split the checkpoint value, always `null` for [Void]; ignored.
     */
    override fun serialize(split: Void?): ByteArray = ByteArray(0)

    /**
     * Returns `null`, the only possible [Void] value; the payload is ignored.
     *
     * @param version format version of the payload; not consulted.
     * @param serialized the payload; not consulted.
     */
    override fun deserialize(version: Int, serialized: ByteArray?): Void? = null
}
但是,我在理解此解决方案是否能保证并行性方面遇到问题。我对 Flink 很陌生,刚刚读完文档,我在理解这一点时遇到了问题。
当调用新 Source 接口的 createReader() 方法时,它会传入一个 SourceReaderContext,其中提供了有关并行度以及当前调用实例(其子任务索引)的详细信息。这应该为您提供实现正确的并行源函数所需的信息。