How to define a schema for a semi-structured text file in PySpark

1 2013-07-25    11599,CLOSED
2 2013-07-25    256,PENDING_PAYMENT
3 2013-07-25    12111,COMPLETE
4 2013-07-25    8827,CLOSED
5 2013-07-25    11318,COMPLETE
6 2013-07-25    7130,COMPLETE
7 2013-07-25    4530,COMPLETE
8 2013-07-25    2911,PROCESSING
9 2013-07-25    5657,PENDING_PAYMENT
10 2013-07-25   5648,PENDING_PAYMENT
11 2013-07-25   918,PAYMENT_REVIEW
12 2013-07-25   1837,CLOSED

The data above comes from a semi-structured text file.

The second column is separated by a space, the third column by a tab, and the fourth column by a comma.

How can I define a schema (data types) for each column, e.g. the first column as int, the second as timestamp, the third as int, and the fourth as string?

I tried the code below to split each record into its columns:

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import (IntegerType, StructField, StructType,
                               StringType, TimestampType)
 
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")

my_conf.set("spark.master","local[*]")

spark = SparkSession.builder.config(conf=my_conf).getOrCreate()

schema1 = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("date", TimestampType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("status", StringType(), True),
])


# groups 1-4 capture order_id, date, customer_id and status,
# split on the space, tab and comma delimiters described above
myregex = r'^(\S+) (\S+)\t(\S+),(\S+)'

# read each line of the file as a single string column named "value"
lines_df = spark.read.format("text")\
    .option("path", "C:/Users/Lenovo/Desktop/week11/week 11 datasets/orders_new.csv")\
    .load()
 
 
# pull each column out of the "value" string via the capture groups
final_df = lines_df.select(
    regexp_extract('value', myregex, 1).alias("order_id"),
    regexp_extract('value', myregex, 2).alias("date"),
    regexp_extract('value', myregex, 3).alias("customer_id"),
    regexp_extract('value', myregex, 4).alias("status"))


final_df.show()

Output:
+--------+----------+-----------+---------------+
|order_id|      date|customer_id|         status|
+--------+----------+-----------+---------------+
|       1|2013-07-25|      11599|         CLOSED|
|       2|2013-07-25|        256|PENDING_PAYMENT|
|       3|2013-07-25|      12111|       COMPLETE|
|       4|2013-07-25|       8827|         CLOSED|
|       5|2013-07-25|      11318|       COMPLETE|
|       6|2013-07-25|       7130|       COMPLETE|
|       7|2013-07-25|       4530|       COMPLETE|
|       8|2013-07-25|       2911|     PROCESSING|
|       9|2013-07-25|       5657|PENDING_PAYMENT|
|      10|2013-07-25|       5648|PENDING_PAYMENT|
|      11|2013-07-25|        918| PAYMENT_REVIEW|
|      12|2013-07-25|       1837|         CLOSED|
+--------+----------+-----------+---------------+

final_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- status: string (nullable = true)

As printSchema() shows, every column comes back as a string...
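
From what I understand, this is expected, since regexp_extract always returns a string column no matter what the target schema says. One option I am considering is casting the extracted columns in place, roughly like this (a minimal sketch; to_timestamp with the pattern "yyyy-MM-dd" should parse dates like 2013-07-25, and typed_df is just an illustrative name):

from pyspark.sql.functions import col, to_timestamp

# cast the string columns produced by regexp_extract to the intended types
typed_df = final_df.select(
    col("order_id").cast("int").alias("order_id"),
    to_timestamp(col("date"), "yyyy-MM-dd").alias("date"),
    col("customer_id").cast("int").alias("customer_id"),
    col("status"),
)
typed_df.printSchema()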



But whenever I try to apply the schema like this:

    df = spark.createDataFrame(final_df.rdd, schema1)

    df.show()    # <-- I get an error here

So how do I define the schema? Please help.
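
I suspect the error comes from a type mismatch: schema1 declares IntegerType and TimestampType, but every field in final_df.rdd is still a Python string, and createDataFrame does not cast them for me. Would converting each field to the matching Python type first work? A minimal sketch, assuming I want to keep schema1 (to_typed is just an illustrative helper name):

from datetime import datetime

# convert each string field to the Python type that schema1 expects
def to_typed(row):
    return (int(row["order_id"]),
            datetime.strptime(row["date"], "%Y-%m-%d"),
            int(row["customer_id"]),
            row["status"])

df = spark.createDataFrame(final_df.rdd.map(to_typed), schema1)
df.show()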
python pyspark apache-spark-sql bigdata pyspark-schema