programing

python + NumPy / SciPy를 사용하여 롤링/이동 평균을 계산하는 방법

javaba 2022. 9. 30. 10:42
반응형

python + NumPy / SciPy를 사용하여 롤링/이동 평균을 계산하는 방법

단순히 numpy/scipy의 이동 평균을 계산하는 함수는 없는 것 같고, 이로 인해 복잡한 해를 얻을 수 있습니다.

제 질문은 두 가지입니다.

  • numpy를 사용하여 이동 평균을 구현하는 가장 쉬운 방법은 무엇입니까?
  • 이것은 간단하지 않고 에러가 발생하기 쉽기 때문에, 이 케이스에 배터리를 포함하지 않는 것이 좋은 이유입니까?

이동평균만 에는 무가중 으로 쉽게 할 수 .np.cumsum ,어떤. 그럴지도 모른다 FFT 기반 방법보다 빠릅니다.

편집 코드에서 Bean이 발견한 잘못된 인덱스를 하나씩 수정했습니다.편집

def moving_average(a, n=3) :
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1:] / n

>>> a = np.arange(20)
>>> moving_average(a)
array([  1.,   2.,   3.,   4.,   5.,   6.,   7.,   8.,   9.,  10.,  11.,
        12.,  13.,  14.,  15.,  16.,  17.,  18.])
>>> moving_average(a, n=4)
array([  1.5,   2.5,   3.5,   4.5,   5.5,   6.5,   7.5,   8.5,   9.5,
        10.5,  11.5,  12.5,  13.5,  14.5,  15.5,  16.5,  17.5])

따라서 이 솔루션은 구현이 매우 용이하며, 특별한 기능으로 인해 numpy가 이미 약간 부풀려져 있을 수 있습니다.

이를 실현하는 간단한 방법은 를 사용하는 것입니다.이 배경의 아이디어는 이산 컨볼루션 계산 방법을 활용하여 롤링 평균을 반환하는 데 사용하는 것입니다.이것은 우리가 원하는 슬라이딩 윈도우 길이와 같은 길이의 시퀀스로 컨볼빙하여 수행할 수 있습니다.

이를 위해 다음 함수를 정의할 수 있습니다.

def moving_average(x, w):
    return np.convolve(x, np.ones(w), 'valid') / w

.x가 긴 입니다.w된 .는 에 해 주세요.modevalid따라서 컨볼루션 곱은 시퀀스가 완전히 겹치는 지점에 대해서만 주어집니다.


몇 가지 예:

x = np.array([5,3,8,10,2,1,5,1,0,2])

이 있는 2하다

moving_average(x, 2)
# array([4. , 5.5, 9. , 6. , 1.5, 3. , 3. , 0.5, 1. ])

그리고 긴 창문은4:

moving_average(x, 4)
# array([6.5 , 5.75, 5.25, 4.5 , 2.25, 1.75, 2.  ])

는 어떻게 ?convolve 일?

이제 이산 회전수가 계산되는 방식을 좀 더 자세히 살펴보겠습니다.은 '하다'라는 으로 합니다.np.convolve이치노

def mov_avg(x, w):
    for m in range(len(x)-(w-1)):
        yield sum(np.ones(w) * x[m:m+w]) / w 

위의 동일한 예에서 다음과 같은 결과를 얻을 수 있습니다.

list(mov_avg(x, 2))
# [4.0, 5.5, 9.0, 6.0, 1.5, 3.0, 3.0, 0.5, 1.0]

따라서 각 단계에서 수행되는 작업은 내부 제품을 1개의 배열과 현재 창 사이에 배치하는 것입니다.이 경우 에 의한 곱셈np.ones(w)가 한 것이다sum수열 중 하나입니다.

Bellow(벨로우)를 선택합니다.「 」, 「 」의 .w=4:

[1,1,1,1]
[5,3,8,10,2,1,5,1,0,2]
= (1*5 + 1*3 + 1*8 + 1*10) / w = 6.5

