我有两个像这样的数组:
Array 1 (locs_array) 是一个二维数组,包含我称之为 lon 和 lat 的值对,例如
array([[-122.463425, 47.195741],
[-122.498139, 47.190166]])
Array 2 (parents_array) 也是一个二维数组,包含以下列 [id, centerlon, centerlat, upperlon, lowerlon, upperlat, lowerlat] 例如:
array([[ 1. , -122.463425 , 47.195741 , -122.46331271,
-122.46353729, 47.19585367, 47.19562833],
[ 2. , -122.498149 , 47.190275 , -122.49803671,
-122.49826129, 47.19038767, 47.19016233]])
放一点上下文,第一个数组是经/纬度位置列表,而第二个数组包含网格系统的定义,该网格系统是通过存储经度和纬度的中心、上下坐标来实现的。
我基本上是在寻找最快的方法来找到各个位置属于(数组 1)的网格(数组 2)。
这就是我目前所做的:
这是我为给定的 lon/lat 对找到父级的函数:
def _findFirstParent(self, lon, lat, parents_array):
parentID = -1
a = np.where(parents_array[:,3] >= lon)[0] # indices with upperLon >= lon
b = np.where(parents_array[:,4] <= lon)[0] # indices with lowerLon <= lon
c = np.intersect1d(a, b) # lon matches
d = np.where(parents_array[:,5] >= lat)[0] # indices with upperLat >= lat
e = np.where(parents_array[:,6] <= lat)[0] # indices with lowerLat <= lat
f = np.intersect1d(d, e) # lat matches
g = np.intersect1d(c, f) # both match
if len(g) > 0:
parentID = g[0]
return parentID
我使用以下代码调用它:
for i in range(len(locs_array)):
each_lon = locs_array[i][0]
each_lat = locs_array[i][1]
parentID = findFirstParent(each_lon, each_lat, parents_array)
数组 1(位置数组)包含超过 1 亿条记录,但我认为如果需要我可以将其分解为更小的块。 数组 2(网格数组)包含超过一百万条记录。
我有什么选择可以让它更快?任何帮助将不胜感激。
向量化此类循环的一种方法是广播以强制 numpy 比较来自
loc_array
的任何条目与来自 parent_array
. 的任何条目
例如,如果您试图找到 A 的每个整数,B 的哪个整数是它的倍数,而不是迭代 A 的值,您可以
A=np.array([1,2,3,4,5,6,7,8,9,10])
B=np.array([11,12,13,14,15,16,17,18,19,20])
res = B[None,:]%A[:,None] == 0
这将返回一个二维数组,例如
res[i,j]
为真当且仅当 B[j]
是 A[i]
的倍数。然后np.argmax(B[None,:]%A[:,None] == 0, axis=1)
给出答案(假设总有一个,就是这里的情况)
所以,在你的情况下
(locs_array[None,:,0]<=parents_array[:,None,3]) & (locs_array[None,:,0]>=parents_array[:,None,4]) & (locs_array[None,:,1]<=parents_array[:,None,5]) & (locs_array[None,:,1]>=parents_array[:,None,6])
是一个二维数组,其索引
[i,j]
的值为真当且仅当 locs_array[j]
在 parents_array[i]
的边界内
所以
idx = np.argmax((locs_array[None,:,0]<=parents_array[:,None,3]) & (locs_array[None,:,0]>=parents_array[:,None,4]) & (locs_array[None,:,1]<=parents_array[:,None,5]) & (locs_array[None,:,1]>=parents_array[:,None,6]), axis=0)
是
len(locs_array)
整数的数组,每个整数都是 parents_array
中每个位置所在的条目的索引。
换句话说,idx[j]=i
意味着 locs_array[j]
在 parents_array[i]
的边界内。我相信,这就是你想要的。
两个警告
j
的条目 loc_array
与 parents_array
的任何条目没有边界,那么 idx[j]
将为 0。因此您可能需要仔细检查结果是否为 0(因为 0 可能意味着两者该位置包含在 parents_array[0]
中或它不包含在 none 中)。或者,更好的解决方案是设置一个哨兵:让 parents_array[0]
成为不可能的条目(例如,上限小于下限,因此其中不能包含任何内容)。那么,你知道如果index为0,就代表无解。locs_array[j]
与所有 parents_array[i]
进行比较。只是 numpy 为我们处理了 for 循环。因此,同样,当然更快(无法对其进行基准测试,因为您没有提供 [mre]),但仍然是相同的算法。评论中建议的是另一种(更快,具有不同的 O 复杂度)算法,它可以受益于 parents_array
已排序的知识。这允许,例如二分法搜索。但是向量化并不是那么容易。