向量化在另一个数组中搜索元素的 For 循环

问题描述 投票:0回答:1

我有两个像这样的数组:

Array 1 (locs_array) 是一个二维数组,包含我称之为 lon 和 lat 的值对,例如

array([[-122.463425,   47.195741],
       [-122.498139,   47.190166]])

Array 2 (parents_array) 也是一个二维数组,包含以下列 [id, centerlon, centerlat, upperlon, lowerlon, upperlat, lowerlat] 例如:

array([[   1.        , -122.463425  ,   47.195741  , -122.46331271,
        -122.46353729,   47.19585367,   47.19562833],
       [   2.        , -122.498149  ,   47.190275  , -122.49803671,
        -122.49826129,   47.19038767,   47.19016233]])

放一点上下文,第一个数组是经/纬度位置列表,而第二个数组包含网格系统的定义,该网格系统是通过存储经度和纬度的中心、上下坐标来实现的。

我基本上是在寻找最快的方法来找到各个位置属于(数组 1)的网格(数组 2)。

这就是我目前所做的:

这是我为给定的 lon/lat 对找到父级的函数:

def _findFirstParent(self, lon, lat, parents_array):
    parentID = -1
    a = np.where(parents_array[:,3] >= lon)[0] # indices with upperLon >= lon
    b = np.where(parents_array[:,4] <= lon)[0] # indices with lowerLon <= lon
    c = np.intersect1d(a, b) # lon matches

    d = np.where(parents_array[:,5] >= lat)[0] # indices with upperLat >= lat
    e = np.where(parents_array[:,6] <= lat)[0] # indices with lowerLat <= lat
    f = np.intersect1d(d, e) # lat matches

    g = np.intersect1d(c, f) # both match
    if len(g) > 0:
        parentID = g[0]
    return parentID

我使用以下代码调用它:

for i in range(len(locs_array)):
     each_lon = locs_array[i][0]
     each_lat = locs_array[i][1]
     parentID = findFirstParent(each_lon, each_lat, parents_array)

数组 1(位置数组)包含超过 1 亿条记录,但我认为如果需要我可以将其分解为更小的块。 数组 2(网格数组)包含超过一百万条记录。

我有什么选择可以让它更快?任何帮助将不胜感激。

python arrays numpy optimization search
1个回答
0
投票

向量化此类循环的一种方法是广播以强制 numpy 比较来自

loc_array
的任何条目与来自
parent_array
.

的任何条目

例如,如果您试图找到 A 的每个整数,B 的哪个整数是它的倍数,而不是迭代 A 的值,您可以

A=np.array([1,2,3,4,5,6,7,8,9,10])
B=np.array([11,12,13,14,15,16,17,18,19,20])

res = B[None,:]%A[:,None] == 0

这将返回一个二维数组,例如

res[i,j]
为真当且仅当
B[j]
A[i]
的倍数。然后
np.argmax(B[None,:]%A[:,None] == 0, axis=1)
给出答案(假设总有一个,就是这里的情况)

所以,在你的情况下

(locs_array[None,:,0]<=parents_array[:,None,3]) & (locs_array[None,:,0]>=parents_array[:,None,4]) & (locs_array[None,:,1]<=parents_array[:,None,5]) & (locs_array[None,:,1]>=parents_array[:,None,6])

是一个二维数组,其索引

[i,j]
的值为真当且仅当
locs_array[j]
parents_array[i]

的边界内

所以

idx = np.argmax((locs_array[None,:,0]<=parents_array[:,None,3]) & (locs_array[None,:,0]>=parents_array[:,None,4]) & (locs_array[None,:,1]<=parents_array[:,None,5]) & (locs_array[None,:,1]>=parents_array[:,None,6]), axis=0)

len(locs_array)
整数的数组,每个整数都是
parents_array
中每个位置所在的条目的索引。 换句话说,
idx[j]=i
意味着
locs_array[j]
parents_array[i]
的边界内。我相信,这就是你想要的。

两个警告

  • 假设总是有答案。如果
    j
    的条目
    loc_array
    parents_array
    的任何条目没有边界,那么
    idx[j]
    将为 0。因此您可能需要仔细检查结果是否为 0(因为 0 可能意味着两者该位置包含在
    parents_array[0]
    中或它不包含在 none 中)。或者,更好的解决方案是设置一个哨兵:让
    parents_array[0]
    成为不可能的条目(例如,上限小于下限,因此其中不能包含任何内容)。那么,你知道如果index为0,就代表无解。
  • 那只是你算法的向量化。我很确定它比您的实施快得多。因为你在 python 中迭代(使用你的 for 循环),而我在 numpy 中迭代(让广播完成所有组合)。但这仍然是完全相同的计算:将所有
    locs_array[j]
    与所有
    parents_array[i]
    进行比较。只是 numpy 为我们处理了 for 循环。因此,同样,当然更快(无法对其进行基准测试,因为您没有提供 [mre]),但仍然是相同的算法。评论中建议的是另一种(更快,具有不同的 O 复杂度)算法,它可以受益于
    parents_array
    已排序的知识。这允许,例如二分法搜索。但是向量化并不是那么容易。
© www.soinside.com 2019 - 2024. All rights reserved.