Spark dropDuplicates 中的序列化错误

问题描述 投票:0回答:1

我在 Spark 中使用 `dropDuplicates` 函数时遇到序列化问题。这是我正在使用的代码:

/** Applies the parent transformation, then de-duplicates the result on `column8`. */
override def innerTransform(dataFrames: Map[ReaderKey, DataFrame]): DataFrame =
    super.innerTransform(dataFrames).dropDuplicates("column8")

[java.io.InvalidObjectException: ReflectiveOperationException during deserialization
    at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:280)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1190)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2266)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2157)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
    at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1100)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2423)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1100)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2423)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1100)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2423)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:88)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:278)
    ... 91 more
Caused by: java.lang.IllegalArgumentException: too many arguments
    at java.base/java.lang.invoke.LambdaMetafactory.altMetafactory(LambdaMetafactory.java:511)
    at scala.runtime.LambdaDeserializer$.makeCallSite$1(LambdaDeserializer.scala:105)
    at scala.runtime.LambdaDeserializer$.deserializeLambda(LambdaDeserializer.scala:114)
    at scala.runtime.LambdaDeserialize.deserializeLambda(LambdaDeserialize.java:38)
    at org.apache.spark.sql.execution.LocalTableScanExec.$deserializeLambda$(LocalTableScanExec.scala)
    ... 96 more]

这是 innerTransform 返回的 DataFrame 的 schema:

root
 |-- column1: struct (nullable = true)
 |    |-- column1: binary (nullable = true)
 |    |-- column2: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- column1: string (nullable = true)
 |    |    |    |-- column2: binary (nullable = true)
 |    |-- column3: string (nullable = true)
 |    |-- column4: integer (nullable = false)
 |    |-- column5: long (nullable = false)
 |    |-- column6: timestamp (nullable = true)
 |    |-- column7: integer (nullable = false)
 |-- column8: string (nullable = true)
 |-- column9: string (nullable = true)
 |-- column10: string (nullable = true)
 |-- column11: string (nullable = true)
 |-- column12: string (nullable = true)
 |-- column13: string (nullable = true)
 |-- column14: string (nullable = true)
 |-- column15: string (nullable = true)
 |-- column16: string (nullable = true)
 |-- column17: integer (nullable = true)
 |-- column18: string (nullable = true)
 |-- column19: string (nullable = true)
 |-- column20: string (nullable = true)
 |-- column21: struct (nullable = true)
 |    |-- column1: string (nullable = true)
 |    |-- column2: string (nullable = true)
 |    |-- column3: string (nullable = true)
 |    |-- column4: string (nullable = true)
 |    |-- column5: string (nullable = true)
 |    |-- column6: string (nullable = true)
 |    |-- column7: string (nullable = true)
 |-- column22: struct (nullable = true)
 |    |-- column1: long (nullable = false)
 |    |-- column2: long (nullable = false)
scala apache-spark spark-structured-streaming
1个回答
0
投票

我目前也遇到了同样的问题。如果您找到了解决方案,请与我分享,谢谢。

© www.soinside.com 2019 - 2024. All rights reserved.