I am trying to run a Flink job with Python 3.9, Apache Flink, and PyFlink on an AWS EMR cluster (v7.3.0). The job reads from an AWS Kinesis stream and prints the streaming data to the console. However, when I execute the job on EMR (after SSHing into the master node), it fails with the error shown below the script:
# test.py
import argparse
import json
from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer
from pyflink.common import SimpleStringSchema, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.functions import MapFunction


class ProcessImage(MapFunction):
    def open(self, runtime_context):
        print("Constructor launched ...")

    def map(self, value):
        try:
            if not value:
                print("Empty Kinesis record received.")
                return None
            # Parse the JSON-formatted record
            record = json.loads(value)
            print(f"Processed record: {record}")
            return value
        except Exception as e:
            print(f"Error processing record: {e}")
            raise


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="StreamingApp")
    parser.add_argument(
        "--stream_name",
        type=str,
        help="The name of the Kinesis stream to connect Flink with.",
        required=True,
    )
    parser.add_argument(
        "--region",
        type=str,
        help="The region name of the streaming application.",
        choices=["us-east-1", "us-west-2"],
        required=True,
    )
    args = parser.parse_args()
    stream_name: str = args.stream_name
    region: str = args.region

    # Set up the Flink environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.add_jars(
        "file:///home/hadoop/flink-connector-kinesis-4.3.0-1.18.jar",
        "file:///home/hadoop/joda-time-2.12.5.jar",
    )

    # Kinesis consumer properties
    kinesis_consumer_config = {
        "aws.region": region,
        "stream.initial.position": "LATEST",
        "aws.credentials.provider": "AUTO",
    }

    # Set up the Kinesis consumer to read from the Kinesis stream
    kinesis_source = FlinkKinesisConsumer(
        stream_name,
        SimpleStringSchema(),
        kinesis_consumer_config,
    )

    # Define the stream pipeline
    stream = env.add_source(kinesis_source)

    # Process records
    processed_stream = stream.map(ProcessImage(), output_type=Types.STRING())
    processed_stream.print()

    # Execute the Flink job
    env.execute()
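For what it's worth, the record handling in `ProcessImage.map` is plain `json.loads`, and the equivalent logic works fine outside Flink (a minimal standalone check with a made-up sample record), so the failure appears to happen before any record is ever processed:

```python
import json


def process(value):
    # Mirrors ProcessImage.map: skip empty records, otherwise parse as JSON
    # and pass the raw string through unchanged.
    if not value:
        return None
    json.loads(value)  # raises on malformed input
    return value


sample = json.dumps({"id": 1, "payload": "hello"})
print(process(sample))  # -> {"id": 1, "payload": "hello"}
print(process(""))      # -> None
```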
sh-5.2$ python3 /home/hadoop/test.py --stream_name TestStream --region us-west-2
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Traceback (most recent call last):
File "/home/hadoop/test.py", line 67, in <module>
main()
File "/home/hadoop/test.py", line 63, in main
env.execute("Flink Kinesis Processing Job")
File "/home/ssm-user/.local/lib/python3.9/site-packages/pyflink/datastream/stream_execution_environment.py", line 824, in execute
return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
File "/home/ssm-user/.local/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/home/ssm-user/.local/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 146, in deco
return f(*a, **kw)
File "/home/ssm-user/.local/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
...
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
....
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration
at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46)
at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:36)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.createKinesisClient(KinesisProxy.java:268)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:152)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.create(KinesisProxy.java:280)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:412)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:366)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:541)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
The key part of the error seems to be:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration
at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46)
I have already tried adding the required JAR files, including
flink-connector-kinesis-4.3.0-1.18.jar
in my Python script, and verified that I have the correct IAM permissions, but the script still fails consistently.
Has anyone else run into this issue? Could it be caused by an incompatibility between the AWS SDK and the Flink Kinesis connector versions?
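One way I tried to narrow this down (the demo jar below is synthetic; on the EMR node you would point this at the real connector jar path) was to confirm whether the shaded class from the stack trace is actually packaged inside the connector jar, since a shaded jar missing some of its bundled dependency classes is a common cause of this family of `NoClassDefFoundError`. A jar is just a zip archive, so the check can be sketched as:

```python
import io
import zipfile


def jar_contains(jar, class_name):
    # A jar is a zip archive; a class is present if its .class entry exists.
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar) as zf:
        return entry in zf.namelist()


# Synthetic demo jar for illustration; on EMR you would pass the real path,
# e.g. "/home/hadoop/flink-connector-kinesis-4.3.0-1.18.jar".
demo = io.BytesIO()
with zipfile.ZipFile(demo, "w") as zf:
    zf.writestr(
        "org/apache/flink/kinesis/shaded/com/amazonaws/ClientConfiguration.class", b""
    )

print(jar_contains(demo, "org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration"))  # True
print(jar_contains(demo, "com.amazonaws.ClientConfiguration"))  # False
```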