我在 Spark 中使用 `dropDuplicates` 函数时遇到了序列化问题。这是我正在使用的代码：
/** Runs the parent transformation and de-duplicates its output.
  *
  * @param dataFrames input frames keyed by reader, forwarded unchanged to the parent implementation
  * @return the parent's result after `dropDuplicates("column8")`, i.e. de-duplicated on `column8`
  */
override def innerTransform(dataFrames: Map[ReaderKey, DataFrame]): DataFrame =
  super.innerTransform(dataFrames).dropDuplicates("column8")
[java.io.InvalidObjectException: ReflectiveOperationException during deserialization
at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:280)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1190)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2266)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2157)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1100)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2423)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1100)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2423)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1100)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2423)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:88)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:278)
... 91 more
Caused by: java.lang.IllegalArgumentException: too many arguments
at java.base/java.lang.invoke.LambdaMetafactory.altMetafactory(LambdaMetafactory.java:511)
at scala.runtime.LambdaDeserializer$.makeCallSite$1(LambdaDeserializer.scala:105)
at scala.runtime.LambdaDeserializer$.deserializeLambda(LambdaDeserializer.scala:114)
at scala.runtime.LambdaDeserialize.deserializeLambda(LambdaDeserialize.java:38)
at org.apache.spark.sql.execution.LocalTableScanExec.$deserializeLambda$(LocalTableScanExec.scala)
... 96 more]
这是 `innerTransform` 返回的 DataFrame 的 schema：
root
|-- column1: struct (nullable = true)
| |-- column1: binary (nullable = true)
| |-- column2: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- column1: string (nullable = true)
| | | |-- column2: binary (nullable = true)
| |-- column3: string (nullable = true)
| |-- column4: integer (nullable = false)
| |-- column5: long (nullable = false)
| |-- column6: timestamp (nullable = true)
| |-- column7: integer (nullable = false)
|-- column8: string (nullable = true)
|-- column9: string (nullable = true)
|-- column10: string (nullable = true)
|-- column11: string (nullable = true)
|-- column12: string (nullable = true)
|-- column13: string (nullable = true)
|-- column14: string (nullable = true)
|-- column15: string (nullable = true)
|-- column16: string (nullable = true)
|-- column17: integer (nullable = true)
|-- column18: string (nullable = true)
|-- column19: string (nullable = true)
|-- column20: string (nullable = true)
|-- column21: struct (nullable = true)
| |-- column1: string (nullable = true)
| |-- column2: string (nullable = true)
| |-- column3: string (nullable = true)
| |-- column4: string (nullable = true)
| |-- column5: string (nullable = true)
| |-- column6: string (nullable = true)
| |-- column7: string (nullable = true)
|-- column22: struct (nullable = true)
| |-- column1: long (nullable = false)
| |-- column2: long (nullable = false)
我目前也面临着同样的问题，如果您找到了解决方案，请与我分享，谢谢。