Flink RocksDB custom options factory: configuration error when disabling the block cache

Question

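I am running Flink 1.15.2 and trying to define a custom options factory for RocksDB in order to disable the block cache.

I followed the example in this blog post: https://shopify.engineering/optimizing-apache-flink-applications-tips

However, my Flink application refuses to start after I add the OptionsFactory to my environment. The error seems to come from this line: https://github.com/facebook/rocksdb/blob/main/table/block_based/block_based_table_factory.cc#L599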

Enable cache_index_and_filter_blocks, , but block cache is disabled

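I don't know what is going wrong here. Something must be overriding or ignoring my column options, but I can't tell what.

Any help is much appreciated.


The configuration is here: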

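import java.util

import org.apache.flink.configuration.ReadableConfig
import org.apache.flink.contrib.streaming.state.{ConfigurableRocksDBOptionsFactory, RocksDBOptionsFactory}
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

class NoBlockCacheRocksDbOptionsFactory extends ConfigurableRocksDBOptionsFactory {
  override def createDBOptions(currentOptions: DBOptions, handlesToClose: util.Collection[AutoCloseable]): DBOptions = {
    currentOptions.setMaxBackgroundJobs(20) // state.backend.rocksdb.thread.num
    currentOptions
  }

  override def createColumnOptions(
      currentOptions: ColumnFamilyOptions,
      handlesToClose: util.Collection[AutoCloseable]): ColumnFamilyOptions = {

    // Table config with the block cache and every cache-related flag switched off
    val blockBasedTableConfig = new BlockBasedTableConfig()
      .setNoBlockCache(true)
      .setBlockCache(null)
      .setCacheIndexAndFilterBlocks(false)
      .setCacheIndexAndFilterBlocksWithHighPriority(false)
      .setPinL0FilterAndIndexBlocksInCache(false)

    // setTableFormatConfig returns the same ColumnFamilyOptions instance
    currentOptions.setTableFormatConfig(blockBasedTableConfig)
  }

  // Nothing is read from the Flink configuration; the factory is returned unchanged
  override def configure(configuration: ReadableConfig): RocksDBOptionsFactory = {
    this
  }
}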

Error and stack trace:

Caused by: org.apache.flink.util.SerializedThrowable: Enable cache_index_and_filter_blocks, , but block cache is disabled
    at org.rocksdb.RocksDB.open(Native Method) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.rocksdb.RocksDB.open(RocksDB.java:306) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:80) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:134) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:315) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) ~[flink-dist-1.15.2.jar:1.15.2]
    ... 11 more
scala apache-flink flink-streaming rocksdb rocksdb-java
1 Answer

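Try setting the Flink configuration option state.backend.rocksdb.memory.managed to false (the default is true). While it is enabled, this setting overrides the configuration set in the RocksDB options factory. After changing this value, I was able to run the job with the block cache disabled.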
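
As a minimal sketch, the change could look like this in flink-conf.yaml. The file location is an assumption, since the question does not say how the cluster is configured. My understanding, which I have not verified against the 1.15.2 source, is that with managed memory enabled Flink attaches its own shared block cache and turns cache_index_and_filter_blocks back on after the options factory has run. That would conflict with setNoBlockCache(true) and produce the error above.

```yaml
# flink-conf.yaml (assumed location of the cluster configuration)
# Stop Flink from managing RocksDB memory, so the block cache settings
# from the custom options factory are no longer overridden.
state.backend.rocksdb.memory.managed: false
```

With managed memory turned off, RocksDB memory usage is no longer bounded by Flink's managed memory budget, so it may be worth watching the TaskManager's memory consumption after the change.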
