1 2013-07-25 11599,CLOSED
2 2013-07-25 256,PENDING_PAYMENT
3 2013-07-25 12111,COMPLETE
4 2013-07-25 8827,CLOSED
5 2013-07-25 11318,COMPLETE
6 2013-07-25 7130,COMPLETE
7 2013-07-25 4530,COMPLETE
8 2013-07-25 2911,PROCESSING
9 2013-07-25 5657,PENDING_PAYMENT
10 2013-07-25 5648,PENDING_PAYMENT
11 2013-07-25 918,PAYMENT_REVIEW
12 2013-07-25 1837,CLOSED
The data above comes from a semi-structured text file.
The second column is separated from the first by a space, the third by a tab, and the fourth by a comma.
How can I define a schema (data types) for each column, e.g. the first column as int, the second as timestamp, the third as int, and the fourth as string?
I tried splitting each record into its columns with the code below:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import IntegerType, StructField, StructType, StringType, TimestampType
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
schema1 = StructType([StructField("order_id", IntegerType(), True),
                      StructField("date", TimestampType(), True),
                      StructField("customer_id", IntegerType(), True),
                      StructField("status", StringType(), True)])
myregex = r'^(\S+) (\S+)\t(\S+)\,(\S+)'
lines_df = spark.read.format("text")\
    .option("path", "C:/Users/Lenovo/Desktop/week11/week 11 datasets/orders_new.csv")\
    .load()
final_df = lines_df.select(regexp_extract('value', myregex, 1).alias("order_id"),
                           regexp_extract('value', myregex, 2).alias("date"),
                           regexp_extract('value', myregex, 3).alias("customer_id"),
                           regexp_extract('value', myregex, 4).alias("status"))
final_df.show()

Output:
+--------+----------+-----------+---------------+
|order_id| date|customer_id| status|
+--------+----------+-----------+---------------+
| 1|2013-07-25| 11599| CLOSED|
| 2|2013-07-25| 256|PENDING_PAYMENT|
| 3|2013-07-25| 12111| COMPLETE|
| 4|2013-07-25| 8827| CLOSED|
| 5|2013-07-25| 11318| COMPLETE|
| 6|2013-07-25| 7130| COMPLETE|
| 7|2013-07-25| 4530| COMPLETE|
| 8|2013-07-25| 2911| PROCESSING|
| 9|2013-07-25| 5657|PENDING_PAYMENT|
| 10|2013-07-25| 5648|PENDING_PAYMENT|
| 11|2013-07-25| 918| PAYMENT_REVIEW|
| 12|2013-07-25| 1837| CLOSED|
+--------+----------+-----------+---------------+
final_df.printSchema()
root
 |-- order_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- status: string (nullable = true)
As printSchema() shows, every column is read as a string.
Now, to apply the schema I defined, whenever I do

df = spark.createDataFrame(final_df.rdd, schema1)
df.show()   # the error is raised here

I get an error. How can I define the schema (data types) for these columns?
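For reference, here is a minimal sketch of one direction I considered (this is my assumption, not something that comes from the attempt above): instead of re-applying schema1 through the RDD, cast the extracted string columns to the target types, assuming the date column is always in yyyy-MM-dd format.

from pyspark.sql.functions import col, to_timestamp

# Sketch: cast each extracted string column to its intended type.
typed_df = final_df.select(
    col("order_id").cast("int").alias("order_id"),
    to_timestamp(col("date"), "yyyy-MM-dd").alias("date"),
    col("customer_id").cast("int").alias("customer_id"),
    col("status"))          # already a string, no cast needed
typed_df.printSchema()      # should report int / timestamp / int / string

I am not sure whether casting like this is the idiomatic approach, or whether there is a way to make createDataFrame with schema1 work directly, which is what I am asking.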