Efficient way to perform a large spatial join between an extensive point dataset and global country polygons?

Question | Votes: 0 | Answers: 1

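I want to partition the Overture Maps global building polygons dataset by country. I'm currently trying to do this in a distributed way with Apache Sedona. I assign a "country" column to each building by spatially joining the building polygons (more than 2B rows) to the country polygons (about 200 rows, but very complex geometries with many vertices). I tried mapping the building polygons to centroids (approximated by the center of each building's bounding box) so that I could do a simpler point-in-polygon join instead of a polygon-in-polygon join. However, the job runs for a very long time: the last time I checked, it had been running for more than 24 hours, and I then killed it to save resources.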
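For reference, here is a minimal sketch of the point-in-polygon test that the centroid approach reduces the join to. It uses even-odd ray casting on a single ring of vertices and ignores holes and multipolygons. The names `PointInPolygon`, `bboxCenter` and `contains` are hypothetical and not part of Sedona's API. Each test walks every edge of the ring, so its cost grows linearly with the vertex count. That is why country outlines with very many vertices stay expensive even when the buildings are reduced to single points.

```scala
// Hypothetical helper, not Sedona code: even-odd ray casting on one polygon ring.
// Holes and multipolygons are deliberately ignored to keep the sketch small.
object PointInPolygon {
  final case class Point(x: Double, y: Double)

  // Center of a bounding box, as the question's makeWKTPoint UDF computes it,
  // but kept numeric instead of formatting and re-parsing a WKT string.
  def bboxCenter(xmin: Double, ymin: Double, xmax: Double, ymax: Double): Point =
    Point((xmin + xmax) / 2, (ymin + ymax) / 2)

  // Visits every edge once, so the cost is O(V) in the number of ring vertices.
  def contains(ring: IndexedSeq[Point], p: Point): Boolean = {
    var inside = false
    var j = ring.length - 1 // previous vertex; closes the ring on the first edge
    for (i <- ring.indices) {
      val a = ring(i)
      val b = ring(j)
      // Toggle when the horizontal ray from p towards +x crosses edge (b, a).
      if ((a.y > p.y) != (b.y > p.y) &&
          p.x < (b.x - a.x) * (p.y - a.y) / (b.y - a.y) + a.x)
        inside = !inside
      j = i
    }
    inside
  }
}
```

With an L-shaped ring such as (0,0), (10,0), (10,2), (2,2), (2,10), (0,10), the bounding-box center (5,5) falls outside the shape itself. The midpoint used in the question is therefore only an approximation of a true centroid. For country assignment, it can give a different answer than the building itself only when the building's bounding box crosses a border.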

My current implementation looks like this:

import org.apache.sedona.sql.utils.Adapter
import org.apache.sedona.core.spatialOperator.JoinQuery
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, udf}
import org.apache.sedona.core.enums.{GridType, IndexType}

object OverturePartitioner {

  val NUM_PARTITIONS = 1000
  val OvertureReleasePath1 = "s3://overturemaps-us-west-2/release/2024-05-16-beta.0"

  val LatestOvertureReleasePath = "s3://overturemaps-us-west-2/release/2024-06-13-beta.0"
  // Approximates each building by the center of its bounding box and returns it as WKT
  val makeWKTPoint = udf((xmin: Float, ymin: Float, xmax: Float, ymax: Float) => {
    val x = (xmin + xmax) / 2
    val y = (ymin + ymax) / 2
    s"POINT ($x $y)"
  })

  def run(
    spark: SparkSession,
    outputPath: String,
    numPartitions: Int  
  ): Unit = {
    import spark.implicits._

    val countries = spark.read
      .parquet(s"$OvertureReleasePath1/theme=divisions")
      .where(
        col("type") === "division_area" and
          col("subtype") === "country"
      )
      .select(col("Country"), col("geometry"))

    val buildings = spark.read
      .parquet(s"$LatestOvertureReleasePath/theme=buildings/type=building")
      .select(
        col("id"),
        col("geometry"),
        col("bbox")
      )


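    // Keep only the country column and the parsed geometry, so the raw WKB is not
    // carried into the Sedona userData next to the country name
    val countryWithGeom = countries
      .withColumn("geom", expr("ST_GeomFromWKB(geometry)"))
      .select(col("Country"), col("geom"))
    val countrySpatial = Adapter.toSpatialRdd(countryWithGeom, "geom")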
    val buildingsWithGeom = buildings
      .withColumn(
        "centroids",
        makeWKTPoint(
          $"bbox.xmin",
          $"bbox.ymin",
          $"bbox.xmax",
          $"bbox.ymax"
        )
      )
      .withColumn("geom", expr("ST_GeomFromWKT(centroids)"))
      .select(col("id"), col("geom"))

    val buildingsSpatial = Adapter.toSpatialRdd(buildingsWithGeom, "geom")

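    // repartition returns a new RDD, so the result has to be assigned back
    buildingsSpatial.rawSpatialRDD = buildingsSpatial.rawSpatialRDD.repartition(NUM_PARTITIONS)
    buildingsSpatial.analyze()
    // KDB-tree partitioning built from the points; the countries reuse the same partitioner
    buildingsSpatial.spatialPartitioning(GridType.KDBTREE, NUM_PARTITIONS)
    countrySpatial.spatialPartitioning(buildingsSpatial.getPartitioner)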

    val buildOnSpatialPartitionedRDD = true
    buildingsSpatial.buildIndex(IndexType.RTREE, buildOnSpatialPartitionedRDD)

    val considerBoundaryIntersection = true
    val usingIndex = true
    val joined = JoinQuery.SpatialJoinQueryFlat(
      buildingsSpatial,
      countrySpatial,
      usingIndex,
      considerBoundaryIntersection
    )

    // Each pair is (country, building); the first userData field holds the country / building id
    val buildingsJoined = joined.rdd
      .map {
        case (country, building) =>
          (
            building.getUserData.toString.split("\t")(0),
            country.getUserData.toString.split("\t")(0)
          )
      }
      .toDF("id", "country")

    buildings.join(buildingsJoined, "id").write.partitionBy("country").parquet(outputPath)
  }
}
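The indexed join in this code follows the filter-and-refine pattern. A cheap bounding-box lookup narrows the candidate countries, and only those candidates get the exact geometric test. The following is a simplified, single-machine sketch of that idea, not Sedona's implementation. It buckets country bounding boxes into a uniform lon/lat grid, whereas Sedona uses the KDB-tree partitioning and R-tree index configured above. The names `GridJoin`, `BBox`, `Country` and the `cellSize` parameter are hypothetical. The exact test is passed in as a function, for example a point-in-polygon check.

```scala
// Hypothetical, simplified filter-and-refine join; a uniform grid stands in for
// Sedona's KDB-tree partitions and R-tree index.
object GridJoin {
  final case class BBox(xmin: Double, ymin: Double, xmax: Double, ymax: Double) {
    def contains(x: Double, y: Double): Boolean =
      x >= xmin && x <= xmax && y >= ymin && y <= ymax
  }

  final case class Country(name: String, bbox: BBox)

  // Grid cell for one coordinate; cellSize is in degrees (an assumed parameter).
  private def cell(v: Double, cellSize: Double): Int = math.floor(v / cellSize).toInt

  // Filter structure: every grid cell maps to the countries whose bbox overlaps it.
  def buildIndex(countries: Seq[Country], cellSize: Double): Map[(Int, Int), Seq[Country]] =
    countries.flatMap { c =>
      for {
        cx <- cell(c.bbox.xmin, cellSize) to cell(c.bbox.xmax, cellSize)
        cy <- cell(c.bbox.ymin, cellSize) to cell(c.bbox.ymax, cellSize)
      } yield (cx, cy) -> c
    }.groupMap(_._1)(_._2)

  // Points are (id, x, y). The bbox check is the cheap filter; `exact` is the
  // expensive refine step and only runs on candidates from the point's cell.
  def join(
      points: Seq[(String, Double, Double)],
      index: Map[(Int, Int), Seq[Country]],
      cellSize: Double,
      exact: (Country, Double, Double) => Boolean
  ): Seq[(String, String)] =
    for {
      (id, x, y) <- points
      c <- index.getOrElse((cell(x, cellSize), cell(y, cellSize)), Seq.empty)
      if c.bbox.contains(x, y) && exact(c, x, y)
    } yield id -> c.name
}
```

The filter step only removes candidates. A country with a huge bounding box still lands in many cells, and every point in those cells pays for the exact test against that country's full outline.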

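Are there any other optimizations I can add to make this query more efficient? I couldn't find any further Apache Sedona documentation on additional optimizations for this kind of job. Perhaps it simply can't be optimized any further and this is a limitation of Apache Sedona. Are there any viable alternatives to Apache Sedona for spatial partitioning at this scale?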

apache-spark gis geospatial spatial apache-sedona
1 Answer
0 votes

Are you committed to Sedona? If not, consider Google BigQuery. This query took 36 seconds and ran in the free (sandbox) tier:

select d.names.primary, count(*) cnt 
from `bigquery-public-data.overture_maps.building` b
join `bigquery-public-data.overture_maps.division_area` d
on st_intersects(b.geometry, d.geometry)
where d.subtype = 'country'
group by d.names.primary
order by cnt desc
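To get the same per-country tally from the Sedona pipeline instead, the aggregation is a plain group-and-count over the (building, country) pairs the join produces. Here is a minimal in-memory sketch. `CountByCountry` and `counts` are hypothetical names, and the input is assumed to be (buildingId, countryName) pairs like the `buildingsJoined` DataFrame in the question.

```scala
// Hypothetical in-memory analogue of `group by d.names.primary ... order by cnt desc`.
object CountByCountry {
  def counts(pairs: Seq[(String, String)]): Seq[(String, Long)] =
    pairs
      .groupMapReduce(_._2)(_ => 1L)(_ + _) // count(*) per country name
      .toSeq
      .sortBy { case (country, cnt) => (-cnt, country) } // cnt desc, ties by name
}
```

The query joins on `st_intersects`, so a building that straddles a border matches both countries and is counted once for each. The sketch behaves the same way if such a building appears in two pairs.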

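I've only printed the counts, but you could just as well keep the full result set (for example, by selecting the building id and country name instead of aggregating).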

1   India                    433323993
2   United States of America 154632372
3   Brasil                   115620105
4   Indonesia                110963126
5   Rossiya                   71895191
...
© www.soinside.com 2019 - 2024. All rights reserved.