그리고 다음 출력은 다음과 같이 계산됩니다.

  [1,1,1,1]
[5,3,8,10,2,1,5,1,0,2]
= (1*3 + 1*8 + 1*10 + 1*2) / w = 5.75

마찬가지로 모든 오버랩이 수행된 시퀀스의 이동 평균을 반환합니다.

NumPy가 특정 도메인 고유의 기능이 없는 것은 핵심 팀이 NumPy의 주요 지침에 대한 규율과 충실도 때문일 것입니다. NumPy는 N차원 어레이 유형과 이러한 어레이를 만들고 인덱싱하는 기능을 제공합니다.많은 기본 목표와 마찬가지로 이 목표도 작지 않고 NumPy는 훌륭하게 해낼 수 있습니다.

(대부분) 큰 SciPy는 수치 최적화(최적화), 신호 처리(신호), 적분(적분)과 같은 훨씬 더 큰 도메인별 라이브러리(SciPy devs에 의한 하위 패키지) 컬렉션을 포함합니다.

당신이 찾고 있는 함수는 SciPy 서브패키지 중 하나(scipy.signal)에 있는 것 같습니다만, 우선 SciPy skickit 컬렉션을 찾아 관련 skickit을 특정하고, 거기에서 관심 있는 함수를 찾습니다.

Scikit은 NumPy/SciPy를 기반으로 독립적으로 개발된 패키지이며 특정 기술 분야(예: skikits-image, skikits-learn 등)로 유도됩니다.이들 중 몇 가지(특히 수치 최적화를 위한 훌륭한 OpenOpt)는 비교적 새로운 skickits rubric 아래에 상주하기 훨씬 전에 높은 평가를 받고 성숙한 프로젝트였습니다.Skikits 홈페이지에는 30여개의 그러한 skickits가 게재되어 있지만, 그 중 적어도 몇 개는 더 이상 개발되지 않고 있다.

이 어드바이스를 따르면 cikits-timeseries로 이어집니다.그러나 이 패키지는 더 이상 개발되지 않습니다.실제로 Panda는 사실상의 NumPy 기반의 시계열 라이브러리가 되었습니다.

판다에는 이동 평균을 계산하는 데 사용할 수 있는 함수가 몇 가지 있습니다. 그중 가장 간단한 함수는 다음과 같이 사용하는 rolling_mean일 것입니다.

>>> # the recommended syntax to import pandas
>>> import pandas as PD
>>> import numpy as NP

>>> # prepare some fake data:
>>> # the date-time indices:
>>> t = PD.date_range('1/1/2010', '12/31/2012', freq='D')

>>> # the data:
>>> x = NP.arange(0, t.shape[0])

>>> # combine the data & index into a Pandas 'Series' object
>>> D = PD.Series(x, t)

이제 Series 개체에서 rolling_mean passing 함수를 호출하고 창 크기를 호출합니다.이 예에서는 10일입니다.

>>> d_mva = PD.rolling_mean(D, 10)

>>> # d_mva is the same size as the original Series
>>> d_mva.shape
    (1096,)

>>> # though obviously the first w values are NaN where w is the window size
>>> d_mva[:3]
    2010-01-01         NaN
    2010-01-02         NaN
    2010-01-03         NaN

효과가 있는지 확인합니다. 예를 들어, 원래 시리즈의 10 - 15 값과 롤링 평균으로 평활된 새 시리즈의 값을 비교합니다.

>>> D[10:15]
     2010-01-11    2.041076
     2010-01-12    2.041076
     2010-01-13    2.720585
     2010-01-14    2.720585
     2010-01-15    3.656987
     Freq: D

>>> d_mva[10:20]
      2010-01-11    3.131125
      2010-01-12    3.035232
      2010-01-13    2.923144
      2010-01-14    2.811055
      2010-01-15    2.785824
      Freq: D

