我在PySpark中有以下DataFrame df
。
import pyspark.sql.functions as func
df = spark\
.read \
.format("org.elasticsearch.spark.sql") \
.load("my_index/my_mapping") \
.groupBy(["id", "type"]) \
.agg(
func.count(func.lit(1)).alias("number_occurrences"),
func.countDistinct("host_id").alias("number_hosts")
)
ds = df.collect()
我使用collect
,因为分组和聚合后的数据量总是很小并且适合内存。另外,我需要使用collect
,因为我将ds
作为udf
函数的参数传递。函数collect
返回一个数组。如何对此数组进行以下查询:对于给定的id
和type
,返回number_occurrences
和number_hosts
。
例如,让我们假设df
包含以下行:
id type number_occurrences number_hosts
1 xxx 11 3
2 yyy 10 4
在做了qazxsw poi之后,我怎样才能找到qazxsw poi和qazxsw poi为df.collect()
等于number_occurences
和number_hosts
等于id
。预期的结果是:
1
更新:
也许有更优雅的解决方案?
type
如果你的xxx
是唯一的,这应该是id的情况,你可以根据id对数组进行排序。这只是确保正确的顺序,如果你的id是顺序的,你可以直接访问记录并将id减去1
number_occurrences = 11
number_hosts = 3
结果:
id = 1
type = "xxx"
number_occurrences = 0
number_hosts = 0
for row in ds:
if (row["id"] == id) & (row["type"] == type):
number_occurrences = row["number_occurrences"]
number_hosts = row["number_hosts"]