序列數(shù)據(jù)處理學(xué)習(xí)(一)利用極值方法檢測(cè)峰值學(xué)習(xí)
最近有個(gè)時(shí)間序列異常值處理的任務(wù),找一些源碼學(xué)習(xí)一下思路:
一、Marcos Duarte編寫(xiě)的峰值查找算法
算法思路:
1.通過(guò)極值前后一階導(dǎo)符號(hào)相反的條件,提取所有峰值的坐標(biāo):?
?????ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0] ?
# np.hstack((0, dx)) 在dx前面插入0,通過(guò)與運(yùn)算就可以判斷先dx>0再dx<0 的位置為峰值點(diǎn)。
2.同時(shí)對(duì)于平頂峰,可以選擇檢測(cè)上下沿
ine = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0]??
#先dx>0再dx<0或=0?的位置為峰值
通過(guò)極值點(diǎn)獲得峰值坐標(biāo)后就是一系列的功能性篩選:
3.NaN和NaN附近的值不能為峰值
ind = ind[np.in1d(ind, np.unique(np.hstack((indnan, indnan - 1, indnan + 1))), invert=True)]
#測(cè)試峰值坐標(biāo)是否在空數(shù)值的附近。5in1d:測(cè)試一維數(shù)組的每個(gè)元素是否也存在于第二個(gè)數(shù)組中,返回一個(gè)長(zhǎng)度相同布爾數(shù)組,
4.去除處于首位和末尾的峰值點(diǎn)
? ? if ind.size and ind[0] == 0:
? ? ? ? ind = ind[1:] ? #去除在首位的峰值
? ? if ind.size and ind[-1] == x.size - 1:
? ? ? ? ind = ind[:-1] #去除在末位的峰值
5.去除 高度 小于閾值mph的峰值
? ? ? ind = ind[x[ind] >= mph] ?#去除 高度 小于閾值mph的峰值
6.剔除前后突變小于閾值的峰值
dx = np.min(np.vstack([x[ind] - x[ind - 1], x[ind] - x[ind + 1]]), axis=0)
ind = np.delete(ind, np.where(dx < threshold)[0])
#計(jì)算峰值點(diǎn)前后的差值,上下組合在一起在取最小值
#np.vstack按垂直方向(行順序)堆疊數(shù)組構(gòu)成一個(gè)新的數(shù)組,np.min取兩側(cè)最小的突變,加入axis參數(shù),當(dāng)axis=0時(shí)會(huì)分別取每一列的最大值或最小值,axis=1時(shí),會(huì)分別取每一行的最大值或最小值,且將所有取到的數(shù)據(jù)放在一個(gè)一維數(shù)組中。
7.剔除峰值間距離過(guò)近的峰值
idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd)
#搜索在當(dāng)前峰,前后mpd區(qū)間里的其他峰,若有則給這些峰在idel里賦值1
算法風(fēng)格:
這個(gè)算法的所有操作基本在峰值的索引序列上進(jìn)行
先用邏輯語(yǔ)句獲得布爾序列,再通過(guò)np.where函數(shù)取得索引序列
后面的功能判斷,利用索引僅在極值點(diǎn)附近進(jìn)行,簡(jiǎn)化了運(yùn)算。
相關(guān)函數(shù):
np.where:根據(jù)判斷語(yǔ)句,提取索引
np.hstack,np.vstack :合并組合序列的方法
np.unique:合并后去除重復(fù)的函數(shù)
np.in1d:找數(shù)組B中元素,是否在數(shù)組A中存在,并標(biāo)記位置
np.min(A, axis=0):求列最小
np.delete:判斷語(yǔ)句配合np.where提取索引,精準(zhǔn)刪除
(np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0) :判斷極值點(diǎn)的思路
#Marcos Duarte編寫(xiě)的峰值查找算法,功能比較強(qiáng)大,篩選參數(shù)比較多。算法源碼如下:
def Marcos_detect_peaks(x, mph=None, mpd=1, threshold=0, edge='both',
? ? ? ? ? ? ? ? ?kpsh=False, valley=False, show=False, ax=None):
? ? """Detect peaks in data based on their amplitude and other features.
? ? Parameters
? ? ----------
? ? x : 1D array_like
? ? ? ? data.
? ? mph : {None, number}, optional (default = None) #峰最小的高度
? ? ? ? detect peaks that are greater than minimum peak height.
? ? mpd : positive integer, optional (default = 1) #峰最小的寬度
? ? ? ? detect peaks that are at least separated by minimum peak distance (in
? ? ? ? number of data).
? ? threshold : positive number, optional (default = 0)?
? ? ? ? detect peaks (valleys) that are greater (smaller) than `threshold`
? ? ? ? in relation to their immediate neighbors.
? ? edge : {None, 'rising', 'falling', 'both'}, optional (default = 'rising') #平頂峰檢測(cè)
? ? ? ? for a flat peak, keep only the rising edge ('rising'), only the
? ? ? ? falling edge ('falling'), both edges ('both'), or don't detect a
? ? ? ? flat peak (None).
? ? kpsh : bool, optional (default = False) #
? ? ? ? keep peaks with same height even if they are closer than `mpd`.
? ? valley : bool, optional (default = False)? #谷值檢測(cè)
? ? ? ? if True (1), detect valleys (local minima) instead of peaks.
? ? show : bool, optional (default = False)
? ? ? ? if True (1), plot data in matplotlib figure.
? ? ax : a matplotlib.axes.Axes instance, optional (default = None).
? ? Returns
? ? -------
? ? ind : 1D array_like
? ? ? ? indeces of the peaks in `x`.
? ? Notes
? ? -----
? ? The detection of valleys instead of peaks is performed internally by simply
? ? negating the data: `ind_valleys = detect_peaks(-x)`
? ? The function can handle NaN's
? ? See this IPython Notebook [1]_.
? ? References
? ? ----------
? ? "Marcos Duarte, https://github.com/demotu/BMC"
? ? [1] http://nbviewer.ipython.org/github/demotu/BMC/blob/master/notebooks/DetectPeaks.ipynb
? ? """
? ? x = np.atleast_1d(x).astype('float64') #一維數(shù)組,轉(zhuǎn)換為浮點(diǎn)型
? ? if x.size < 3:
? ? ? ? return np.array([], dtype=int)? #輸入數(shù)組長(zhǎng)度小于3,返回空
? ? if valley:
? ? ? ? x = -x? #檢測(cè)谷值,將數(shù)組沿x軸鏡像
? ? # find indexes of all peaks
? ? dx = x[1:] - x[:-1]?
????## x[1:]截取索引1(第二個(gè)值)到所有, x[:-1]截取除最后一個(gè)的全部,相減為梯度
? ? # handle NaN's
? ? indnan = np.where(np.isnan(x))[0]? ?
????#通過(guò)isnan函數(shù)判斷是否為空值,通過(guò)where獲得nan所在索引
? ? if indnan.size:
? ? ? ? x[indnan] = np.inf? #np.inf 表示+∞,是沒(méi)有確切的數(shù)值的,類(lèi)型為浮點(diǎn)型
? ? ? ? dx[np.where(np.isnan(dx))[0]] = np.inf? #若為無(wú)窮,對(duì)應(yīng)位置梯度為無(wú)窮
? ? ##########定位峰值點(diǎn)##########
? ? ine, ire, ife = np.array([[], [], []], dtype=int)? ?#創(chuàng)建空數(shù)組,ine, ire, ife = 平頂峰檢測(cè)
? ? if not edge:? #不進(jìn)行平頂峰檢測(cè)
? ? ? ? ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0]??
????????# np.hstack((0, dx)) 在dx前面插入0,通過(guò)與運(yùn)算就可以判斷先dx>0在dx小于0 的位置為峰值點(diǎn)。
? ? else:
? ? ? ? if edge.lower() in ['rising', 'both']:
? ? ? ? ? ? ire = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0]? #上升沿后可以接下降沿也可以接平頂,返回?cái)?shù)組坐標(biāo)
? ? ? ? if edge.lower() in ['falling', 'both']:
? ? ? ? ? ? ife = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) >= 0))[0]? ? ?##平頂或者上升沿可以接下降沿,返回?cái)?shù)組坐標(biāo)
? ? #返回峰值點(diǎn)坐標(biāo)
? ? ind = np.unique(np.hstack((ine, ire, ife)))?
????# 當(dāng)同時(shí)檢測(cè)上下沿時(shí),去除其中重復(fù)的坐標(biāo) ,并按坐標(biāo)由小到大返回一個(gè)新的無(wú)元素重復(fù)的元組
? ? # handle NaN's
? ? if ind.size and indnan.size:
? ? ? ? # NaN's and values close to NaN's cannot be peaks,NaN和NaN附近的值不能為峰值
? ? ? ? ind = ind[np.in1d(ind, np.unique(np.hstack((indnan, indnan - 1, indnan + 1))), invert=True)]?
????????#測(cè)試峰值坐標(biāo)是否在空數(shù)值的附近。5in1d:測(cè)試一維數(shù)組的每個(gè)元素是否也存在于第二個(gè)數(shù)組中,返回一個(gè)長(zhǎng)度相同布爾數(shù)組,
? ? #first and last values of x cannot be peaks? #去除第一個(gè)和最后一個(gè)峰值點(diǎn)
? ? if ind.size and ind[0] == 0:
? ? ? ? ind = ind[1:]? ?#去除在首位的峰值
? ? if ind.size and ind[-1] == x.size - 1:
? ? ? ? ind = ind[:-1] #去除在末位的峰值
? ? # remove peaks < minimum peak height #
? ? if ind.size and mph is not None:
? ? ? ? ind = ind[x[ind] >= mph]? #去除 高度 小于閾值mph的峰值
? ? # remove peaks - neighbors < threshold
? ? if ind.size and threshold > 0:? ?
? ? ? ? dx = np.min(np.vstack([x[ind] - x[ind - 1], x[ind] - x[ind + 1]]), axis=0)?
????????#np.vstack按垂直方向(行順序)堆疊數(shù)組構(gòu)成一個(gè)新的數(shù)組,np.min取兩側(cè)最小的突變,加入axis參數(shù),當(dāng)axis=0時(shí)會(huì)分別取每一列的最大值或最小值,axis=1時(shí),會(huì)分別取每一行的最大值或最小值,且將所有取到的數(shù)據(jù)放在一個(gè)一維數(shù)組中。
? ? ? ? ind = np.delete(ind, np.where(dx < threshold)[0])? ?#剔除突變小于閾值的峰值
? ??
? ? # detect small peaks closer than minimum peak distance #檢測(cè)小于最小峰值距離的小峰值
? ? if ind.size and mpd > 1:
? ? ? ? ind = ind[np.argsort(x[ind])][::-1]? #np.argsort:將a中的元素從小到大排列,提取其在排列前對(duì)應(yīng)的index(索引)輸出。[::-1]:取從后向前(相反)的元素,b = a[i:j:s]這種格式呢,i,j與上面的一樣,但s表示步進(jìn),缺省為1.
? ? ? ? idel = np.zeros(ind.size, dtype=bool) #zeros 在bool布爾里為0,idel數(shù)組只能true,false
? ? ? ? for i in range(ind.size):
? ? ? ? ? ? if not idel[i]:? ? # if ((not idel[i]) == ture) :? ? , not idel[i]是取反運(yùn)算
? ? ? ? ? ? ? ? # keep peaks with the same height if kpsh is True? ,僅當(dāng)峰間隔相同時(shí)保留峰,周期性峰值,下面'\'是換行標(biāo)記
? ? ? ? ? ? ? ? #(ind >= ind[i] - mpd) & (ind <= ind[i] + mpd),搜索在當(dāng)前峰,前后mpd區(qū)間里的其他峰,若有則給這些峰在idel里賦值1
? ? ? ? ? ? ? ? idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd) \
? ? ? ? ? ? ? ? ? ? ? ?& (x[ind[i]] > x[ind] if kpsh else True)
? ? ? ? ? ? ? ? idel[i] = 0? # Keep current peak
? ? ? ? # remove the small peaks and sort back the indexes by their occurrence
? ? ? ? ind = np.sort(ind[~idel])? #把小峰標(biāo)記的1取反,在ind中刪去這些峰值,再次排序
? ? return ind
二、Janko Slavic?方法
和上一個(gè)方法差不多,基本就是換了一種求極值的寫(xiě)法,相比之前的復(fù)雜一點(diǎn)點(diǎn)。
#Janko Slavic 方法
def findpeaks(data, spacing=1, limit=None):
? ? """Finds peaks in `data` which are of `spacing` width and >=`limit`.
? ? :param data: values
? ? :param spacing: minimum spacing to the next peak (should be 1 or more),到下一個(gè)峰值的最小間距(應(yīng)該是1或更多)
? ? :param limit: peaks should have value greater or equal ,高度閾值
? ? :return:
? ? """
? ? len = data.size
? ? x = np.zeros(len + 2 * spacing) #增加兩個(gè)空位
? ? x[:spacing] = data[0] - 1.e-6 ?#0:1首位賦值
? ? x[-spacing:] = data[-1] - 1.e-6 #-1:-1,末尾賦值
? ? x[spacing:spacing + len] = data #中間的為原數(shù)據(jù),相當(dāng)于給首末位復(fù)制一個(gè)數(shù)
? ? peak_candidate = np.zeros(len) ?#候選數(shù)組
? ? peak_candidate[:] = True #布爾
? ? for s in range(spacing):
? ? ? ? start = spacing - s - 1
? ? ? ? h_b = x[start: start + len] ?
????????# before,起始位0,b會(huì)有兩個(gè)第一位,最后一位是倒數(shù)第二位
? ? ? ? start = spacing
? ? ? ? h_c = x[start: start + len] ?# central,起始位1,等于原數(shù)據(jù)
? ? ? ? start = spacing + s + 1
? ? ? ? h_a = x[start: start + len] ?# after,起始位2,b會(huì)有兩個(gè)末位,第一位是第二位
? ? ? ? peak_candidate = np.logical_and(peak_candidate, np.logical_and(h_c > h_b, h_c > h_a)) ?#同樣是通過(guò)極值前后一階導(dǎo)符號(hào)相反的條件,提取所有峰值的坐標(biāo)
? ? ind =?np.argwhere(peak_candidate)?
????#np.argwhere與where的功能基本一樣,但是返回值的結(jié)構(gòu)更簡(jiǎn)單
? ? ind = ind.reshape(ind.size) #整形成數(shù)組
? ? if limit is not None:
? ? ? ? ind = ind[data[ind] > limit] #高度閾值
? ? return ind
三、Tony Beltramelli方法
相當(dāng)于先做了個(gè)歸一化再求極值
相關(guān)函數(shù)
np.roll(ratios, 1) ,這個(gè)滾動(dòng)函數(shù)挺有意思。
通過(guò)邏輯判斷的布爾序列求索引,可以實(shí)現(xiàn)np.argwhere的功能
for?i?in?range(0, len(peaks)):
if?peaks[i]:
? ? ? peak_indexes.append(i)
return?peak_indexes
def detect_peaks(signal, threshold=0.5):
? ? """ Performs peak detection on three steps: root mean square, peak to
? ? average ratios and first order logic.
? ? threshold used to discard peaks too small """
? ? # compute root mean square
? ? root_mean_square = sqrt(np.sum(np.square(signal) / len(signal)))#均方根:平方,均值,開(kāi)方
? ? # compute peak to average ratios
? ? ratios = np.array([pow(x / root_mean_square, 2) for x in signal]) #計(jì)算與均方根的比值
? ? # apply first order logic
? ? peaks = (ratios > np.roll(ratios, 1)) & (ratios > np.roll(ratios, -1)) & (ratios > threshold)
????# np.roll:意思是將a,沿著axis的方向,滾動(dòng)shift長(zhǎng)度。np.roll(ratios, 1)相當(dāng)于在數(shù)組首位加了最后的一位,其他后延
? ? # optional: return peak indices
? ? peak_indexes = []
? ? for i in range(0, len(peaks)):
? ? ? ? if peaks[i]:
? ? ? ? ? ? peak_indexes.append(i)
? ? return peak_indexes
四、總結(jié)

1.極值方法求峰值的方法就先看到這里,基本就是利用極值定義判斷得到布爾序列,在轉(zhuǎn)換成索引。
2.其次不要瞎歸一化,注意需要強(qiáng)調(diào)的點(diǎn)針對(duì)性地選擇歸一化方法
3.前后序列補(bǔ)的值也要考慮好,最好用標(biāo)準(zhǔn)的正常值,也不要隨便用0、復(fù)制等辦法