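I'm working on a project in Scala and Apache Spark where I need to convert coordinates from easting/northing (EPSG:27700) to latitude/longitude (EPSG:4326). I have a Python script that handles this with the Transformer class from the pyproj library, but I haven't found an equivalent way to do it in Scala/Spark.
This is the Python code I'm currently using: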
from pyproj import Transformer
import pandas as pd

data = {
    'node_id': ['94489', '94555', '94806', '99118', '76202'],
    'easting': [276164, 428790, 357501, 439545, 357353],
    'northing': [84185, 92790, 173246, 336877, 170708]
}
df = pd.DataFrame(data)

# EPSG:27700 takes (easting, northing); EPSG:4326 returns (latitude, longitude)
# because pyproj follows each CRS's authority axis order by default.
transformer = Transformer.from_crs("epsg:27700", "epsg:4326")
lat, lon = transformer.transform(df['easting'].values, df['northing'].values)
df['longitude'] = lon
df['latitude'] = lat
print(df)
The output DataFrame should look like this:
node_id | easting | northing | longitude | latitude |
---|---|---|---|---|
94489 | 276164 | 84185 | -3.752811 | 50.644154 |
94555 | 428790 | 92790 | -1.593413 | 50.734016 |
94806 | 357501 | 173246 | -2.613059 | 51.456587 |
99118 | 439545 | 336877 | -1.413188 | 52.927852 |
76202 | 357353 | 170708 | -2.614883 | 51.433757 |
@MartinHH Thanks for the reference. It looks like the same thing can be done in Scala/Spark:
package com.test.job.function_testing

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import geotrellis.proj4.{CRS, Transform}

object TestCode {
  def main(args: Array[String]): Unit = {
    val runLocally = true
    val jobName = "Test Spark Logging Case"

    // Only set a local master when running outside a cluster.
    val builder = SparkSession.builder.appName(jobName)
    implicit val spark: SparkSession =
      (if (runLocally) builder.master("local[2]") else builder).getOrCreate()
    import spark.implicits._

    val columns = Seq("node_id", "easting", "northing")
    val data = Seq(
      (94489, 276164, 84185),
      (94555, 428790, 92790),
      (94806, 357501, 173246),
      (99118, 439545, 336877),
      (76202, 357353, 170708))
    val df = data.toDF(columns: _*)

    // British National Grid (easting/northing) to WGS 84.
    val eastingNorthing = CRS.fromEpsgCode(27700)
    val latLong = CRS.fromEpsgCode(4326)
    val transform = Transform(eastingNorthing, latLong)

    // The transform returns (longitude, latitude); Spark exposes the
    // tuple as a struct column with fields _1 and _2.
    val transformLatLong = udf((easting: Int, northing: Int) =>
      transform(easting.toDouble, northing.toDouble))

    val newDf = df.withColumn("latlong", transformLatLong(col("easting"), col("northing")))
    newDf.select(
      col("node_id"),
      col("easting"),
      col("northing"),
      col("latlong._1").as("longitude"),
      col("latlong._2").as("latitude")
    ).show()
  }
}
This is the output DataFrame:
+-------+-------+--------+-------------------+------------------+
|node_id|easting|northing| longitude| latitude|
+-------+-------+--------+-------------------+------------------+
| 94489| 276164| 84185| -3.752810925839862|50.644154475886154|
| 94555| 428790| 92790|-1.5934125598396651| 50.73401609723385|
| 94806| 357501| 173246|-2.6130593045676984| 51.45658738605824|
| 99118| 439545| 336877| -1.413187622652739| 52.92785156624134|
| 76202| 357353| 170708| -2.614882589162872| 51.43375699275326|
+-------+-------+--------+-------------------+------------------+
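To check the transform without starting a Spark session, a minimal sketch along these lines might help. It uses the same `CRS.fromEpsgCode` and `Transform` calls as the job above; the object name `TransformCheck` and the helper `toLonLat` are made up for illustration and are not part of GeoTrellis.

```scala
import geotrellis.proj4.{CRS, Transform}

// Hypothetical helper object, named here only for illustration.
object TransformCheck {
  // Same CRS pair as the Spark job: British National Grid to WGS 84.
  private val toWgs84 = Transform(CRS.fromEpsgCode(27700), CRS.fromEpsgCode(4326))

  /** Returns (longitude, latitude) for an easting/northing pair. */
  def toLonLat(easting: Double, northing: Double): (Double, Double) =
    toWgs84(easting, northing)

  def main(args: Array[String]): Unit = {
    // First row of the sample data (node_id 94489).
    val (lon, lat) = toLonLat(276164, 84185)
    println(s"longitude=$lon latitude=$lat")
  }
}
```

If everything is wired up correctly, the printed pair should agree with the first row of the table above (about -3.752811, 50.644154). Note the order: the GeoTrellis transform gives longitude first, whereas the pyproj call in the question returns latitude first.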
The code needs the following library in build.sbt:
libraryDependencies += "org.locationtech.geotrellis" %% "geotrellis-raster" % "3.5.2"
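Since the code only imports from `geotrellis.proj4`, a lighter dependency may be enough. The line below is a sketch under the assumption that the `geotrellis-proj4` module at the same version provides `CRS` and `Transform` on its own; I have not verified it against this exact job, so fall back to `geotrellis-raster` if anything fails to resolve.

```scala
// build.sbt (assumption: the proj4 module alone covers geotrellis.proj4.CRS and Transform)
libraryDependencies += "org.locationtech.geotrellis" %% "geotrellis-proj4" % "3.5.2"
```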