在pysparkSQL中,我有一个名为bmd2
的DataFrame,如下所示:
DataFrame[genres: string, id: int, tagline: string, title: string, vote_average: double, vote_count: int]
数据bmd2['genres']
是这样的:
bmd2.select('genres').show():
+--------------------+
| genres|
+--------------------+
|[{'id': 16, 'name...|
|[{'id': 12, 'name...|
|[{'id': 10749, 'n...|
|[{'id': 35, 'name...|
|[{'id': 35, 'name...|
|[{'id': 28, 'name...|
|[{'id': 35, 'name...|
|[{'id': 28, 'name...|
|[{'id': 28, 'name...|
|[{'id': 12, 'name...|
|[{'id': 35, 'name...|
|[{'id': 35, 'name...|
|[{'id': 10751, 'n...|
|[{'id': 36, 'name...|
|[{'id': 28, 'name...|
|[{'id': 18, 'name...|
|[{'id': 18, 'name...|
|[{'id': 80, 'name...|
|[{'id': 80, 'name...|
|[{'id': 28, 'name...|
+--------------------+
only showing top 20 rows
列'genres'中的数据类型是字符串,但它们可以在python中转换为带有'eval function'的dicts列表。那么我应该如何在这里应用eval()将字符串传递到每一行的列表?我尝试了很多方法:
- bmd2.select('genres'.astype('list')):AttributeError:'str'对象没有属性'astype'
- bmd2.select(eval('genres')):NameError:未定义名称'genres'
- bmd2.withColumn('genres',eval('genres')):NameError:名称'genres'未定义
我写这个作为答案,因为我找不到评论选项。我建议你看一下pyspark.sql.functions中的from_json。例如,这是你如何使用它:
# given a row that looks like:
+----------genres-------------+
| [{ id:1, name:"hiphop"}] |
+-----------------------------+
# define a schema
schema = ArrayType(StructType().add("id", IntegerType())\
.add("name", StringType()))
# transform
new_df = df.select(from_json("genres", schema).alias("genres_dict"))
# display
new_df.printSchema()
new_df.show()
还有一种方法可以使用名为regexp_extract的函数来实现此目的。但以上是我个人的偏好。此外,如果要切换回原始字符串,可以使用to_json函数。希望这可以帮助。
我通过使用UDF解决了我的问题,UDF是用户定义的函数。
首先,导入它:
from pyspark.sql.functions import udf
然后,定义您的UDF,就像一个匿名函数:
getdirector = udf(lambda x:[i['name'] for i in x if i['job'] == 'Director'],StringType())
您应该在此处指定返回值的类型,以便获得具有预期类型的返回值。然后,您可以像其他函数一样在代码中调用此UDF。
cres2 = cres1.select('id',getcharacter('cast').alias('cast'),getdirector('crew').alias('crew'))
在这个问题中,我可以修改UDF以获得我需要的任何类型。