使用循环(周期性)边界条件计算 numpy 数组中点之间距离的更快代码 (2024)

我知道如何使用计算数组中点之间的欧几里得距离
scipy.spatial.distance.cdist
类似于这个问题的答案:
Calculate Distances Between One Point in Matrix From All Other Points
但是,我想在假设循环边界条件的情况下进行计算,例如因此,在这种情况下,点 [0,0] 与点 [0,n-1] 的距离为 1,而不是 n-1 的距离。爱掏网 - it200.com (然后,我将为目标单元格阈值距离内的所有点制作一个蒙版,但这不是问题的核心)。爱掏网 - it200.com
我能想到的唯一方法是重复计算 9 次,域索引在 x、y 和 x&y 方向上添加/减去 n,然后堆叠结果并在 9 个切片中找到最小值。爱掏网 - it200.com 为了说明需要 9 次重复,我将一个简单的示意图放在一起,其中只有 1 个 J 点,用圆圈标记,并显示了一个示例,在这种情况下,三角形标记的单元格在域中的最近邻居反射(reflect)为左上角。爱掏网 - it200.com
使用循环(周期性)边界条件计算 numpy 数组中点之间距离的更快代码 (1)
这是我使用 cdist 为此开发的代码:

import numpy as npfrom scipy import spatial n=5 # size of 2D box (n X n points)np.random.seed(1) # to make reproduciblea=np.random.uniform(size=(n,n)) i=np.argwhere(a>-1) # all points, for each loc we want distance to nearest J j=np.argwhere(a>0.85) # set of J locations to find distance to.# this will be used in the KDtree soln global maxdistmaxdist=2.0def dist_v1(i,j): dist=[] # 3x3 search required for periodic boundaries. for xoff in [-n,0,n]: for yoff in [-n,0,n]: jo=j.copy() jo[:,0]-=xoff jo[:,1]-=yoff dist.append(np.amin(spatial.distance.cdist(i,jo,metric='euclidean'),1)) dist=np.amin(np.stack(dist),0).reshape([n,n]) return(dist)

这有效,并产生例如:

print(dist_v1(i,j))[[1.41421356 1. 1.41421356 1.41421356 1. ] [2.23606798 2. 1.41421356 1. 1.41421356] [2. 2. 1. 0. 1. ] [1.41421356 1. 1.41421356 1. 1. ] [1. 0. 1. 1. 0. ]]

零显然标记了 J 点,距离是正确的(这个编辑纠正了我之前不正确的尝试)。爱掏网 - it200.com
请注意,如果您更改最后两行以堆叠原始距离,然后仅使用一个最小值,如下所示:

def dist_v2(i,j): dist=[] # 3x3 search required for periodic boundaries. for xoff in [-n,0,n]: for yoff in [-n,0,n]: jo=j.copy() jo[:,0]-=xoff jo[:,1]-=yoff dist.append(spatial.distance.cdist(i,jo,metric='euclidean')) dist=np.amin(np.dstack(dist),(1,2)).reshape([n,n]) return(dist)

