Flink Scala: mocked object cannot be serialized


I'm trying to write unit tests for a Scala Flink application.

For example, I have an async mapper like the one below. It takes a User object with an id and enriches it with the user's age:

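import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import scala.concurrent.duration._

case class User(id: String)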
case class UserWithMeta(id: String, age: Long)

class EnrichWithUserMeta extends RichAsyncFunction[User, UserWithMeta] {
  protected var userMetadataCache: CaffeineHelper[Long] = _

  override def open(parameters: Configuration): Unit = {
    userMetadataCache = CaffeineHelper.newCache[Long](10.seconds)
  }

  override def asyncInvoke(input: User, resultFuture: ResultFuture[UserWithMeta]): Unit = {
    val age = getUserAge(input.id)
    resultFuture.complete(Seq(UserWithMeta(input.id, age)))
  }

  private def getUserAge(userId: String): Long = {
    userMetadataCache.get(userId) match {
      case Some(age) => age
      case None => { // Cache miss. Get it from DB.
        val age = getAgeFromDB(userId)
        userMetadataCache.put(userId)(age)
        age
      }
    }
  }
}

The mapper above holds a cache instance of CaffeineHelper, which is a wrapper around the Caffeine caching library. Here is CaffeineHelper.scala:

trait CaffeineHelper[V] extends Serializable {
  @transient protected var cache: CaffeineCache[V]

  def get(id: String): Option[V] = Try(cache.get(id).get).toOption

  def put(key: String)(value: V) = cache.put(key)(value)
}

object CaffeineHelper {
  def newCache[V](d: Duration): CaffeineHelper[V] = {
    new CaffeineHelper[V] {
      override protected var cache: CaffeineCache[V] = ??? // Actual implementation.
    }
  }
}

The main Flink application that uses this async mapper runs fine. However, when I try to write a unit test, the mocked CaffeineHelper cannot be serialized.

Here is the test class:

package org.example

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.scala.{AsyncDataStream, StreamExecutionEnvironment}
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.mockito.IdiomaticMockito.StubbingOps
import org.mockito.Mockito.withSettings
import org.mockito.mock.SerializableMode
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar.mock
import scalacache.caffeine.CaffeineCache
import java.util.concurrent.TimeUnit


class EnrichWithUserMetaTest extends AnyFlatSpec with Matchers with BeforeAndAfter {
  private val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().build())

  before(flinkCluster.before())
  after(flinkCluster.after())

  "EnrichWithUserMetaMapper" should "enrich with brand" in {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val caffeineMock = mock[CaffeineHelper[Long]](withSettings().serializable(SerializableMode.ACROSS_CLASSLOADERS))

    val ageMapper = new EnrichWithUserMeta() {
      override def open(parameters: Configuration): Unit = {
        userMetadataCache = caffeineMock
      }
    }

    caffeineMock.get("user1") returns Some(4L)

    val stream = env.fromElements(
      User("user1")
    )

    val enrichedProducts = AsyncDataStream
      .unorderedWait(stream, ageMapper, 5, TimeUnit.SECONDS, 1)
      .executeAndCollect(1)

    enrichedProducts.head shouldBe UserWithMeta("user1", 4L)
  }
}

Error message:

public default scala.Option org.example.CaffeineHelper.get(java.lang.String) is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.common.InvalidProgramException: public default scala.Option org.example.CaffeineHelper.get(java.lang.String) is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)

I also tried configuring the mock as serializable:

val caffeineMock = mock[CaffeineHelper[Long]](withSettings().serializable())

But then I get a different error:

org.mockito.internal.invocation.RealMethod$FromCallable@656a3d6b is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.common.InvalidProgramException: org.mockito.internal.invocation.RealMethod$FromCallable@656a3d6b is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)

I also tried other mocking libraries such as ScalaMock, but without success.

apache-flink flink-streaming
1 Answer

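You should mark userMetadataCache as @transient. You don't need Flink to initialize this variable in the Flink client, serialize it, and ship it to every task manager, which is what it is currently trying to do.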
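
To illustrate the effect of @transient, here is a minimal sketch in plain Scala that uses Java serialization, the mechanism Flink relies on when it ships function objects. It involves no Flink or Caffeine classes: LocalCache, Enricher, and SerializationCheck are hypothetical stand-ins invented for this example, with Enricher.open() playing the role of the rich function's open() hook.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a cache; deliberately NOT Serializable.
class LocalCache {
  private val data = scala.collection.mutable.Map.empty[String, Long]
  def get(key: String): Option[Long] = data.get(key)
  def put(key: String, value: Long): Unit = data.update(key, value)
}

// Hypothetical stand-in for a rich function: Serializable, with an open() hook.
class Enricher extends Serializable {
  // @transient makes Java serialization skip this field,
  // so the deserialized copy starts with cache == null.
  @transient protected var cache: LocalCache = _

  // Meant to run on the worker after deserialization.
  def open(): Unit = { cache = new LocalCache }

  def isOpen: Boolean = cache != null
}

object SerializationCheck {
  // Serializes a value to bytes and reads it back, as a client-to-worker hop would.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

In this sketch, calling SerializationCheck.roundTrip on an Enricher succeeds even when the cache has already been set, and the copy comes back with no cache until open() is called on it. Removing @transient would make the same round trip fail with a NotSerializableException once the cache field is set, because LocalCache is not serializable.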
