java.lang.ClassCastException: class [B cannot be cast to class org.apache.flink.types.Row


I am using Apache PyFlink 1.18.1. The input data from the Apache Flink Kafka source looks like this:

2023-11-01, 2.7, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA
2023-12-01, 2.6, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA
2024-01-01, 3.7, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA
2024-02-01, 3.6, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA
2024-03-01, 3.3, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA

The Python source code is:

    class CustomCsvMapFunction(MapFunction):
        def map(self, value):
            str_list = value.split(',')
            return Types.ROW(str_list)  # this line causes the error: Types.ROW(...) returns a RowTypeInfo (type metadata), not a Flink Row value
    
    
    source = KafkaSource.builder() \
                .set_bootstrap_servers(kafka_brokerlist) \
                .set_topics(kafka_topic) \
                .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
                .set_value_only_deserializer(SimpleStringSchema()) \
                .build()
    
    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
    csv_ds = ds.filter(lambda str: not(str.startswith('date')))\
                .map(CustomCsvMapFunction())
            
    csv_ds.print()
    
    type_info = Types.TUPLE([Types.SQL_TIME(), Types.FLOAT(), Types.STRING(),
                Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()])
            
    jdbcConnOptions = JdbcConnectionOptions.JdbcConnectionOptionsBuilder()\
                        .with_url(mysql_host_url)\
                        .with_driver_name('com.mysql.cj.jdbc.Driver')\
                        .with_user_name(mysql_user)\
                        .with_password(mysql_password)\
                        .build()
    
    jdbcExeOptions = JdbcExecutionOptions.builder()\
                        .with_batch_interval_ms(1000)\
                        .with_batch_size(200)\
                        .with_max_retries(5)\
                        .build()
    
    csv_ds.add_sink(
                JdbcSink.sink(
                    'insert into ' + table_name + ' values (?, ?, ?, ?, ?, ?, ?, ?)',
                    type_info, jdbcConnOptions, jdbcExeOptions))

The console output is:

RowTypeInfo(2023-11-01, 2.7, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA)
RowTypeInfo(2023-12-01, 2.6, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA)
RowTypeInfo(2024-01-01, 3.7, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA)
RowTypeInfo(2024-02-01, 3.6, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA)
RowTypeInfo(2024-03-01, 3.3, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA)

However, the RowTypeInfo objects in the stream are the wrong type for the JdbcSink, which causes this error:

java.lang.ClassCastException: class [B cannot be cast to class org.apache.flink.types.Row ([B is in module java.base of loader 'bootstrap'; org.apache.flink.types.Row is in unnamed module of loader 'app')

I think the Flink stream data type must be the PyFlink Row type, not RowTypeInfo. So how can I convert the comma-separated String input from Kafka into an Apache Flink Row output, so that I can save it to MySQL via JDBC?
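For reference, a minimal sketch of the conversion, assuming the 8-column layout shown above (the helper name `parse_csv_line` and the field names in the commented PyFlink wiring are my own, not from the original code):

```python
from typing import Optional


def parse_csv_line(line: str) -> Optional[tuple]:
    """Split one CSV record into typed fields.

    Returns None for the header row or for rows with a missing value,
    so the caller can filter those out before mapping to a Row.
    """
    fields = [f.strip() for f in line.split(',')]
    if len(fields) != 8 or fields[0] == 'date' or fields[1] in ('', 'null'):
        return None
    # Parse the numeric column; keep everything else as strings.
    return (fields[0], float(fields[1]), *fields[2:])


# Wiring this into the PyFlink job would look roughly like this (untested sketch):
#
#   from pyflink.common import Row, Types
#
#   row_type = Types.ROW_NAMED(
#       ['date', 'value', 'state', 'series_id', 'title', 'freq', 'units', 'sa'],
#       [Types.STRING(), Types.FLOAT(), Types.STRING(), Types.STRING(),
#        Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()])
#
#   csv_ds = ds.filter(lambda line: parse_csv_line(line) is not None) \
#              .map(lambda line: Row(*parse_csv_line(line)), output_type=row_type)
```

The essential points are that `map` must return a `Row` value (not type metadata) and that the operator needs an explicit `output_type` so Flink serializes the individual fields instead of raw bytes.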

== Updated part

I found that some input data rows contain an empty value "". So I modified the code as follows:

class CustomCsvMapFunction(MapFunction):
    def map(self, value):
        str_list = value.split(',')
        if str_list[0] != 'date' and str_list[1] != '' and str_list[1] != 'null':
            return Row(str_list[0], float(str_list[1]), str_list[2], str_list[3],
                       str_list[4], str_list[5], str_list[6], str_list[7])

However, this time a different error occurred at the JdbcSink:

py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
        at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:629)
        at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:34)        
        at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33)        
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73)
        at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
        at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110)
        at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
        at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
        at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
        at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
        at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
        at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
        at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
        at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
        at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
        ... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:89)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:39)
        at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:82)
        at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:33)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at org.apache.flink.streaming.api.operators.python.process.collector.RunnerOutputCollector.collect(RunnerOutputCollector.java:52)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.emitResult(AbstractExternalOneInputPythonFunctionOperator.java:133)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:100)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:292)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.processElement(AbstractExternalOneInputPythonFunctionOperator.java:146)
        at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.processElement(ExternalPythonProcessOperator.java:111)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 22 more
Caused by: java.lang.NullPointerException: Cannot read the array length because "from" is null
        at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.copy(BytePrimitiveArraySerializer.java:52)
        at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.copy(BytePrimitiveArraySerializer.java:31)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)

I hope these updated parts help to solve the problem.

apache-flink flink-streaming pyflink
1 Answer

I also ran into the same error described in the title and have solved it. I found that the type hint on the final operator matters. Here is a small example:

from pyflink.common.typeinfo import RowTypeInfo
from pyflink.common import Types, Row

ds \
    .map(lambda x: Row(name=x[1].f1, id=int(x[1].f0)),
         output_type=Types.ROW_NAMED(['name', 'id'], [Types.STRING(), Types.LONG()])) \
    .add_sink(
    JdbcSink.sink(
        'update user set name = ? where id = ?',
        type_info=RowTypeInfo([Types.STRING(), Types.LONG()], ['name', 'id']),
        jdbc_connection_options=JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .with_url('jdbc:mysql://192.168.1.1:3306/pyflink?autocommit=true')
            .with_driver_name('com.mysql.cj.jdbc.Driver')
            .with_user_name('<user_name>')
            .with_password('<password>')
            .build(),
        jdbc_execution_options=JdbcExecutionOptions.builder()
            .with_batch_size(100)
            .with_batch_interval_ms(1000)
            .with_max_retries(3)
            .build()
    )
)
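As for the NullPointerException in the updated question (`Cannot read the array length because "from" is null`): that is likely a separate issue. A `map` function that returns `None` for rows it wants to drop forwards a null record downstream, which the serializer cannot copy. A map function should always return a value; do the filtering in a separate `filter` step instead. A minimal sketch (the helper name `is_valid_record` is mine, matching the conditions in the updated `CustomCsvMapFunction`):

```python
def is_valid_record(line: str) -> bool:
    """True for data rows that carry a numeric value; False for the
    header row and for rows with an empty or 'null' value field."""
    fields = [f.strip() for f in line.split(',')]
    return len(fields) == 8 and fields[0] != 'date' and fields[1] not in ('', 'null')


# With the filter in front, map only sees valid lines and can always
# return a Row (untested sketch against the question's pipeline):
#
#   csv_ds = ds.filter(is_valid_record) \
#              .map(CustomCsvMapFunction(), output_type=row_type)
```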