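I want to partition the Overture Maps global buildings polygon dataset by country/region. I'm currently trying to do this in a distributed way with Apache Sedona: I assign a "country" column to each building by spatially joining the building polygons (over 2B rows) to the country polygons (only about 200 rows, but very complex geometries with many vertices). I tried reducing each building polygon to a centroid (actually the centre of its bounding box), so that I can do a simpler point-in-polygon join instead of a polygon-in-polygon join. However, the job runs for a very long time. The last time I checked, it had been running for more than 24 hours, and I killed it to save resources.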
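A point-in-polygon test is cheaper than polygon-polygon intersection, but its cost still grows with the number of polygon vertices. The following is a minimal, self-contained illustration in plain Scala (even-odd ray casting, not Sedona's or JTS's actual code) of reducing a building to its bounding-box centre and testing that point against a polygon ring. `BBox` and `PointInPolygon` are hypothetical names used only for this sketch.

```scala
// Hypothetical bounding box of one building, mirroring Overture's bbox struct
final case class BBox(xmin: Double, ymin: Double, xmax: Double, ymax: Double) {
  // Centre of the bounding box (what the question calls the "centroid")
  def center: (Double, Double) = ((xmin + xmax) / 2, (ymin + ymax) / 2)
}

object PointInPolygon {
  // Even-odd (ray casting) test against a simple polygon ring given as (x, y) vertices.
  // Every edge is visited, so the cost is linear in the number of vertices.
  def contains(ring: IndexedSeq[(Double, Double)], p: (Double, Double)): Boolean = {
    val (px, py) = p
    var inside = false
    var j = ring.length - 1
    for (i <- ring.indices) {
      val (xi, yi) = ring(i)
      val (xj, yj) = ring(j)
      // Edge j -> i straddles the horizontal line y = py and crosses it to the right of px
      if ((yi > py) != (yj > py) && px < (xj - xi) * (py - yi) / (yj - yi) + xi)
        inside = !inside
      j = i
    }
    inside
  }
}
```

Because each call walks every edge, a country boundary with a very large number of vertices makes every individual test expensive unless the polygon is indexed or split into smaller pieces.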
My current implementation looks like this:
import org.apache.sedona.sql.utils.Adapter
import org.apache.sedona.core.spatialOperator.JoinQuery
import org.apache.sedona.core.enums.{GridType, IndexType}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, udf}

object OverturePartitioner {
  val OvertureReleasePath1 = "s3://overturemaps-us-west-2/release/2024-05-16-beta.0"
  val LatestOvertureReleasePath = "s3://overturemaps-us-west-2/release/2024-06-13-beta.0"

  // Centre of each building's bounding box, encoded as WKT
  val makeWKTPoint = udf((xmin: Float, ymin: Float, xmax: Float, ymax: Float) => {
    val x = (xmin + xmax) / 2
    val y = (ymin + ymax) / 2
    s"POINT ($x $y)"
  })

  // `spark` must already have Sedona's SQL functions registered (e.g. via SedonaContext.create)
  def run(
      spark: SparkSession,
      outputPath: String,
      numPartitions: Int
  ): Unit = {
    import spark.implicits._

    // ~200 country polygons; keep only the name and the parsed geometry so that
    // the raw WKB column is not carried along as user data
    val countryWithGeom = spark.read
      .parquet(s"$OvertureReleasePath1/theme=divisions")
      .where(col("type") === "division_area" and col("subtype") === "country")
      .select(col("country"), expr("ST_GeomFromWKB(geometry)").as("geom"))
    val countrySpatial = Adapter.toSpatialRdd(countryWithGeom, "geom")

    val buildings = spark.read
      .parquet(s"$LatestOvertureReleasePath/theme=buildings/type=building")
      .select(col("id"), col("geometry"), col("bbox"))

    val buildingsWithGeom = buildings
      .withColumn("centroids", makeWKTPoint($"bbox.xmin", $"bbox.ymin", $"bbox.xmax", $"bbox.ymax"))
      .withColumn("geom", expr("ST_GeomFromWKT(centroids)"))
      .select(col("id"), col("geom"))
    val buildingsSpatial = Adapter.toSpatialRdd(buildingsWithGeom, "geom")

    // spatialPartitioning shuffles the raw RDD itself, so no separate repartition is needed
    buildingsSpatial.analyze()
    buildingsSpatial.spatialPartitioning(GridType.KDBTREE, numPartitions)
    // Partition the countries with the same KDB-tree grid so matching data is co-located
    countrySpatial.spatialPartitioning(buildingsSpatial.getPartitioner)

    val buildOnSpatialPartitionedRDD = true
    buildingsSpatial.buildIndex(IndexType.RTREE, buildOnSpatialPartitionedRDD)

    val usingIndex = true
    val considerBoundaryIntersection = true
    // Result pairs are (query geometry = country, indexed geometry = building)
    val joined = JoinQuery.SpatialJoinQueryFlat(
      buildingsSpatial,
      countrySpatial,
      usingIndex,
      considerBoundaryIntersection
    )

    // The first tab-separated user-data field is the id (buildings) or the country name (countries)
    val buildingsJoined = joined.rdd
      .map { case (country, building) =>
        (
          building.getUserData.toString.split("\t")(0),
          country.getUserData.toString.split("\t")(0)
        )
      }
      .toDF("id", "country")

    buildings
      .join(buildingsJoined, "id")
      .write
      .partitionBy("country")
      .parquet(outputPath)
  }
}
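Are there any further optimizations I could add to make this query more efficient? I couldn't find any Apache Sedona documentation on additional optimizations for this kind of job, so perhaps it can't be optimized further and this is a limitation of Apache Sedona. Apart from Apache Sedona, are there other viable alternatives for spatial partitioning at this scale?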
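One direction that may help, sketched here under assumptions and not benchmarked at this scale: with only ~200 countries, the country side can be broadcast to every executor instead of being spatially partitioned, which avoids copying each large country polygon into every KDB-tree partition it overlaps. Splitting each country into small pieces first (Sedona's `ST_SubDivideExplode`) keeps each point-in-polygon test cheap, and building the point with `ST_Point` avoids formatting and re-parsing WKT in a UDF. The names `CountryAssigner` and `assignCountries` and the `maxVertices` default of 256 are hypothetical; the column names follow the Overture layout used in the question, and Sedona's SQL functions are assumed to be registered on the session.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{broadcast, col, expr}

object CountryAssigner {
  // Hypothetical helper. Assumes Sedona SQL functions are registered, `countries`
  // has `country` and WKB `geometry` columns, and `buildings` has `id` and a
  // `bbox` struct with xmin/ymin/xmax/ymax fields.
  def assignCountries(buildings: DataFrame, countries: DataFrame, maxVertices: Int = 256): DataFrame = {
    // Explode each country into pieces with at most `maxVertices` vertices
    val countryPieces = countries.selectExpr(
      "country",
      s"ST_SubDivideExplode(ST_GeomFromWKB(geometry), $maxVertices) AS piece"
    )

    // Bounding-box centre as a native geometry, with no WKT round trip
    val points = buildings.selectExpr(
      "id",
      "ST_Point((bbox.xmin + bbox.xmax) / 2, (bbox.ymin + bbox.ymax) / 2) AS pt"
    )

    // The broadcast hint lets Sedona plan a broadcast index join, so the
    // large point side is not shuffled for the join itself
    points
      .join(broadcast(countryPieces), expr("ST_Intersects(piece, pt)"))
      .select(col("id"), col("country"))
      // ST_Intersects also matches a point lying on a cut line shared by two
      // pieces of the same country; drop those duplicate pairs
      .dropDuplicates("id", "country")
  }
}
```

The `dropDuplicates` step adds a shuffle on the result; if losing the rare points that fall exactly on a cut line is acceptable, `ST_Contains` without deduplication is the cheaper variant. Either way, a point exactly on an international border can still be assigned to two countries, and points that fall in no country polygon are dropped by the inner join.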
Are you set on Sedona? If not, consider Google BigQuery. This query took 36 seconds and ran in the free (sandbox) tier:
select d.names.primary, count(*) cnt
from `bigquery-public-data.overture_maps.building` b
join `bigquery-public-data.overture_maps.division_area` d
on st_intersects(b.geometry, d.geometry)
where d.subtype = 'country'
group by d.names.primary
order by cnt desc
I've printed only the counts, but you could just as easily keep the full result set.
| Row | primary | cnt |
|---|---|---|
| 1 | India | 433323993 |
| 2 | United States of America | 154632372 |
| 3 | Brasil | 115620105 |
| 4 | Indonesia | 110963126 |
| 5 | Rossiya | 71895191 |
| ... | ... | ... |