我有Python脚本:
import sys
for line in sys.stdin:
print("hello " + line)
我在集群中的工作人员上运行它:
def run(spark: SparkSession) = {
val data = List("john","paul","george","ringo")
val dataRDD = sc.makeRDD(data)
val scriptPath = getClass.getResource("test.py").getPath
val pipeRDD = dataRDD.pipe(command = "python3 " ++ scriptPath)
pipeRDD.foreach(println)
}
输出
你好约翰
你好林戈
你好乔治
你好保罗
我有几个问题,请告诉我。 我可以在 python 脚本中访问 Spark 会话吗? 或者我可以在 python 脚本中创建一个文件并将其保存到 hdfs 文件系统吗?
其实这就是我想做的 我想在 python 脚本中创建 csv 文件,并将它们保存到 hdfs。
还有一个小问题。 是否可以向工人发送命令来安装 python 包?
例如:pip install pandas
!更新: 我对 python 文件做了一些更改。
#!/usr/bin/python
# -*- coding: utf-8 -*-
#import pandas as pd
import sys
import os
for line in sys.stdin:
with open('readme.csv', 'w') as f:
f.write('Name,Last Name\nМихаил,Зубенко')
print(os.getcwd() + '/readme.csv')
文件在容器内创建:
现在我有一个问题。我如何访问该文件?
我可以在 python 脚本中访问 Spark 会话吗
不使用管道命令,不。
Python 脚本中的文件
对于初学者,我建议您使用 PySpark 而不是 Scala,假设您“需要”Python。
rdd = sparkContext.parallelize(["john","paul","george","ringo"])
hello = rdd.mapValues(lambda s: "hello " + s)
for s in rdd.collect():
print(s)
创建一个文件...并将其保存到hdfs文件系统...
或者,您也可以从 Scala 写入 HDFS。根本不清楚为什么需要 Python。 Scala 也可以创建/读取 CSV 文件。
[在工人身上]安装 python 包
有可能吗,是的,但是,PySpark 已经在
spark-submit
和 --py-files
争论期间使用 ZIP/EGG 文件提供了支持
例如,熊猫
SparkSQL DataFrames 主要取代了对 Pandas 的需求,你应该使用它们而不是 RDD。您可以使用 Polars 项目在 Spark 原生数据帧和 pandas 之间进行转换