我正在尝试将 AWS Kinesis 数据流读入
PySpark
sql 数据帧。
这是我的Python代码
import pyspark as ps
spark = (
ps.sql.SparkSession.builder
.config(map= {
'spark.jars': r'./streaming-app-poc/src/spark-streaming-sql-kinesis-connector_2.12-1.0.0.jar'
})
.getOrCreate()
)
(
spark
.readStream
.format('aws-kinesis')
.options(**{
'kinesis.region' : 'us-east-2',
'kinesis.streamName' : 'sensor-data-stream',
'kinesis.consumerType' : 'GetRecords',
'kinesis.endpointUrl' : 'https://kinesis.us-east-2.amazonaws.com',
'kinesis.startingposition': 'LATEST'
})
.load()
)
我收到以下错误
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[15], line 12
1 (
2 spark
3 .readStream
4 .format('aws-kinesis')
5 .options(**{
6 'kinesis.region' : 'us-east-2',
7 'kinesis.streamName' : 'sensor-data-stream',
8 'kinesis.consumerType' : 'GetRecords',
9 'kinesis.endpointUrl' : 'https://kinesis.us-east-2.amazonaws.com',
10 'kinesis.startingposition': 'LATEST'
11 })
---> 12 .load()
13 )
File c:\Users\hdahmed\AppData\Local\miniconda3\envs\pyspark\Lib\site-packages\pyspark\sql\streaming\readwriter.py:304, in DataStreamReader.load(self, path, format, schema, **options)
302 return self._df(self._jreader.load(path))
303 else:
--> 304 return self._df(self._jreader.load())
File c:\Users\hdahmed\AppData\Local\miniconda3\envs\pyspark\Lib\site-packages\py4j\java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File c:\Users\hdahmed\AppData\Local\miniconda3\envs\pyspark\Lib\site-packages\pyspark\errors\exceptions\captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
177 def deco(*a: Any, **kw: Any) -> Any:
178 try:
--> 179 return f(*a, **kw)
180 except Py4JJavaError as e:
181 converted = convert_exception(e.java_exception)
File c:\Users\hdahmed\AppData\Local\miniconda3\envs\pyspark\Lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o92.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: aws-kinesis. Please find packages at `https://spark.apache.org/third-party-projects.html`.
at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:158)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:145)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.ClassNotFoundException: aws-kinesis.DefaultSource
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
at scala.util.Failure.orElse(Try.scala:224)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
... 12 more
我已使用
PySpark
安装了 pip install pyspark
并安装了来自 https://github.com/awslabs/spark-sql-kinesis-connector的
spark-kinesis
连接器 jar 文件
我是
spark
和 PySpark
的新手,据我了解,spark
无法在 jar 文件中找到类。
我想知道我在哪里犯了错误或者我的实现中有问题
如果有人可以分享
spark-kinesis
连接器的文档,我们将不胜感激
此图片展示了Python代码,代码的目的是使用PySpark从AWS Kinesis Data Stream中读取数据并加载到SQL数据帧中。
以下是对be代码的分析:
导入模块: 将 pyspark 导入为 ps 这行代码导入了pyspark模块,并将其简称为ps。
创建SparkSession:
火花=( ps.sql.SparkSession.builder .config(“spark.jars”,“r'/streaming-app-poc/src/spark-streaming-sql-kinesis-connector'”) .getOrCreate() ) 这部分代码了一个SparkSession。SparkSession是使用Spark功能的入口点。配置中指定了一个JAR文件的路径,这可能是用于创建连接Kinesis的Spark连接器。
读取Kinesis数据流:
( Spark.readStream .format("aws - 运动") 。选项( kinesis.region = "美国-东部-2", kinesis.streamName = "传感器-数据-流", kinesis.consumerType = "获取记录", kinesis.endpointUrl = "https://kinesis.us - east - 2.amazonaws.com", kinesis.startingPosition = "最新" ) 。加载() ) 这部分代码尝试从 AWS Kinesis 中读取数据流。它指定了以下参数:
• 区域:Kinesis 数据流所在的 AWS 区域(us - east - 2)。
•streamName:要读取的Kinesis数据流的名称(传感器-数据-流)。
• ConsumerType:消费者类型(GetRecords)。
• endpointUrl:Kinesis 服务的端点URL。
•起始位置:从数据流的最新位置开始读取(LATEST)。
错误可能与配置或连接问题有关。可能的解决方法包括:
• 检查JAR文件路径是否正确。
•确保AWS凭证和权限正确配置,以便能够访问Kinesis数据流。
• 检查网络连接,确保能够访问Kinesis 服务。
总结: 我的代码尝试使用 PySpark 从 AWS Kinesis 中读取数据流,但遇到了 Java 相关的错误。需要进一步检查配置和连接问题来解决此错误。