Flink job on AWS EMR using Python fails with "NoClassDefFoundError"


I am trying to run a Flink job with Python 3.9 and PyFlink on an AWS EMR cluster (v7.3.0). The job reads from an AWS Kinesis stream and prints the streamed records to the console. However, when I execute the job on EMR (after SSH-ing into the primary node), it fails with the error below:

# test.py

import argparse
import json
from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer
from pyflink.common import SimpleStringSchema, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.functions import MapFunction


class ProcessImage(MapFunction):
    def open(self, runtime_context):
        print("Constructor launched ...")

    def map(self, value):
        try:
            if not value:
                print("Empty Kinesis record received.")
                return None

            # Parse the JSON-formatted record
            record = json.loads(value)
            print(f"Processed record: {record}")

            return value
        except Exception as e:
            print(f"Error processing record: {e}")
            raise


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="StreamingApp")

    parser.add_argument(
        "--stream_name",
        type=str,
        help="The name of Kinesis Stream to connect Flink with.",
        required=True,
    )
    parser.add_argument(
        "--region",
        type=str,
        help="The region name of the streaming application.",
        choices=["us-east-1", "us-west-2"],
        required=True,
    )

    args = parser.parse_args()
    stream_name: str = args.stream_name
    region: str = args.region

    # Set up the Flink environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

    env.add_jars(
        "file:///home/hadoop/flink-connector-kinesis-4.3.0-1.18.jar",
        "file:///home/hadoop/joda-time-2.12.5.jar",
    )

    # Kinesis Consumer properties
    kinesis_consumer_config = {
        "aws.region": region,
        "flink.stream.initpos": "LATEST",
        "aws.credentials.provider": "AUTO",
    }

    # Set up the Kinesis consumer to read from the Kinesis stream
    kinesis_source = FlinkKinesisConsumer(
        stream_name,
        SimpleStringSchema(),
        kinesis_consumer_config,
    )

    # Define the stream pipeline
    stream = env.add_source(kinesis_source)

    # Process record
    processed_stream = stream.map(ProcessImage(), output_type=Types.STRING())
    processed_stream.print()

    # Execute the Flink job
    env.execute()


sh-5.2$ python3 /home/hadoop/test.py --stream_name TestStream --region us-west-2
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Traceback (most recent call last):
  File "/home/hadoop/test.py", line 67, in <module>
    main()
  File "/home/hadoop/test.py", line 63, in main
    env.execute("Flink Kinesis Processing Job")
  File "/home/ssm-user/.local/lib/python3.9/site-packages/pyflink/datastream/stream_execution_environment.py", line 824, in execute
    return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/home/ssm-user/.local/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/home/ssm-user/.local/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/home/ssm-user/.local/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
...
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
....
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration
        at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46)
        at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:36)
        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.createKinesisClient(KinesisProxy.java:268)
        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:152)
        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.create(KinesisProxy.java:280)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:412)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:366)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:541)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)

The key part of the error appears to be:

Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration
        at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46)

I have already tried adding the required JAR files, including flink-connector-kinesis-4.3.0-1.18.jar via add_jars() in my Python script, and I have verified that I have the correct IAM permissions. The script still fails every time.
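One sanity check that narrows this down: a jar is just a zip archive, so plain stdlib Python on the node can confirm whether the shaded AWS class from the stack trace is actually packaged inside the jar being shipped. A minimal sketch (the helper name jar_contains_class is illustrative, not part of any library; the demo builds a synthetic jar so it runs anywhere):

```python
import os
import tempfile
import zipfile


def jar_contains_class(jar_path: str, fq_class: str) -> bool:
    """Return True if the fully-qualified class name is packaged in the jar.

    A .jar file is a zip archive whose entries mirror the package path,
    so org.example.Foo lives at org/example/Foo.class.
    """
    entry = fq_class.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


# Demo on a synthetic jar; on the EMR node, point it at the connector jar instead.
with tempfile.TemporaryDirectory() as d:
    demo_jar = os.path.join(d, "demo.jar")
    with zipfile.ZipFile(demo_jar, "w") as z:
        z.writestr(
            "org/apache/flink/kinesis/shaded/com/amazonaws/ClientConfiguration.class",
            b"",
        )
    print(
        jar_contains_class(
            demo_jar,
            "org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration",
        )
    )  # True
```

If the check returns False against the real connector jar, the class was never on the classpath and no classloader setting will fix it.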

Has anyone else run into this? Could it be caused by an incompatibility between the AWS SDK and the Flink Kinesis connector versions?
  1. Are any additional dependencies or configuration required for the Kinesis connector to work with PyFlink in an EMR setup?
  2. Could this be related to the default classloader configuration of Flink on EMR?

Thank you.

apache-flink amazon-emr flink-streaming pyflink
1 Answer
The jar you are passing to env.add_jars() does not bundle its transitive dependencies, and you need all of them on the classpath. There is a convenient uber/fat jar for exactly this purpose; get it from

https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kinesis/4.3.0-1.18

It is called flink-sql-connector-kinesis-4.3.0-1.18 because it is intended for use with the Flink SQL client, but it can be reused in Python applications and is not limited to SQL; it contains the DataStream and Table connectors as well.
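Concretely, that means replacing the thin connector jar (and its manually tracked dependencies such as joda-time) with the single fat jar, passed as a well-formed file:// URI. A sketch of the change, assuming the jar was copied to /home/hadoop on the primary node (the path is an assumption; adjust it to wherever you place the jar):

```python
from pathlib import Path

# Hypothetical location on the EMR primary node; adjust to your setup.
jar = Path("/home/hadoop/flink-sql-connector-kinesis-4.3.0-1.18.jar")

# Path.as_uri() produces a correctly formed file:// URI for add_jars(),
# avoiding hand-written slashes like "file:////...".
uri = jar.as_uri()
print(uri)  # file:///home/hadoop/flink-sql-connector-kinesis-4.3.0-1.18.jar

# In the job, this single uber jar replaces both add_jars() entries:
# env.add_jars(uri)
```

Because the uber jar shades the AWS SDK classes under org.apache.flink.kinesis.shaded, the ClientConfiguration class from the stack trace ships inside it.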