rolling_mean 함수는 약 12개의 다른 함수와 함께 판다 문서에서 루브릭 이동 창 함수로 비공식적으로 그룹화된다. 두 번째, 판다의 관련 함수 그룹을 지수 가중 함수(예: ewma, 지수 이동 가중 평균을 계산하는)라고 한다.이 두 번째 그룹이 첫 번째(이동 창 함수)에 포함되지 않는 것은 지수 가중 변환이 고정 길이 창에 의존하지 않기 때문일 수 있습니다.

이를 위한 다양한 방법과 몇 가지 벤치마크를 소개합니다.가장 좋은 방법이 버전 다른 도서관에서 최적화된 코드를 사용하는 것.가장 좋은 방법은 다른 라이브러리의 최적화된 코드를 사용하는 버전입니다.그 그bottleneck.move_mean방법 아마도 주변에 가장 좋다.방법은 아마 모든 면에서 가장 좋을 것입니다.그 그scipy.convolve접근은 또한 매우 빠르고 확장성, 그리고 구문과 개념적으로 간단하지만 잘 하는 것은 매우 큰 창문 값에 대해 아닙니다.또한 접근 방식은 매우 빠르고 확장 가능하며 구문 및 개념적으로 단순하지만 매우 큰 창 값에 맞게 확장되지 않습니다.그 그numpy.cumsum만약 순수 순수가필요한경우 방법이 좋습니다 필요한 방법 좋다.numpy접근.접근.

주의: 일부 (예:bottleneck.move_mean)며, 데이터를 이동할 것이다.)중심으로 두고 있지 않고 있다.가 중심에 있지 않고 데이터가 이동됩니다.

import numpy as np
import scipy as sci
import scipy.signal as sig
import pandas as pd
import bottleneck as bn
import time as time

