我正在尝试设置 Spark 在我的 ThinkPad 笔记本电脑上运行。我按照下面链接中列出的说明进行操作。
https://medium.com/@ansabiqbal/setting-up-apache-spark-pyspark-on-windows-11-machine-e16b7382624a
我的系统属性看起来像这样。
因此,您可以看到我的 Hadoop_Home、Java_Home、Spark_Home 和 Path 值。我认为这些都是正确的,除非我在这里遗漏了一些东西。
现在,我进入 Anaconda 提示符并输入“pip install findspark”,然后在同一命令提示符中输入“pyspark”。
我还编辑了“bashrc.sh”文件,并在文件底部添加了四行: 导出 SPARK_HOME=/opt/spark 导出 PATH=$SPARK_HOME/bin:$PATH 导出 PYSPARK_DRIVER_PYTHON=jupyter 导出 PYSPARK_DRIVER_PYTHON_OPTS='笔记本'
然后,我保存了更改并关闭了文件。现在,我启动 Jupyter Notebook 并输入以下代码。
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name
spark = SparkSession.builder.appName("SampleDataExample").getOrCreate()
folder_path = "C:\\Users\\ryan_\\Desktop\\Coding\\Python\\CSV Merge All Files into One File\\multiple_text_files\\*.txt"
df = spark.read.text(folder_path).withColumn("filename", input_file_name())
df.show()
它运行了几分钟,但什么也没发生,所以我取消了作业并重新启动了内核。我觉得这项工作应该在几秒钟内完成。我觉得有些东西设置不正确,或者配置不正确,或者其他什么。这里有人经历过这种行为吗?有人有针对这种情况的解决方案吗?
重新启动我的机器并重新启动内核后,我得到了下面的代码来工作。
# Read a single CSV file into a dataframe
from pyspark.sql import SparkSession
# Create a Spark session
spark = SparkSession.builder.appName("ReadCSV").getOrCreate()
# Path to the CSV file
file_path = 'C:\\Users\\Book1.csv'
# Read the CSV file into a DataFrame
df = spark.read.csv(file_path, header=True, inferSchema=True)
# Show the DataFrame
df.show()
另外,让下面的代码正常工作,所以我想我现在已经准备好了。
# Merge multiple CSV files in a folder into a single Dataframe
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
import os
# Create a Spark session
spark = SparkSession.builder.appName("MergeCSVFiles").getOrCreate()
# Path to the folder containing CSV files
folder_path = 'C:\\Users\\CSV\\'
# List all CSV files in the folder
csv_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')]
# Initialize an empty DataFrame to store the merged data
merged_df = None
# Loop through each CSV file and merge into the DataFrame
for file_name in csv_files:
file_path = os.path.join(folder_path, file_name)
# Read the CSV file into a DataFrame
df = spark.read.csv(file_path, header=True, inferSchema=True)
# Add a new column with the input file name
df = df.withColumn("file_name", lit(file_name))
# Union the DataFrame with the merged DataFrame
if merged_df is None:
merged_df = df
else:
merged_df = merged_df.union(df)
# Show the merged DataFrame
merged_df.show()