文件系统连接器接收器如何工作

问题描述 投票:0回答:1

我使用以下简单代码来说明文件系统连接器的行为。 我有两个观察结果想要询问并确认。

  1. 如果我没有启用检查点,那么所有生成的part-XXX文件的文件名中总是包含

    inprogress
    ,这是否意味着这些文件没有提交?另外,这是否意味着如果我想使用文件系统连接器接收器,那么我总是需要
    enable checkpointing
    以便可以提交生成的文件并且下游(如hive或flink)可以发现并读取这些文件?

  2. 分区中的

    inprogress
    文件什么时候恢复正常?当新分区创建后,检查点开始运行,然后使前一个分区中的文件从inprogress变为正式分区时,是否会发生这种情况?如果是这样,则可能存在延迟(检查点间隔)以使分区可见。

  3. 我在代码中设置了滚动间隔为20秒,但是当我查看生成的part-XXX文件时,后续文件的创建时间相差25秒。我以为应该是20秒

例如,

part-90e63e04-466f-45ce-94d4-9781065a8a8a-0-10 2021-0‎1-0‎3 ‏‎12:39:04  
part-90e63e04-466f-45ce-94d4-9781065a8a8a-0-11 2021-0‎1-0‎3 ‏‎12:39:29

代码是:

 val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.enableCheckpointing(10*1000)
    env.setStateBackend(new FsStateBackend("file:///d:/flink-checkpoints"))
    val ds: DataStream[MyEvent] = env.addSource(new InfiniteEventSource(emitInterval = 5 * 1000))
    val tenv = StreamTableEnvironment.create(env)
    tenv.createTemporaryView("sourceTable", ds)

    ds.print()

    val ddl =
      s"""
      create table sinkTable(
      id string,
      p_day STRING,
      p_hour STRING,
      p_min STRING

      ) partitioned by(p_day, p_hour, p_min) with (
        'connector' = 'filesystem',
        'path' = 'D:/csv-${System.currentTimeMillis()}',
        'format' = 'csv',
        'sink.rolling-policy.check-interval' = '5 s',
        'sink.rolling-policy.rollover-interval' = '20 s',
        'sink.partition-commit.trigger'='process-time',
         'sink.partition-commit.policy.kind'='success-file',
        'sink.partition-commit.delay' = '0 s'
      )
      """.stripMargin(' ')

    tenv.executeSql(ddl)

    tenv.executeSql(
      """
     insert into sinkTable
      select id, date_format(occurrenceTime,'yyyy-MM-dd'), date_format(occurrenceTime, 'HH'), date_format(occurrenceTime, 'mm')  from sourceTable

     """.stripMargin(' '))

    env.execute()
  }
apache-flink flink-sql
1个回答
2
投票

第 1 点已在 StreamingFileSink 文档中介绍:

重要提示:使用 StreamingFileSink 时需要启用检查点。零件文件只能在成功的检查点上完成。如果禁用检查点,零件文件将永远处于

in-progress
pending
状态,并且无法被下游系统安全读取。

对于第 2 点,部分文件生命周期记录在here,这解释了

in-progress
文件根据滚动策略转换为
pending
,并且仅在检查点完成时才变为
finished
。因此,根据滚动策略和检查点间隔,某些文件可能会
pending
持续相当长的一段时间。

对于第 3 点,

rollover-interval
为 20 秒,
check-interval
为 5 秒,翻转将在 20 到 25 秒之间发生。有关 check-interval 的说明,请参阅
滚动策略
文档:

检查基于时间的滚动策略的时间间隔。这控制频率以检查零件文件是否应基于“sink.rolling-policy.rollover-interval”进行翻转。

© www.soinside.com 2019 - 2024. All rights reserved.