def rollavg_direct(a,n): 
    'Direct "for" loop'
    assert n%2==1
    b = a*0.0
    for i in range(len(a)) :
        b[i]=a[max(i-n//2,0):min(i+n//2+1,len(a))].mean()
    return b

def rollavg_comprehension(a,n):
    'List comprehension'
    assert n%2==1
    r,N = int(n/2),len(a)
    return np.array([a[max(i-r,0):min(i+r+1,N)].mean() for i in range(N)]) 

def rollavg_convolve(a,n):
    'scipy.convolve'
    assert n%2==1
    return sci.convolve(a,np.ones(n,dtype='float')/n, 'same')[n//2:-n//2+1]  

def rollavg_convolve_edges(a,n):
    'scipy.convolve, edge handling'
    assert n%2==1
    return sci.convolve(a,np.ones(n,dtype='float'), 'same')/sci.convolve(np.ones(len(a)),np.ones(n), 'same')  

def rollavg_cumsum(a,n):
    'numpy.cumsum'
    assert n%2==1
    cumsum_vec = np.cumsum(np.insert(a, 0, 0)) 
    return (cumsum_vec[n:] - cumsum_vec[:-n]) / n

def rollavg_cumsum_edges(a,n):
    'numpy.cumsum, edge handling'
    assert n%2==1
    N = len(a)
    cumsum_vec = np.cumsum(np.insert(np.pad(a,(n-1,n-1),'constant'), 0, 0)) 
    d = np.hstack((np.arange(n//2+1,n),np.ones(N-n)*n,np.arange(n,n//2,-1)))  
    return (cumsum_vec[n+n//2:-n//2+1] - cumsum_vec[n//2:-n-n//2]) / d

def rollavg_roll(a,n):
    'Numpy array rolling'
    assert n%2==1
    N = len(a)
    rolling_idx = np.mod((N-1)*np.arange(n)[:,None] + np.arange(N), N)
    return a[rolling_idx].mean(axis=0)[n-1:] 

def rollavg_roll_edges(a,n):
    # see https://stackoverflow.com/questions/42101082/fast-numpy-roll
    'Numpy array rolling, edge handling'
    assert n%2==1
    a = np.pad(a,(0,n-1-n//2), 'constant')*np.ones(n)[:,None]
    m = a.shape[1]
    idx = np.mod((m-1)*np.arange(n)[:,None] + np.arange(m), m) # Rolling index
    out = a[np.arange(-n//2,n//2)[:,None], idx]
    d = np.hstack((np.arange(1,n),np.ones(m-2*n+1+n//2)*n,np.arange(n,n//2,-1)))
    return (out.sum(axis=0)/d)[n//2:]

def rollavg_pandas(a,n):
    'Pandas rolling average'
    return pd.DataFrame(a).rolling(n, center=True, min_periods=1).mean().to_numpy()

def rollavg_bottlneck(a,n):
    'bottleneck.move_mean'
    return bn.move_mean(a, window=n, min_count=1)

N = 10**6
a = np.random.rand(N)
functions = [rollavg_direct, rollavg_comprehension, rollavg_convolve, 
        rollavg_convolve_edges, rollavg_cumsum, rollavg_cumsum_edges, 
        rollavg_pandas, rollavg_bottlneck, rollavg_roll, rollavg_roll_edges]

print('Small window (n=3)')
%load_ext memory_profiler
for f in functions : 
    print('\n'+f.__doc__+ ' : ')
    %timeit b=f(a,3)

print('\nLarge window (n=1001)')
for f in functions[0:-2] : 
    print('\n'+f.__doc__+ ' : ')
    %timeit b=f(a,1001)

print('\nMemory\n')
print('Small window (n=3)')
N = 10**7
a = np.random.rand(N)
%load_ext memory_profiler
for f in functions[2:] : 
    print('\n'+f.__doc__+ ' : ')
    %memit b=f(a,3)

print('\nLarge window (n=1001)')
for f in functions[2:-2] : 
    print('\n'+f.__doc__+ ' : ')
    %memit b=f(a,1001)

타이밍, 작은 창(n=3)

Direct "for" loop : 

4.14 s ± 23.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

List comprehension : 
3.96 s ± 27.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

scipy.convolve : 
1.07 ms ± 26.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

scipy.convolve, edge handling : 
4.68 ms ± 9.69 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

numpy.cumsum : 
5.31 ms ± 5.11 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

numpy.cumsum, edge handling : 
8.52 ms ± 11.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Pandas rolling average : 
9.85 ms ± 9.63 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

bottleneck.move_mean : 
1.3 ms ± 12.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Numpy array rolling : 
31.3 ms ± 91.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

Numpy array rolling, edge handling : 
61.1 ms ± 55.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

타이밍, 큰 창(n=1001)

Direct "for" loop : 
4.67 s ± 34 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

List comprehension : 
4.46 s ± 14.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

scipy.convolve : 
103 ms ± 165 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

scipy.convolve, edge handling : 
272 ms ± 1.23 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

numpy.cumsum : 
5.19 ms ± 12.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

numpy.cumsum, edge handling : 
8.7 ms ± 11.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Pandas rolling average : 
9.67 ms ± 199 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

bottleneck.move_mean : 
1.31 ms ± 15.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

메모리, 작은 창(n=3)

The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler

scipy.convolve : 
peak memory: 362.66 MiB, increment: 73.61 MiB

scipy.convolve, edge handling : 
peak memory: 510.24 MiB, increment: 221.19 MiB

numpy.cumsum : 
peak memory: 441.81 MiB, increment: 152.76 MiB

numpy.cumsum, edge handling : 
peak memory: 518.14 MiB, increment: 228.84 MiB

Pandas rolling average : 
peak memory: 449.34 MiB, increment: 160.02 MiB

bottleneck.move_mean : 
peak memory: 374.17 MiB, increment: 75.54 MiB

Numpy array rolling : 
peak memory: 661.29 MiB, increment: 362.65 MiB

Numpy array rolling, edge handling : 
peak memory: 1111.25 MiB, increment: 812.61 MiB

메모리, 큰 창(n=1001)

scipy.convolve : 
peak memory: 370.62 MiB, increment: 71.83 MiB

scipy.convolve, edge handling : 
peak memory: 521.98 MiB, increment: 223.18 MiB

numpy.cumsum : 
peak memory: 451.32 MiB, increment: 152.52 MiB

numpy.cumsum, edge handling : 
peak memory: 527.51 MiB, increment: 228.71 MiB

Pandas rolling average : 
peak memory: 451.25 MiB, increment: 152.50 MiB

bottleneck.move_mean : 
peak memory: 374.64 MiB, increment: 75.85 MiB

작작으로 Numpy 1.20는 요소의 창을 슬라이드/롤링하는 방법을 제공합니다.그런 다음 개별적으로 평균을 낼 수 있는 윈도우.

를 들어, 「」의 는,4- 창: - 창

from numpy.lib.stride_tricks import sliding_window_view

# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
np.average(sliding_window_view(values, window_shape = 4), axis=1)
# array([6.5, 5.75, 5.25, 4.5, 2.25, 1.75, 2])

해 주세요.sliding_window_view:

# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
sliding_window_view(values, window_shape = 4)
# array([[ 5,  3,  8, 10],
#        [ 3,  8, 10,  2],
#        [ 8, 10,  2,  1],
#        [10,  2,  1,  5],
#        [ 2,  1,  5,  1],
#        [ 1,  5,  1,  0],
#        [ 5,  1,  0,  2]])

한 이 입니다.rolling_mean 판다의

# the recommended syntax to import pandas
import pandas as pd
import numpy as np

# prepare some fake data:
# the date-time indices:
t = pd.date_range('1/1/2010', '12/31/2012', freq='D')

# the data:
x = np.arange(0, t.shape[0])

# combine the data & index into a Pandas 'Series' object
D = pd.Series(x, t)

를 호출해 주세요.rolling이치노열 번

d_mva10 = D.rolling(10).mean()

# d_mva is the same size as the original Series
# though obviously the first w values are NaN where w is the window size
d_mva10[:11]

2010-01-01    NaN
2010-01-02    NaN
2010-01-03    NaN
2010-01-04    NaN
2010-01-05    NaN
2010-01-06    NaN
2010-01-07    NaN
2010-01-08    NaN
2010-01-09    NaN
2010-01-10    4.5
2010-01-11    5.5
Freq: D, dtype: float64

병목현상을 이용하면 쉽게 해결할 수 있다고 생각한다.

아래 기본 샘플을 참조하십시오.

import numpy as np
import bottleneck as bn

a = np.random.randint(4, 1000, size=(5, 7))
mm = bn.move_mean(a, window=2, min_count=1)

그러면 각 축을 따라 이동 평균이 제공됩니다.

  • "mm"는 "a"의 이동 평균이다.

  • "filename"은 이동 평균에 대해 고려해야 할 항목의 최대 수입니다.

  • "min_count"는 이동 평균에 대해 고려해야 할 최소 항목 수입니다(예: 첫 번째 요소의 경우 또는 배열에 nan 값이 있는 경우).

좋은 점은 병목현상이 나노 값을 처리하는 데 도움이 되고 매우 효율적이라는 것입니다.

간단한 해결책이 필요한 경우에 한 가지 예를 들어보자

def moving_average(a,n):
    N=len(a)
    return np.array([np.mean(a[i:i+n]) for i in np.arange(0,N-n+1)])

하려면 창마다 스텝 합니다.np.arange(0,N-n+1,step)

가장자리 조건을 주의 깊게 관리하려는 경우(가장자리에 있는 사용 가능한 요소에서만 평균을 계산) 다음 함수가 이 기능을 수행합니다.

import numpy as np

def running_mean(x, N):
    out = np.zeros_like(x, dtype=np.float64)
    dim_len = x.shape[0]
    for i in range(dim_len):
        if N%2 == 0:
            a, b = i - (N-1)//2, i + (N-1)//2 + 2
        else:
            a, b = i - (N-1)//2, i + (N-1)//2 + 1

        #cap indices to min and max indices
        a = max(0, a)
        b = min(dim_len, b)
        out[i] = np.mean(x[a:b])
    return out

>>> running_mean(np.array([1,2,3,4]), 2)
array([1.5, 2.5, 3.5, 4. ])

>>> running_mean(np.array([1,2,3,4]), 3)
array([1.5, 2. , 3. , 3.5])

모든 해답은 미리 계산된 리스트의 경우에 초점을 맞춘 것 같습니다.번호가 1개씩 입력되는 실제 실행 유스케이스의 경우 마지막 N개의 값을 평균화하는 서비스를 제공하는 단순한 클래스를 다음에 나타냅니다.

import numpy as np
class RunningAverage():
    def __init__(self, stack_size):
        self.stack = [0 for _ in range(stack_size)]
        self.ptr = 0
        self.full_cycle = False
    def add(self,value):
        self.stack[self.ptr] = value
        self.ptr += 1
        if self.ptr == len(self.stack):
            self.full_cycle = True
            self.ptr = 0
    def get_avg(self):
        if self.full_cycle:
            return np.mean(self.stack)
        else:
            return np.mean(self.stack[:self.ptr])

사용방법:

N = 50  # size of the averaging window
run_avg = RunningAverage(N)
for i in range(1000):
    value = <my computation>
    run_avg.add(value)
    if i % 20 ==0: # print once in 20 iters:
        print(f'the average value is {run_avg.get_avg()}')

talib에는 단순한 이동 평균 도구와 유사한 다른 평균 도구(예: 지수 이동 평균)가 포함되어 있습니다.다음에서는 이 방법을 다른 솔루션과 비교합니다.


%timeit pd.Series(np.arange(100000)).rolling(3).mean()
2.53 ms ± 40.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

%timeit talib.SMA(real = np.arange(100000.), timeperiod = 3)
348 µs ± 3.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

%timeit moving_average(np.arange(100000))
638 µs ± 45.1 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

한 가지 주의할 점은 실제는 반드시 다음 요소를 가져야 한다는 것입니다.dtype = float 않으면 다음

예외: real은 이중이 아닙니다.

for i in range(len(Data)):
    Data[i, 1] = Data[i-lookback:i, 0].sum() / lookback

이 코드를 사용해 보세요.나는 그것이 더 간단하고 일을 할 수 있다고 생각한다.lookback은 이동 평균의 창입니다.

서서 Data[i-lookback:i, 0].sum() i i i i i i0데이터 집합의 첫 번째 열을 참조하지만 열이 두 개 이상인 경우 원하는 열을 넣을 수 있습니다.

입력과 출력 길이가 같도록 약간 수정한 승인된 답변의 솔루션을 사용하거나,pandas'은 .다음에 참조할 수 있도록 재현 가능한 예시와 함께 여기에 두 가지를 요약합니다.

import numpy as np
import pandas as pd

def moving_average(a, n):
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret / n

def moving_average_centered(a, n):
    return pd.Series(a).rolling(window=n, center=True).mean().to_numpy()

A = [0, 0, 1, 2, 4, 5, 4]
print(moving_average(A, 3))    
# [0.         0.         0.33333333 1.         2.33333333 3.66666667 4.33333333]
print(moving_average_centered(A, 3))
# [nan        0.33333333 1.         2.33333333 3.66666667 4.33333333 nan       ]

이동 평균

반복법

  • i에서 배열을 반전시키고, 단순히 평균을 i에서 n으로 취합니다.

  • 목록 이해 기능을 사용하여 즉시 미니 어레이를 생성할 수 있습니다.

x = np.random.randint(10, size=20)

def moving_average(arr, n):
    return [ (arr[:i+1][::-1][:n]).mean() for i, ele in enumerate(arr) ]
d = 5

moving_average(x, d)

텐서 회전

moving_average = np.convolve(x, np.ones(d)/d, mode='valid')

Python C 확장자를 직접 작성할 수도 있습니다.

가장 쉬운 , 더 수 .np.cumsum구성 요소로 사용됩니다.

// moving_average.c
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
#include <Python.h>
#include <numpy/arrayobject.h>

static PyObject *moving_average(PyObject *self, PyObject *args) {
    PyObject *input;
    int64_t window_size;
    PyArg_ParseTuple(args, "Ol", &input, &window_size);
    if (PyErr_Occurred()) return NULL;
    if (!PyArray_Check(input) || !PyArray_ISNUMBER((PyArrayObject *)input)) {
        PyErr_SetString(PyExc_TypeError, "First argument must be a numpy array with numeric dtype");
        return NULL;
    }
    
    int64_t input_size = PyObject_Size(input);
    double *input_data;
    if (PyArray_AsCArray(&input, &input_data, (npy_intp[]){ [0] = input_size }, 1, PyArray_DescrFromType(NPY_DOUBLE)) != 0) {
        PyErr_SetString(PyExc_TypeError, "Failed to simulate C array of type double");
        return NULL;
    }
    
    int64_t output_size = input_size - window_size + 1;
    PyObject *output = PyArray_SimpleNew(1, (npy_intp[]){ [0] = output_size }, NPY_DOUBLE);
    double *output_data = PyArray_DATA((PyArrayObject *)output);
    
    double cumsum_before = 0;
    double cumsum_after = 0;
    for (int i = 0; i < window_size; ++i) {
        cumsum_after += input_data[i];
    }
    for (int i = 0; i < output_size - 1; ++i) {
        output_data[i] = (cumsum_after - cumsum_before) / window_size;
        cumsum_after += input_data[i + window_size];
        cumsum_before += input_data[i];
    }
    output_data[output_size - 1] = (cumsum_after - cumsum_before) / window_size;

    return output;
}

static PyMethodDef methods[] = {
    {
        "moving_average", 
        moving_average, 
        METH_VARARGS, 
        "Rolling mean of numpy array with specified window size"
    },
    {NULL, NULL, 0, NULL}
};

static struct PyModuleDef moduledef = {
    PyModuleDef_HEAD_INIT,
    "moving_average",
    "C extension for finding the rolling mean of a numpy array",
    -1,
    methods
};

PyMODINIT_FUNC PyInit_moving_average(void) {
    PyObject *module = PyModule_Create(&moduledef);
    import_array();
    return module;
}
  • METH_VARARGS 는 메서드가 positional 인수만 받아들이도록 지정합니다.PyArg_ParseTuple 그럼 이러한 위치 인수를 해석할 수 있습니다.

  • 메서드에서 NULL을 사용하여 반환함으로써 C 확장자에서 Python 인터프리터에 예외가 발생했음을 알릴 수 있습니다.

  • PyArray_AsCArray 어레이가 C 연속인지 여부에 관계없이 입력 어레이 dtype, 얼라인먼트에 관한 메서드는 다형성을 가질 수 있습니다( 「numpy 1d 어레이는 연속하지 않을있습니까?」를 참조).어레이의 카피를 작성할 필요가 없습니다.대신 을 사용했다면 직접 이 문제를 해결해야 합니다.

  • PyArray_SimpleNew 를 사용하면 새로운 numpy 배열을 만들 수 있습니다.이것은 를 사용하는 것과 비슷합니다.어레이가 초기화되지 않고 비결정적 정크가 포함되어 덮어쓰기를 잊은 경우 깜짝 놀랄 수 있습니다.

C 확장 기능 구축

# setup.py
from setuptools import setup, Extension
import numpy

setup(
  ext_modules=[
    Extension(
      'moving_average',
      ['moving_average.c'],
      include_dirs=[numpy.get_include()]
    )
  ]
)

# python setup.py build_ext --build-lib=.

벤치마크

import numpy as np

# Our compiled C extension:
from moving_average import moving_average as moving_average_c

# Answer by Jaime using npcumsum
def moving_average_cumsum(a, n) :
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1:] / n

# Answer by yatu using np.convolve
def moving_average_convolve(a, n):
    return np.convolve(a, np.ones(n), 'valid') / n

a = np.random.rand(1_000_000)
print('window_size = 3')
%timeit moving_average_c(a, 3)
%timeit moving_average_cumsum(a, 3)
%timeit moving_average_convolve(a, 3)

print('\nwindow_size = 100')
%timeit moving_average_c(a, 100)
%timeit moving_average_cumsum(a, 100)
%timeit moving_average_convolve(a, 100)
window_size = 3
958 µs ± 4.68 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
4.52 ms ± 15.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
809 µs ± 463 ns per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

window_size = 100
977 µs ± 937 ns per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
6.16 ms ± 19.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
14.2 ms ± 12.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

아래 용액과 numpy의 cumsum을 사용한 용액을 비교하면, 이것은 거의 절반의 시간이 걸립니다.이는 합계를 수행하기 위해 어레이 전체를 통과할 필요가 없으며 그 후 모든 감산을 수행할 필요가 없기 때문입니다.또한 배열이 크고 수가 많은 경우(오버플로우 가능성이 있음) 누적이 "위험"할 수 있습니다.물론 위험도 존재하지만 최소한 필수 수치만 합산하면 된다.

def moving_average(array_numbers, n):
    if n > len(array_numbers):
      return []
    temp_sum = sum(array_numbers[:n])
    averages = [temp_sum / float(n)]
    for first_index, item in enumerate(array_numbers[n:]):
        temp_sum += item - array_numbers[first_index]
        averages.append(temp_sum / float(n))
    return averages

사실 나는 받아들여진 대답과는 조금 다른 행동을 원했다.이동 평균 피쳐 추출기를 만들고 있었습니다.sklearn파이프라인 때문에 이동 평균의 출력은 입력과 동일한 차원을 갖도록 요구했습니다. 것은 일정하게 이다. 즉, 이 일정하다고 가정하는 것이다.[1,2,3,4,5] 2가 됩니다.[1.5,2.5,3.5,4.5,5.0]

열 벡터(내 사용 사례)에 대해서는

def moving_average_col(X, n):
  z2 = np.cumsum(np.pad(X, ((n,0),(0,0)), 'constant', constant_values=0), axis=0)
  z1 = np.cumsum(np.pad(X, ((0,n),(0,0)), 'constant', constant_values=X[-1]), axis=0)
  return (z1-z2)[(n-1):-1]/n

어레이의 경우

def moving_average_array(X, n):
  z2 = np.cumsum(np.pad(X, (n,0), 'constant', constant_values=0))
  z1 = np.cumsum(np.pad(X, (0,n), 'constant', constant_values=X[-1]))
  return (z1-z2)[(n-1):-1]/n

물론 패딩에 대해 일정한 값을 가정할 필요는 없지만 대부분의 경우 그렇게 하는 것이 적절합니다.

다음은 numba(유형에 주의)를 사용한 고속 구현입니다.이동 시에는 난이 포함되어 있습니다.

import numpy as np
import numba as nb

@nb.jit(nb.float64[:](nb.float64[:],nb.int64),
        fastmath=True,nopython=True)
def moving_average( array, window ):    
    ret = np.cumsum(array)
    ret[window:] = ret[window:] - ret[:-window]
    ma = ret[window - 1:] / window
    n = np.empty(window-1); n.fill(np.nan)
    return np.concatenate((n.ravel(), ma.ravel())) 

언급URL : https://stackoverflow.com/questions/14313510/how-to-calculate-rolling-moving-average-using-python-numpy-scipy

반응형