PySpark 流式传输与 AWS Kinesis Datastream 连接

问题描述 投票:0回答:1

我正在尝试将 AWS Kinesis 数据流读入

PySpark
sql 数据帧。

这是我的Python代码

import pyspark as ps

spark = (
    ps.sql.SparkSession.builder
    .config(map= {
        'spark.jars': r'./streaming-app-poc/src/spark-streaming-sql-kinesis-connector_2.12-1.0.0.jar'
    })
    .getOrCreate()
)

(
    spark
    .readStream
    .format('aws-kinesis')
    .options(**{
        'kinesis.region'          : 'us-east-2',
        'kinesis.streamName'      : 'sensor-data-stream',
        'kinesis.consumerType'    : 'GetRecords',
        'kinesis.endpointUrl'     : 'https://kinesis.us-east-2.amazonaws.com',
        'kinesis.startingposition': 'LATEST'
    })
    .load()
)

我收到以下错误

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[15], line 12
      1 (
      2     spark
      3     .readStream
      4     .format('aws-kinesis')
      5     .options(**{
      6         'kinesis.region'          : 'us-east-2',
      7         'kinesis.streamName'      : 'sensor-data-stream',
      8         'kinesis.consumerType'    : 'GetRecords',
      9         'kinesis.endpointUrl'     : 'https://kinesis.us-east-2.amazonaws.com',
     10         'kinesis.startingposition': 'LATEST'
     11     })
---> 12     .load()
     13 )

File c:\Users\hdahmed\AppData\Local\miniconda3\envs\pyspark\Lib\site-packages\pyspark\sql\streaming\readwriter.py:304, in DataStreamReader.load(self, path, format, schema, **options)
    302     return self._df(self._jreader.load(path))
    303 else:
--> 304     return self._df(self._jreader.load())

File c:\Users\hdahmed\AppData\Local\miniconda3\envs\pyspark\Lib\site-packages\py4j\java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File c:\Users\hdahmed\AppData\Local\miniconda3\envs\pyspark\Lib\site-packages\pyspark\errors\exceptions\captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
    177 def deco(*a: Any, **kw: Any) -> Any:
    178     try:
--> 179         return f(*a, **kw)
    180     except Py4JJavaError as e:
    181         converted = convert_exception(e.java_exception)

File c:\Users\hdahmed\AppData\Local\miniconda3\envs\pyspark\Lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o92.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: aws-kinesis. Please find packages at `https://spark.apache.org/third-party-projects.html`.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:158)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:145)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.ClassNotFoundException: aws-kinesis.DefaultSource
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
    at scala.util.Failure.orElse(Try.scala:224)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
    ... 12 more

我已使用

PySpark
安装了
pip install pyspark
并安装了来自
https://github.com/awslabs/spark-sql-kinesis-connector
spark-kinesis

连接器 jar 文件

我是

spark
PySpark
的新手,据我了解,
spark
无法在 jar 文件中找到类。

我想知道我在哪里犯了错误或者我的实现中有问题

如果有人可以分享

spark-kinesis
连接器的文档,我们将不胜感激

apache-spark pyspark apache-spark-sql spark-streaming amazon-kinesis
1个回答
0
投票

此图片展示了Python代码,代码的目的是使用PySpark从AWS Kinesis Data Stream中读取数据并加载到SQL数据帧中。

以下是对be代码的分析:

  1. 导入模块: 将 pyspark 导入为 ps 这行代码导入了pyspark模块,并将其简称为ps。

  2. 创建SparkSession:

    火花=( ps.sql.SparkSession.builder .config(“spark.jars”,“r'/streaming-app-poc/src/spark-streaming-sql-kinesis-connector'”) .getOrCreate() ) 这部分代码了一个SparkSession。SparkSession是使用Spark功能的入口点。配置中指定了一个JAR文件的路径,这可能是用于创建连接Kinesis的Spark连接器。

  3. 读取Kinesis数据流:

    ( Spark.readStream .format("aws - 运动") 。选项( kinesis.region = "美国-东部-2", kinesis.streamName = "传感器-数据-流", kinesis.consumerType = "获取记录", kinesis.endpointUrl = "https://kinesis.us - east - 2.amazonaws.com", kinesis.startingPosition = "最新" ) 。加载() ) 这部分代码尝试从 AWS Kinesis 中读取数据流。它指定了以下参数:

• 区域:Kinesis 数据流所在的 AWS 区域(us - east - 2)。

•streamName:要读取的Kinesis数据流的名称(传感器-数据-流)。

• ConsumerType:消费者类型(GetRecords)。

• endpointUrl:Kinesis 服务的端点URL。

•起始位置:从数据流的最新位置开始读取(LATEST)。

  1. 错误信息: 图片的下半部分显示了一个Py4JJavaError,这表明在上述执行代码时出现了Java相关的错误。错误的具体位置在Cell In [15],第12行。

错误可能与配置或连接问题有关。可能的解决方法包括:

• 检查JAR文件路径是否正确。

•确保AWS凭证和权限正确配置,以便能够访问Kinesis数据流。

• 检查网络连接,确保能够访问Kinesis 服务。

总结: 我的代码尝试使用 PySpark 从 AWS Kinesis 中读取数据流,但遇到了 Java 相关的错误。需要进一步检查配置和连接问题来解决此错误。

© www.soinside.com 2019 - 2024. All rights reserved.