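I'm struggling to write unit tests for a Scala Flink application.
For example, I have an async mapper like the one below. It takes a User object with an id and enriches it with the user's age: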
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import scala.concurrent.duration._

case class User(id: String)
case class UserWithMeta(id: String, age: Long)

class EnrichWithUserMeta extends RichAsyncFunction[User, UserWithMeta] {
  protected var userMetadataCache: CaffeineHelper[Long] = _

  override def open(parameters: Configuration): Unit = {
    userMetadataCache = CaffeineHelper.newCache[Long](10.seconds)
  }

  override def asyncInvoke(input: User, resultFuture: ResultFuture[UserWithMeta]): Unit = {
    val age = getUserAge(input.id)
    resultFuture.complete(Seq(UserWithMeta(input.id, age)))
  }

  private def getUserAge(userId: String): Long = {
    userMetadataCache.get(userId) match {
      case Some(age) => age
      case None => // Cache miss: get it from the DB (getAgeFromDB is not shown).
        val age = getAgeFromDB(userId)
        userMetadataCache.put(userId)(age)
        age
    }
  }
}
The mapper above holds a CaffeineHelper cache instance. CaffeineHelper is a wrapper around the Caffeine cache library. Here is CaffeineHelper.scala:
import scala.concurrent.duration.Duration
import scala.util.Try
import scalacache.caffeine.CaffeineCache

trait CaffeineHelper[V] extends Serializable {
  @transient protected var cache: CaffeineCache[V]

  def get(id: String): Option[V] = Try(cache.get(id).get).toOption
  def put(key: String)(value: V) = cache.put(key)(value)
}

object CaffeineHelper {
  def newCache[V](d: Duration): CaffeineHelper[V] = {
    new CaffeineHelper[V] {
      override protected var cache: CaffeineCache[V] = ??? // Actual implementation.
    }
  }
}
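The main Flink application that uses the async mapper above works fine. However, when I try to write a unit test, the mocked CaffeineHelper cannot be serialized.
Here is the test class: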
package org.example

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.scala.{AsyncDataStream, StreamExecutionEnvironment}
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.mockito.IdiomaticMockito.StubbingOps
import org.mockito.Mockito.withSettings
import org.mockito.mock.SerializableMode
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar.mock
import scalacache.caffeine.CaffeineCache
import java.util.concurrent.TimeUnit

class EnrichWithUserMetaTest extends AnyFlatSpec with Matchers with BeforeAndAfter {

  private val flinkCluster = new MiniClusterWithClientResource(
    new MiniClusterResourceConfiguration.Builder().build())

  before(flinkCluster.before())
  after(flinkCluster.after())

  "EnrichWithUserMetaMapper" should "enrich with age" in {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val caffeineMock = mock[CaffeineHelper[Long]](withSettings().serializable(SerializableMode.ACROSS_CLASSLOADERS))
    val ageMapper = new EnrichWithUserMeta() {
      override def open(parameters: Configuration): Unit = {
        userMetadataCache = caffeineMock
      }
    }
    // Stub the lookup for the same id that the stream emits.
    caffeineMock.get("user1") returns Some(4L)

    val stream = env.fromElements(
      User("user1")
    )

    val enrichedProducts = AsyncDataStream
      .unorderedWait(stream, ageMapper, 5, TimeUnit.SECONDS, 1)
      .executeAndCollect(1)

    enrichedProducts.head shouldBe UserWithMeta("user1", 4L)
  }
}
Error message:
public default scala.Option org.example.CaffeineHelper.get(java.lang.String) is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.common.InvalidProgramException: public default scala.Option org.example.CaffeineHelper.get(java.lang.String) is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
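The check behind this message is essentially a Java serialization attempt: Flink's ClosureCleaner tries to write the function object, together with what it references, through an ObjectOutputStream. The object named in the first message is the toString of a java.lang.reflect.Method (CaffeineHelper.get), and Method does not implement Serializable, so any object that holds one in a non-transient field fails the check. The stdlib-only sketch below reproduces that failure mode. HoldsMethod and SerializationCheck are hypothetical names, and HoldsMethod is only a stand-in for a mock's internal state, not Mockito's actual implementation.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.lang.reflect.Method

// Hypothetical stand-in for an object (such as a mock) that keeps a
// reflective handle to the method it intercepts. Method is not Serializable.
class HoldsMethod(val target: Method) extends Serializable

object SerializationCheck {
  // Returns true if obj can be written with plain Java serialization,
  // which is roughly what Flink's closure check attempts.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val method = classOf[String].getMethod("length")
    println(isSerializable("plain string"))          // true
    println(isSerializable(new HoldsMethod(method))) // false: the Method field cannot be written
  }
}
```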
I also tried configuring the mock to be serializable:
val caffeineMock = mock[CaffeineHelper[Long]](withSettings().serializable())
but then I got a different error:
org.mockito.internal.invocation.RealMethod$FromCallable@656a3d6b is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.common.InvalidProgramException: org.mockito.internal.invocation.RealMethod$FromCallable@656a3d6b is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
I have also tried other mocking libraries, such as ScalaMock, without success.
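You should mark userMetadataCache as @transient. You don't need Flink to initialize this variable in the Flink client, serialize it, and send it to every task manager, which is what it is currently trying to do.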
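A minimal stdlib-only sketch of the mechanism this answer relies on, assuming a simplified function class: EnricherLike, Client, open() and TransientDemo are hypothetical stand-ins for EnrichWithUserMeta, the cache, Flink's open(Configuration) and the shipping step, not Flink APIs. The @transient field is skipped during serialization, comes back as null after deserialization, and is rebuilt in open(), which Flink calls on each parallel instance after it has been deserialized on the task manager.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical non-serializable resource, standing in for the cache or DB client.
class Client {
  def lookup(id: String): Long = id.length.toLong
}

// Hypothetical stand-in for EnrichWithUserMeta.
class EnricherLike extends Serializable {
  // Not written during serialization; null after deserialization.
  @transient protected var client: Client = _

  // Plays the role of RichFunction#open: runs after deserialization.
  def open(): Unit = {
    client = new Client
  }

  def enrich(id: String): (String, Long) = (id, client.lookup(id))
  def hasClient: Boolean = client != null
}

object TransientDemo {
  // Serialize and deserialize, similar to shipping a function to a task manager.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val local = new EnricherLike
    local.open()                     // field set on the "client" side
    val shipped = roundTrip(local)   // succeeds: the transient field is skipped
    println(shipped.hasClient)       // false
    shipped.open()                   // re-created on the "worker" side
    println(shipped.enrich("user1")) // (user1,5)
  }
}
```

Note that @transient only removes that one field from the serialized state. Everything else the function object references still has to be serializable, and as far as I know that includes values an anonymous subclass captures from the enclosing scope, such as caffeineMock in the test above.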