在集群中的执行器上运行 python 脚本 [Scala/Spark]

问题描述 投票:0回答:1

我有Python脚本:

import sys

for line in sys.stdin:
  print("hello " + line)

我在集群中的工作人员上运行它:

def run(spark: SparkSession) = {

  val data = List("john","paul","george","ringo")

  val dataRDD = sc.makeRDD(data)
  val scriptPath = getClass.getResource("test.py").getPath
  val pipeRDD = dataRDD.pipe(command = "python3 " ++ scriptPath)

  pipeRDD.foreach(println)
}

输出

你好约翰

你好林戈

你好乔治

你好保罗

我有几个问题,请告诉我。 我可以在 python 脚本中访问 Spark 会话吗? 或者我可以在 python 脚本中创建一个文件并将其保存到 hdfs 文件系统吗?

其实这就是我想做的 我想在 python 脚本中创建 csv 文件,并将它们保存到 hdfs。

还有一个小问题。 是否可以向工人发送命令来安装 python 包?

例如:pip install pandas

!更新: 我对 python 文件做了一些更改。

#!/usr/bin/python
# -*- coding: utf-8 -*-

#import pandas as pd
import sys
import os

for line in sys.stdin:
    with open('readme.csv', 'w') as f:
        f.write('Name,Last Name\nМихаил,Зубенко')

print(os.getcwd() + '/readme.csv')

文件在容器内创建:

enter image description here

现在我有一个问题。我如何访问该文件?

python scala apache-spark hadoop hdfs
1个回答
1
投票

我可以在 python 脚本中访问 Spark 会话吗

不使用管道命令,不。

Python 脚本中的文件

对于初学者,我建议您使用 PySpark 而不是 Scala,假设您“需要”Python。

rdd = sparkContext.parallelize(["john","paul","george","ringo"])
hello = rdd.mapValues(lambda s: "hello " + s)
for s in rdd.collect():
  print(s)

创建一个文件...并将其保存到hdfs文件系统...

或者,您也可以从 Scala 写入 HDFS。根本不清楚为什么需要 Python。 Scala 也可以创建/读取 CSV 文件。

[在工人身上]安装 python 包

有可能吗,是的,但是,PySpark 已经在

spark-submit
--py-files
争论期间使用 ZIP/EGG 文件提供了支持

例如,熊猫

SparkSQL DataFrames 主要取代了对 Pandas 的需求,你应该使用它们而不是 RDD。您可以使用 Polars 项目在 Spark 原生数据帧和 pandas 之间进行转换

© www.soinside.com 2019 - 2024. All rights reserved.