有没有一种方法可以将这种动态时间扭曲算法矢量化?

2024-09-30 03:24:34 发布

您现在位置:Python中文网/ 问答频道 /正文

动态时间扭曲算法提供了两个时间序列之间的距离概念,这两个时间序列的速度可能不同。如果我有N个序列相互比较,我可以通过两两应用算法构造一个NXN对称矩阵和一个nule对角。然而,对于长的二维序列,这是非常缓慢的。因此,我试图对代码进行矢量化,以加速矩阵计算。重要的是,我还想提取定义最佳对齐的索引

到目前为止,我的成对比较代码:

import math
import numpy as np

seq1 = np.random.randint(100, size=(100, 2)) #Two dim sequences
seq2 = np.random.randint(100, size=(100, 2))

def seqdist(seq1, seq2):                      # dynamic time warping function

    ns = len(seq1)
    nt = len(seq2)

    D = np.zeros((ns+1, nt+1))+math.inf
    D[0, 0] = 0
    cost = np.zeros((ns,nt))

    for i in range(ns):
       for j in range(nt): 

          cost[i,j] = np.linalg.norm(seq1[i,:]-seq2[j,:])
          D[i+1, j+1] = cost[i,j]+min([D[i, j+1], D[i+1, j], D[i, j]])

    d = D[ns,nt]                            # distance

    matchidx = [[ns-1, nt-1]]              # backwards optimal alignment computation 
    i = ns
    j = nt
    for k in range(ns+nt+2):
        idx = np.argmin([D[i-1, j], D[i, j-1], D[i-1, j-1]])
        if idx == 0 and i > 1 and j > 0:
           matchidx.append([i-2, j-1])
           i -= 1
        elif idx == 1 and i > 0 and j > 1:
             matchidx.append([i-1, j-2])
             j -= 1
        elif idx == 2 and i > 1 and j > 1:
             matchidx.append([i-2, j-2])
             i -= 1
             j -= 1
        else:
             break

    matchidx.reverse()

    return d, matchidx

[d,matchidx] = seqdist(seq1,seq2) #try it

Tags: andinfornp时间range序列ns
1条回答
网友
1楼 · 发布于 2024-09-30 03:24:34

这里是对代码的一次重写,使其更易于numba.jit。这并不是一个完全矢量化的解决方案,但是我看到这个基准测试的加速因子是230

from numba import jit
from scipy import spatial

@jit
def D_from_cost(cost, D):
  # operates on D inplace
  ns, nt = cost.shape
  for i in range(ns):
    for j in range(nt):
      D[i+1, j+1] = cost[i,j]+min(D[i, j+1], D[i+1, j], D[i, j])
      # avoiding the list creation inside mean enables better jit performance
      # D[i+1, j+1] = cost[i,j]+min([D[i, j+1], D[i+1, j], D[i, j]])

@jit
def get_d(D, matchidx):
  ns = D.shape[0] - 1
  nt = D.shape[1] - 1
  d = D[ns,nt]

  matchidx[0,0] = ns - 1
  matchidx[0,1] = nt - 1
  i = ns
  j = nt
  for k in range(1, ns+nt+3):
    idx = 0
    if not (D[i-1,j] <= D[i,j-1] and D[i-1,j] <= D[i-1,j-1]):
      if D[i,j-1] <= D[i-1,j-1]:
        idx = 1
      else:
        idx = 2

    if idx == 0 and i > 1 and j > 0:
      # matchidx.append([i-2, j-1])
      matchidx[k,0] = i - 2
      matchidx[k,1] = j - 1
      i -= 1
    elif idx == 1 and i > 0 and j > 1:
      # matchidx.append([i-1, j-2])
      matchidx[k,0] = i-1
      matchidx[k,1] = j-2
      j -= 1
    elif idx == 2 and i > 1 and j > 1:
      # matchidx.append([i-2, j-2])
      matchidx[k,0] = i-2
      matchidx[k,1] = j-2
      i -= 1
      j -= 1
    else:
      break

  return d, matchidx[:k]


def seqdist2(seq1, seq2):
  ns = len(seq1)
  nt = len(seq2)

  cost = spatial.distance_matrix(seq1, seq2)

  # initialize and update D
  D = np.full((ns+1, nt+1), np.inf)
  D[0, 0] = 0
  D_from_cost(cost, D)

  matchidx = np.zeros((ns+nt+2,2), dtype=np.int)
  d, matchidx = get_d(D, matchidx)
  return d, matchidx[::-1].tolist()

assert seqdist2(seq1, seq2) == seqdist(seq1, seq2)

%timeit seqdist2(seq1, seq2) # 1000 loops, best of 3: 365 µs per loop
%timeit seqdist(seq1, seq2)  # 10 loops, best of 3: 86.1 ms per loop

以下是一些变化:

  1. cost使用spatial.distance_matrix计算
  2. idx的定义被一堆丑陋的if语句所取代,这使得编译代码更快
  3. min([D[i, j+1], D[i+1, j], D[i, j]])替换为min(D[i, j+1], D[i+1, j], D[i, j]),也就是说,我们不取列表的min,而是取三个值的min。这导致了jit下的惊人加速
  4. matchidx作为numpy数组预先分配,并在输出之前被截断为正确的大小

相关问题 更多 >

    热门问题