对于小 n (10) 速度要慢得多
...但无论哪种方式,对于我的大型数组(N = 500 和 J 点数约为 70),它是 ,此搜索占用了大约 99% 的计算时间,(并且使用它也有点难看循环) - 有更好/更快的方法吗?
我想到的其他选择是:

  • scipy.spatial.KDTree.query_ball_point
  • 通过进一步搜索,我发现有一个功能
    scipy.spatial.KDTree.query_ball_point 直接计算我的 J 点半径内的坐标,但它似乎没有任何使用周期性边界的设施,所以我认为仍然需要以某种方式使用 3x3 循环,堆叠然后使用 amin 作为我在上面做,所以我不确定这是否会更快。爱掏网 - it200.com
    我使用这个函数编写了一个解决方案,而不用担心周期性边界条件(即这不能回答我的问题)

    def dist_v3(n,j): x, y = np.mgrid[0:n, 0:n] points = np.c_[x.ravel(), y.ravel()] tree=spatial.KDTree(points) mask=np.zeros([n,n]) for results in tree.query_ball_point((j), maxdist): mask[points[results][:,0],points[results][:,1]]=1 return(mask)

    也许我没有以最有效的方式使用它,但是即使没有周期性边界,这已经和我基于 cdist 的解决方案一样慢。爱掏网 - it200.com 在两个 cdist 解决方案中包括掩码函数,即在这些函数中用 return(dist) 替换 return(np.where(dist<=maxdist,1,0)),然后使用 timeit,我得到以下 n=100 的时序:

    from timeit import timeitprint("cdist v1:",timeit(lambda: dist_v1(i,j), number=3)*100)print("cdist v2:",timeit(lambda: dist_v2(i,j), number=3)*100)print("KDtree:", timeit(lambda: dist_v3(n,j), number=3)*100)cdist v1: 181.80927299981704cdist v2: 554.8205785999016KDtree: 605.119637199823
  • 为设置距离 [0,0] 内的点创建一个相对坐标数组,然后手动循环 J 点,使用此相对点列表设置掩码 - 这具有“相对距离”计算的优点只执行一次(我的 J 点每个时间步都会改变),但我怀疑循环会很慢。爱掏网 - it200.com
  • 为 2D 域中的每个点预先计算一组掩码,因此在模型集成的每个时间步长中,我只需选择 J 点的掩码并应用。爱掏网 - it200.com 这将使用大量内存(与 n^4 成比例)并且可能仍然很慢,因为您需要循环 J 个点以组合掩码。爱掏网 - it200.com
  • [编辑] - 我发现代码跟踪工作完成点的方式有误,用 mask_kernel 修复了它。爱掏网 - it200.com 较新代码的纯 python 版本慢约 1.5 倍,但 numba 版本稍快(由于一些其他优化)。爱掏网 - it200.com
    [当前最佳:~100x 到 120x 原始速度]
    首先,感谢您提交此问题,我在优化它时获得了很多乐趣!
    我目前的最佳解决方案依赖于网格是规则的并且“源”点(我们需要计算距离的点)大致均匀分布的假设。爱掏网 - it200.com
    这里的想法是,所有的距离都将是 1、sqrt(2)sqrt(3),...所以我们可以预先进行数值计算。爱掏网 - it200.com 然后我们简单地将这些值放在一个矩阵中,并在每个源点周围复制该矩阵(并确保保留在每个点找到的最小值)。爱掏网 - it200.com 这涵盖了绝大多数点(> 99%)。爱掏网 - it200.com 然后我们对剩下的 1% 应用另一种更“经典”的方法。爱掏网 - it200.com
    这是代码:

    import numpy as npdef sq_distance(x1, y1, x2, y2, n): # computes the pairwise squared distance between 2 sets of points (with periodicity) # x1, y1 : coordinates of the first set of points (source) # x2, y2 : same dx = np.abs((np.subtract.outer(x1, x2) + n//2)%(n) - n//2) dy = np.abs((np.subtract.outer(y1, y2) + n//2)%(n) - n//2) d = (dx*dx + dy*dy) return ddef apply_kernel(sources, sqdist, kern_size, n, mask): ker_i, ker_j = np.meshgrid(np.arange(-kern_size, kern_size+1), np.arange(-kern_size, kern_size+1), indexing="ij") kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2) mask_kernel = kernel > kern_size**2 for pi, pj in sources: ind_i = (pi+ker_i)%n ind_j = (pj+ker_j)%n sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j]) mask[ind_i,ind_j] *= mask_kerneldef dist_vf(sources, n, kernel_size): sources = np.asfortranarray(sources) #for memory contiguity kernel_size = min(kernel_size, n//2) kernel_size = max(kernel_size, 1) sqdist = np.full((n,n), 10*n**2, dtype=np.int32) #preallocate with a huge distance (>max**2) mask = np.ones((n,n), dtype=bool) #which points have not been reached? #main code apply_kernel(sources, sqdist, kernel_size, n, mask) #remaining points rem_i, rem_j = np.nonzero(mask) if len(rem_i) > 0: sq_d = sq_distance(sources[:,0], sources[:,1], rem_i, rem_j, n).min(axis=0) sqdist[rem_i, rem_j] = sq_d #eff = 1-rem_i.size/n**2 #print("covered by kernel :", 100*eff, "%") #print("overlap :", sources.shape[0]*(1+2*kernel_size)**2/n**2) #print() return np.sqrt(sqdist)

    测试这个版本

    n=500 # size of 2D box (n X n points)np.random.seed(1) # to make reproduciblea=np.random.uniform(size=(n,n)) all_points=np.argwhere(a>-1) # all points, for each loc we want distance to nearest J source_points=np.argwhere(a>1-70/n**2) # set of J locations to find distance to.## code for dist_v1 and dist_vf#overlap=5.2kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)print("cdist v1 :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")print("kernel version:", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")

    cdist v1 : 1148.6694 mskernel version: 69.21876999999998 ms

    这已经是大约 17 倍的加速!我还实现了 sq_distanceapply_kernel 的 numba 版本:[这是新的正确版本]

    @njit(cache=True)def sq_distance(x1, y1, x2, y2, n): m1 = x1.size m2 = x2.size n2 = n//2 d = np.empty((m1,m2), dtype=np.int32) for i in range(m1): for j in range(m2): dx = np.abs(x1[i] - x2[j] + n2)%n - n2 dy = np.abs(y1[i] - y2[j] + n2)%n - n2 d[i,j] = (dx*dx + dy*dy) return d@njit(cache=True)def apply_kernel(sources, sqdist, kern_size, n, mask): # creating the kernel kernel = np.empty((2*kern_size+1, 2*kern_size+1)) vals = np.arange(-kern_size, kern_size+1)**2 for i in range(2*kern_size+1): for j in range(2*kern_size+1): kernel[i,j] = vals[i] + vals[j] mask_kernel = kernel > kern_size**2 I = sources[:,0] J = sources[:,1] # applying the kernel for each point for l in range(sources.shape[0]): pi = I[l] pj = J[l] if pj - kern_size >= 0 and pj + kern_size<n: #if we are in the middle, no need to do the modulo for j for i in range(2*kern_size+1): ind_i = np.mod((pi+i-kern_size), n) for j in range(2*kern_size+1): ind_j = (pj+j-kern_size) sqdist[ind_i,ind_j] = np.minimum(kernel[i,j], sqdist[ind_i,ind_j]) mask[ind_i,ind_j] = mask_kernel[i,j] and mask[ind_i,ind_j] else: for i in range(2*kern_size+1): ind_i = np.mod((pi+i-kern_size), n) for j in range(2*kern_size+1): ind_j = np.mod((pj+j-kern_size), n) sqdist[ind_i,ind_j] = np.minimum(kernel[i,j], sqdist[ind_i,ind_j]) mask[ind_i,ind_j] = mask_kernel[i,j] and mask[ind_i,ind_j] return

    并测试

    overlap=5.2kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)print("cdist v1 :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")print("kernel numba (first run):", timeit(lambda: dist_vf(source_points, n, kernel_size), number=1)*1000, "ms") #first run = cimpilation = longprint("kernel numba :", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")

    这给出了以下结果

    cdist v1 : 1163.0742 mskernel numba (first run): 2060.0802 mskernel numba : 8.80377000000001 ms

    由于 JIT 编译,第一次运行很慢,但除此之外,它的改进是 120 倍!
    通过调整 kernel_size 参数(或 overlap ),可以从这个算法中获得更多的好处。爱掏网 - it200.com 目前选择的kernel_size只对少量源点有效。爱掏网 - it200.com 例如,这个选择在使用 source_points=np.argwhere(a>0.85) (13s) 时惨遭失败,而手动设置 kernel_size=5 在 22ms 内给出答案。爱掏网 - it200.com
    我希望我的帖子不要(不必要地)太复杂,我真的不知道如何更好地组织它。爱掏网 - it200.com
    [编辑 2] :
    我对代码的非 numba 部分给予了更多关注,并设法获得了非常显着的加速,非常接近 numba 可以实现的目标:这是函数 apply_kernel 的新版本:

    def apply_kernel(sources, sqdist, kern_size, n, mask): ker_i = np.arange(-kern_size, kern_size+1).reshape((2*kern_size+1,1)) ker_j = np.arange(-kern_size, kern_size+1).reshape((1,2*kern_size+1)) kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2) mask_kernel = kernel > kern_size**2 for pi, pj in sources: imin = pi-kern_size jmin = pj-kern_size imax = pi+kern_size+1 jmax = pj+kern_size+1 if imax < n and jmax < n and imin >=0 and jmin >=0: # we are inside sqdist[imin:imax,jmin:jmax] = np.minimum(kernel, sqdist[imin:imax,jmin:jmax]) mask[imin:imax,jmin:jmax] *= mask_kernel elif imax < n and imin >=0: ind_j = (pj+ker_j.ravel())%n sqdist[imin:imax,ind_j] = np.minimum(kernel, sqdist[imin:imax,ind_j]) mask[imin:imax,ind_j] *= mask_kernel elif jmax < n and jmin >=0: ind_i = (pi+ker_i.ravel())%n sqdist[ind_i,jmin:jmax] = np.minimum(kernel, sqdist[ind_i,jmin:jmax]) mask[ind_i,jmin:jmax] *= mask_kernel else : ind_i = (pi+ker_i)%n ind_j = (pj+ker_j)%n sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j]) mask[ind_i,ind_j] *= mask_kernel

    主要的优化是

  • 切片索引(而不是密集数组)
  • 使用稀疏索引(我之前怎么没想到)
  • 测试

    overlap=5.4kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)print("cdist v1 :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")print("kernel v2 :", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")

    cdist v1 : 1209.8163000000002 mskernel v2 : 11.319049999999997 ms

    这比 cdist 提高了 100 倍,比之前仅使用 numpy 的版本提高了约 5.5 倍,仅比我使用 numba 所能达到的速度慢约 25%。爱掏网 - it200.com

    使用循环(周期性)边界条件计算 numpy 数组中点之间距离的更快代码 (2024)

    References

    Top Articles
    Today's Final Jeopardy Clue
    Va Ems Portal Login
    Spasa Parish
    Rentals for rent in Maastricht
    159R Bus Schedule Pdf
    Sallisaw Bin Store
    Black Adam Showtimes Near Maya Cinemas Delano
    Espn Transfer Portal Basketball
    Pollen Levels Richmond
    11 Best Sites Like The Chive For Funny Pictures and Memes
    Things to do in Wichita Falls on weekends 12-15 September
    Craigslist Pets Huntsville Alabama
    Paulette Goddard | American Actress, Modern Times, Charlie Chaplin
    Red Dead Redemption 2 Legendary Fish Locations Guide (“A Fisher of Fish”)
    What's the Difference Between Halal and Haram Meat & Food?
    R/Skinwalker
    Rugged Gentleman Barber Shop Martinsburg Wv
    Jennifer Lenzini Leaving Ktiv
    Justified - Streams, Episodenguide und News zur Serie
    Epay. Medstarhealth.org
    Olde Kegg Bar & Grill Portage Menu
    Cubilabras
    Half Inning In Which The Home Team Bats Crossword
    Amazing Lash Bay Colony
    Juego Friv Poki
    Dirt Devil Ud70181 Parts Diagram
    Truist Bank Open Saturday
    Water Leaks in Your Car When It Rains? Common Causes & Fixes
    What’s Closing at Disney World? A Complete Guide
    New from Simply So Good - Cherry Apricot Slab Pie
    Drys Pharmacy
    Ohio State Football Wiki
    Find Words Containing Specific Letters | WordFinder®
    FirstLight Power to Acquire Leading Canadian Renewable Operator and Developer Hydromega Services Inc. - FirstLight
    Webmail.unt.edu
    2024-25 ITH Season Preview: USC Trojans
    Metro By T Mobile Sign In
    Restored Republic December 1 2022
    12 30 Pacific Time
    Free Stuff Craigslist Roanoke Va
    Wi Dept Of Regulation & Licensing
    Pick N Pull Near Me [Locator Map + Guide + FAQ]
    Crystal Westbrooks Nipple
    Ice Hockey Dboard
    Über 60 Prozent Rabatt auf E-Bikes: Aldi reduziert sämtliche Pedelecs stark im Preis - nur noch für kurze Zeit
    Wie blocke ich einen Bot aus Boardman/USA - sellerforum.de
    Infinity Pool Showtimes Near Maya Cinemas Bakersfield
    Dermpathdiagnostics Com Pay Invoice
    How To Use Price Chopper Points At Quiktrip
    Maria Butina Bikini
    Busted Newspaper Zapata Tx
    Latest Posts
    Article information

    Author: Saturnina Altenwerth DVM

    Last Updated:

    Views: 6127

    Rating: 4.3 / 5 (44 voted)

    Reviews: 83% of readers found this page helpful

    Author information

    Name: Saturnina Altenwerth DVM

    Birthday: 1992-08-21

    Address: Apt. 237 662 Haag Mills, East Verenaport, MO 57071-5493

    Phone: +331850833384

    Job: District Real-Estate Architect

    Hobby: Skateboarding, Taxidermy, Air sports, Painting, Knife making, Letterboxing, Inline skating

    Introduction: My name is Saturnina Altenwerth DVM, I am a witty, perfect, combative, beautiful, determined, fancy, determined person who loves writing and wants to share my knowledge and understanding with you.