我试图通过使用数据集来熟悉 Spark。数据集具有以下列:
[ "zipcode", "city", "loc", "pop", "state"}
它包含有关美国城市的信息(位置、人口......)。这是行的示例:
{ "zipcode" : "10009", "city" : "NEW YORK", "loc" : [ -73.979591, 40.726188 ], "pop" : 57426, "state" : "NY" }
我想做的是总结美国不同地区的人口。为了让我更接近这个目标,我正在考虑尝试将所有彼此相距一定距离的城市的人口相加。因此,如果两个城市的位置相差小于5公里,则将它们分组在同一行并将其人口相加。我怎么能做到这一点?
另外,我假设为了做到这一点,我必须将
"loc"
列转换为整数元组(或类似的东西),但我不知道该怎么做:
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.functions.col
object SimpleApp {
def analyzeUSPopulation(implicit spark: SparkSession, path: String): Unit = {
val file = "us_population.json"
val data = spark.read
.json(path+file)
.withColumn("pop", col("pop").cast("int"))
//.withColumn("loc", TURN INTO A TUPLE OF INTS)
.cache()
val result = SOMEHOW_GROUP_BY_REGION.show()
}
def main(args: Array[String]): Unit = {
implicit val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
implicit val path = "..."
analyzeUSPopulation
spark.stop()
}
}
谢谢!
我尝试过查看
groupBy
和 agg
,但由于我是 Spark 新手,我不知道如何实现我的目的。
我曾经在采访中被问过类似的问题,当时无法弄清楚,但我研究了一下。如果您能够理解其背后的数学原理,那么这很简单。公式为半正矢距离公式
如果您按照以下步骤操作,您就可以轻松做到这一点 1.) 将 loc 列从数组转换为 double 值的元组 2.) 对获得的纬度和经度使用半正矢公式 3.) 简单连接以对附近城市进行分组 4.) 5公里范围内城市的过滤条款
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DoubleType, IntegerType
from math import radians, cos, sin, sqrt, atan2
df = df.withColumn("latitude", F.col("loc")[1].cast(DoubleType())).withColumn("longitude", F.col("loc")[0].cast(DoubleType()))
def haversine(lat1, lon1, lat2, lon2):
R = 6371.0
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
return R * c
haversine_udf = F.udf(haversine, DoubleType())
joined_df = df.alias("df1").crossJoin(df.alias("df2")).filter(F.col("df1.zipcode") != F.col("df2.zipcode"))
distance_df = joined_df.withColumn(
"distance_km",
haversine_udf(
F.col("df1.latitude"), F.col("df1.longitude"),
F.col("df2.latitude"), F.col("df2.longitude")
)
)
close_cities_df = distance_df.filter(F.col("distance_km") < 5)
result_df = close_cities_df.groupBy("df1.city").agg(
F.sum("df1.pop").alias("total_population")
)
result_df.show()