批量处理以减少时间

问题描述 投票:0回答:1

我正在尝试构建一个自动获取纬度和经度的函数。我的文件非常大,有超过 75k 行,仅 1k 行的处理时间大约需要 24 分钟。我试图在我的代码中包含一些批处理,但我不是 python 专家,因此在线资源是我的最佳选择。我已经能够为此想出很好的代码,并且网上似乎没有太多关于批处理的信息。有谁知道如何使此代码处理速度更快?


from geopy.geocoders import Nominatim
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from geopy.exc import GeocoderTimedOut, GeocoderUnavailable

# Read the CSV file
df = spark.read.format("csv").option("header","true").load("Files/locations.csv")

# Limit to 1000 rows
df_limited = df.limit(1000)

# Define a function to get coordinates
def get_coordinates(address):
    geolocator = Nominatim(user_agent="address_geocoder")
    try:
        location = geolocator.geocode(address)
        if location:
            return (location.latitude, location.longitude)
        else:
            return (None, None)
    except (GeocoderTimedOut, GeocoderUnavailable):
        return (None, None)

# Create a UDF (User Defined Function)
coordinates_udf = udf(get_coordinates, StructType([
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True)
]))

# Apply the UDF to create new columns
df_with_coordinates = df_limited.withColumn("coordinates", coordinates_udf(col("ADDRESS")))

# Split the coordinates into separate latitude and longitude columns
df_final = df_with_coordinates.withColumn("latitude", col("coordinates.latitude")) \
                              .withColumn("longitude", col("coordinates.longitude")) \
                              .drop("coordinates")



# Display the final DataFrame
display(df_final)

希望有任何意见或建议!谢谢你:)

python pyspark bigdata
1个回答
0
投票

正如评论中提到的,您受到用于获取纬度和经度的服务的速率限制的限制(而不是您的程序那么慢)。

Nominatim 明确不鼓励使用他们的服务来执行批量任务

我建议您使用其他数据源。例如你可以

  • 按照 Nominatim 网站的建议,直接从 OSM Planet 获取数据
  • 使用 Openaddresses 具有在 OSM 数据中也使用的数据,但只有地址,这使得文件小很多
  • 或者,如果您确实需要使用 API,而不是从源下载数据,您可以使用 google 地图 api,但这是一个 付费 api。 (我认为您不会找到一个 API 可以让您免费且无速率限制地进行如此数量的地理编码……)
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.