개발/Python

[파이썬] 1D 배열의 실행 평균을 계산하는 SciPy 함수 또는 NumPy 함수?

MinorMan 2023. 8. 12. 14:16
반응형

<질문>

특정 창에서 1D 배열의 실행 평균을 계산하는 Python용 SciPy 함수 또는 NumPy 함수 또는 모듈이 있습니까?


<답변1>

업데이트:보다 효율적인 솔루션이 제안되었으며,uniform_filter1d from scipy아마도 "표준" 타사 라이브러리 중에서 최고일 것이며 일부 최신 또는 특수 라이브러리도 사용할 수 있습니다.

당신이 사용할 수있는np.convolve그에 대한:

np.convolve(x, np.ones(N)/N, mode='valid')

실행 평균은 다음의 수학적 연산의 경우입니다.convolution. 실행 평균의 경우 입력을 따라 창을 슬라이드하고 창 내용의 평균을 계산합니다. 불연속 1D 신호의 경우 컨볼루션은 평균 대신 임의의 선형 조합을 계산한다는 점을 제외하면 동일합니다. 즉, 각 요소에 해당 계수를 곱하고 결과를 더합니다. 창의 각 위치에 대해 하나씩 있는 이러한 계수는 때때로 컨볼루션이라고 합니다.핵심. N 값의 산술 평균은 다음과 같습니다.(x_1 + x_2 + ... + x_N) / N, 따라서 해당 커널은(1/N, 1/N, ..., 1/N), 이것이 바로 우리가 사용하여 얻는 것입니다.np.ones(N)/N.

그만큼mode인수np.convolve가장자리를 처리하는 방법을 지정합니다. 나는 선택했다valid대부분의 사람들이 실행 평균이 작동할 것으로 기대하는 방식이라고 생각하지만 다른 우선 순위가 있을 수 있습니다. 다음은 모드 간의 차이를 보여주는 도표입니다.

import numpy as np
import matplotlib.pyplot as plt
modes = ['full', 'same', 'valid']
for m in modes:
    plt.plot(np.convolve(np.ones(200), np.ones(50)/50, mode=m));
plt.axis([-10, 251, -.1, 1.1]);
plt.legend(modes, loc='lower center');
plt.show()

러닝 평균 컨볼루션 모드


<답변2>

컨볼루션은 간단한 접근 방식보다 훨씬 낫지만 FFT를 사용하므로 상당히 느립니다. 그러나 특히 달리기 계산을 위해 다음 접근 방식이 잘 작동합니다.

def running_mean(x, N):
    cumsum = numpy.cumsum(numpy.insert(x, 0, 0)) 
    return (cumsum[N:] - cumsum[:-N]) / float(N)

확인할 코드

In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop

참고numpy.allclose(result1, result2)~이다True, 두 가지 방법은 동일합니다. N이 클수록 시간차가 커집니다.

the comments pointed out this floating point error issue here but i am making it more obvious here in the answer..

# demonstrate loss of precision with only 100,000 points
np.random.seed(42)
x = np.random.randn(100000)+1e6
y1 = running_mean_convolve(x, 10)
y2 = running_mean_cumsum(x, 10)
assert np.allclose(y1, y2, rtol=1e-12, atol=0)
  • 더 많은 포인트가 누적될수록 부동 소수점 오류가 커집니다(따라서 1e5 포인트가 눈에 띄고 1e6 포인트가 더 중요하며 1e6보다 크며 누산기를 재설정해야 할 수 있음)
  • 당신은 사용하여 속일 수 있습니다np.longdouble그러나 부동 소수점 오류는 상대적으로 많은 수의 포인트에 대해 여전히 중요합니다(약 >1e5이지만 데이터에 따라 다름).
  • 오류를 플로팅하고 상대적으로 빠르게 증가하는 것을 볼 수 있습니다.
  • the convolve solution느리지만 이러한 부동 소수점 정밀도 손실이 없습니다.
  • the uniform_filter1d solution이 cumsum 솔루션보다 빠르고 이 부동 소수점 정밀도 손실이 없습니다.

<답변3>

업데이트:아래 예는 오래된 것을 보여줍니다.pandas.rolling_mean최신 버전의 pandas에서 제거된 기능입니다. 해당 함수 호출에 해당하는 최신 기능은 다음을 사용합니다.pandas.Series.rolling:

In [8]: pd.Series(x).rolling(window=N).mean().iloc[N-1:].values
Out[8]: 
array([ 0.49815397,  0.49844183,  0.49840518, ...,  0.49488191,
        0.49456679,  0.49427121])

pandasNumPy 또는 SciPy보다 이에 더 적합합니다. 그 기능rolling_mean일을 편리하게 합니다. 또한 입력이 배열일 때 NumPy 배열을 반환합니다.

이기기 어렵다rolling_mean사용자 지정 순수 Python 구현으로 성능이 향상됩니다. 다음은 제안된 솔루션 중 두 가지에 대한 성능 예시입니다.

In [1]: import numpy as np

In [2]: import pandas as pd

In [3]: def running_mean(x, N):
   ...:     cumsum = np.cumsum(np.insert(x, 0, 0)) 
   ...:     return (cumsum[N:] - cumsum[:-N]) / N
   ...:

In [4]: x = np.random.random(100000)

In [5]: N = 1000

In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop

In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop

In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop

In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True

가장자리 값을 처리하는 방법에 대한 좋은 옵션도 있습니다.


<답변4>

당신이 사용할 수있는scipy.ndimage.uniform_filter1d:

import numpy as np
from scipy.ndimage import uniform_filter1d
N = 1000
x = np.random.random(100000)
y = uniform_filter1d(x, size=N)

uniform_filter1d:

  • 동일한 numpy 모양(예: 포인트 수)으로 출력을 제공합니다.
  • 여러 방법으로 경계를 처리할 수 있습니다.'reflect'가 기본값이지만 제 경우에는 오히려'nearest'

또한 다소 빠릅니다(거의 50배 더 빠름).np.convolve그리고 2~5회faster than the cumsum approach given above):

%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same')
100 loops, best of 3: 9.28 ms per loop

%timeit y2 = uniform_filter1d(x, size=N)
10000 loops, best of 3: 191 µs per loop

다음은 다양한 구현의 오류/속도를 비교할 수 있는 3가지 기능입니다.

from __future__ import division
import numpy as np
import scipy.ndimage as ndi
def running_mean_convolve(x, N):
    return np.convolve(x, np.ones(N) / float(N), 'valid')
def running_mean_cumsum(x, N):
    cumsum = np.cumsum(np.insert(x, 0, 0))
    return (cumsum[N:] - cumsum[:-N]) / float(N)
def running_mean_uniform_filter1d(x, N):
    return ndi.uniform_filter1d(x, N, mode='constant', origin=-(N//2))[:-(N-1)]

<답변5>

다음을 사용하여 실행 평균을 계산할 수 있습니다.

import numpy as np

def runningMean(x, N):
    y = np.zeros((len(x),))
    for ctr in range(len(x)):
         y[ctr] = np.sum(x[ctr:(ctr+N)])
    return y/N

하지만 느립니다.

다행스럽게도 numpy에는convolve속도를 높이는 데 사용할 수 있는 기능입니다. 실행 평균은 컨볼루션과 동일합니다.x벡터로Nlong, 모든 멤버가 다음과 같음1/N. convolve의 numpy 구현에는 시작 과도 현상이 포함되므로 첫 번째 N-1 포인트를 제거해야 합니다.

def runningMeanFast(x, N):
    return np.convolve(x, np.ones((N,))/N)[(N-1):]

내 컴퓨터에서 빠른 버전은 입력 벡터의 길이와 평균화 창의 크기에 따라 20-30배 더 빠릅니다.

convolve에는'same'시작 일시적인 문제를 해결해야 하는 것처럼 보이지만 시작과 끝 사이에서 분할합니다.


<답변6>

종속성 없이 하나의 루프에서 모든 작업을 수행하는 짧고 빠른 솔루션의 경우 아래 코드가 훌륭하게 작동합니다.

mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []

for i, x in enumerate(mylist, 1):
    cumsum.append(cumsum[i-1] + x)
    if i>=N:
        moving_ave = (cumsum[i] - cumsum[i-N])/N
        #can do stuff with moving_ave here
        moving_aves.append(moving_ave)

<답변7>

또는 계산하는 파이썬 용 모듈

Tradewave.net의 내 테스트에서 TA-lib는 항상 이깁니다.

import talib as ta
import numpy as np
import pandas as pd
import scipy
from scipy import signal
import time as t

PAIR = info.primary_pair
PERIOD = 30

def initialize():
    storage.reset()
    storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0])

def cumsum_sma(array, period):
    ret = np.cumsum(array, dtype=float)
    ret[period:] = ret[period:] - ret[:-period]
    return ret[period - 1:] / period

def pandas_sma(array, period):
    return pd.rolling_mean(array, period)

def api_sma(array, period):
    # this method is native to Tradewave and does NOT return an array
    return (data[PAIR].ma(PERIOD))

def talib_sma(array, period):
    return ta.MA(array, period)

def convolve_sma(array, period):
    return np.convolve(array, np.ones((period,))/period, mode='valid')

def fftconvolve_sma(array, period):    
    return scipy.signal.fftconvolve(
        array, np.ones((period,))/period, mode='valid')    

def tick():

    close = data[PAIR].warmup_period('close')

    t1 = t.time()
    sma_api = api_sma(close, PERIOD)
    t2 = t.time()
    sma_cumsum = cumsum_sma(close, PERIOD)
    t3 = t.time()
    sma_pandas = pandas_sma(close, PERIOD)
    t4 = t.time()
    sma_talib = talib_sma(close, PERIOD)
    t5 = t.time()
    sma_convolve = convolve_sma(close, PERIOD)
    t6 = t.time()
    sma_fftconvolve = fftconvolve_sma(close, PERIOD)
    t7 = t.time()

    storage.elapsed[-1] = storage.elapsed[-1] + t2-t1
    storage.elapsed[-2] = storage.elapsed[-2] + t3-t2
    storage.elapsed[-3] = storage.elapsed[-3] + t4-t3
    storage.elapsed[-4] = storage.elapsed[-4] + t5-t4
    storage.elapsed[-5] = storage.elapsed[-5] + t6-t5    
    storage.elapsed[-6] = storage.elapsed[-6] + t7-t6        

    plot('sma_api', sma_api)  
    plot('sma_cumsum', sma_cumsum[-5])
    plot('sma_pandas', sma_pandas[-10])
    plot('sma_talib', sma_talib[-15])
    plot('sma_convolve', sma_convolve[-20])    
    plot('sma_fftconvolve', sma_fftconvolve[-25])

def stop():

    log('ticks....: %s' % info.max_ticks)

    log('api......: %.5f' % storage.elapsed[-1])
    log('cumsum...: %.5f' % storage.elapsed[-2])
    log('pandas...: %.5f' % storage.elapsed[-3])
    log('talib....: %.5f' % storage.elapsed[-4])
    log('convolve.: %.5f' % storage.elapsed[-5])    
    log('fft......: %.5f' % storage.elapsed[-6])

결과:

[2015-01-31 23:00:00] ticks....: 744
[2015-01-31 23:00:00] api......: 0.16445
[2015-01-31 23:00:00] cumsum...: 0.03189
[2015-01-31 23:00:00] pandas...: 0.03677
[2015-01-31 23:00:00] talib....: 0.00700  # <<< Winner!
[2015-01-31 23:00:00] convolve.: 0.04871
[2015-01-31 23:00:00] fft......: 0.22306

enter image description here


<답변8>

바로 사용할 수 있는 솔루션은 다음을 참조하십시오.https://scipy-cookbook.readthedocs.io/items/SignalSmooth.html. 그것은 실행 평균을 제공합니다flat창 유형. 이것은 데이터를 반영하여 데이터의 시작과 끝에서 문제를 처리하려고 시도하기 때문에 단순한 do-it-yourself convolve-method보다 조금 더 정교합니다(귀하의 경우에 작동하거나 작동하지 않을 수 있음). ..).

먼저 다음을 시도해 볼 수 있습니다.

a = np.random.random(100)
plt.plot(a)
b = smooth(a, window='flat')
plt.plot(b)

<답변9>

나는 이것이 오래된 질문이라는 것을 알고 있지만 여기에는 추가 데이터 구조나 라이브러리를 사용하지 않는 솔루션이 있습니다. 그것은 입력 목록의 요소 수에 선형적이며 더 효율적으로 만드는 다른 방법을 생각할 수 없습니다(실제로 결과를 할당하는 더 좋은 방법을 아는 사람이 있으면 알려주십시오).

메모:이것은 목록 대신 numpy 배열을 사용하는 것이 훨씬 빠르지만 모든 종속성을 제거하고 싶었습니다. 다중 스레드 실행으로 성능을 향상시킬 수도 있습니다.

이 함수는 입력 목록이 1차원이라고 가정하므로 주의하십시오.

### Running mean/Moving average
def running_mean(l, N):
    sum = 0
    result = list( 0 for x in l)

    for i in range( 0, N ):
        sum = sum + l[i]
        result[i] = sum / (i+1)

    for i in range( N, len(l) ):
        sum = sum - l[i-N] + l[i]
        result[i] = sum / N

    return result

목록이 있다고 가정합니다.data = [ 1, 2, 3, 4, 5, 6 ]주기가 3인 롤링 평균을 계산하고 싶고 입력 목록과 동일한 크기의 출력 목록도 원합니다(대부분의 경우).

첫 번째 요소의 인덱스는 0이므로 롤링 평균은 인덱스 -2, -1 및 0의 요소에 대해 계산되어야 합니다. 분명히 data[-2] 및 data[-1]이 없습니다(특별한 경계 조건) 따라서 해당 요소가 0이라고 가정합니다. 이것은 목록을 0으로 채우는 것과 동일합니다. 단, 실제로 패딩하지 않고 패딩이 필요한 인덱스(0에서 N-1까지)만 추적하면 됩니다.

따라서 첫 번째 N개의 요소에 대해 누산기의 요소를 계속 추가합니다.

result[0] = (0 + 0 + 1) / 3  = 0.333    ==   (sum + 1) / 3
result[1] = (0 + 1 + 2) / 3  = 1        ==   (sum + 2) / 3
result[2] = (1 + 2 + 3) / 3  = 2        ==   (sum + 3) / 3

N+1 요소부터 단순 누적이 작동하지 않습니다. 우리는 기대한다result[3] = (2 + 3 + 4)/3 = 3하지만 이것은 다른(sum + 4)/3 = 3.333.

올바른 값을 계산하는 방법은 빼는 것입니다.data[0] = 1~에서sum+4, 따라서주는sum + 4 - 1 = 9.

이것은 현재 때문에 발생합니다sum = data[0] + data[1] + data[2], 하지만 모든 경우에도 마찬가지입니다.i >= N빼기 전에sum~이다data[i-N] + ... + data[i-2] + data[i-1].


<답변10>

나는 이것이 다음을 사용하여 우아하게 해결할 수 있다고 생각합니다.bottleneck

아래의 기본 샘플을 참조하십시오.

import numpy as np
import bottleneck as bn

a = np.random.randint(4, 1000, size=100)
mm = bn.move_mean(a, window=5, min_count=1)
  • "mm"은 "a"의 이동 평균입니다.

  • "창"은 이동 평균에 대해 고려할 최대 항목 수입니다.

  • "min_count"는 이동 평균에 대해 고려할 최소 항목 수입니다(예: 처음 몇 개의 요소 또는 배열에 nan 값이 있는 경우).

좋은 점은 Bottleneck이 nan 값을 처리하는 데 도움이 되고 매우 효율적이라는 것입니다.


<답변11>

이것이 얼마나 빠른지 아직 확인하지 않았지만 시도해 볼 수 있습니다.

from collections import deque

cache = deque() # keep track of seen values
n = 10          # window size
A = xrange(100) # some dummy iterable
cum_sum = 0     # initialize cumulative sum

for t, val in enumerate(A, 1):
    cache.append(val)
    cum_sum += val
    if t < n:
        avg = cum_sum / float(t)
    else:                           # if window is saturated,
        cum_sum -= cache.popleft()  # subtract oldest value
        avg = cum_sum / float(n)

<답변12>

numpy 또는 scipy 대신 pandas를 사용하여 더 신속하게 수행하는 것이 좋습니다.

df['data'].rolling(3).mean()

이것은 열 "데이터"의 3개 기간의 이동 평균(MA)을 취합니다. 이동된 버전을 계산할 수도 있습니다. 예를 들어 현재 셀을 제외하는 셀(하나 뒤로 이동)은 다음과 같이 쉽게 계산할 수 있습니다.

df['data'].shift(periods=1).rolling(3).mean()

<답변13>

Python 표준 라이브러리 솔루션

이 생성기 함수는 iterable과 창 크기를 사용합니다.N창 내부의 현재 값에 대한 평균을 산출합니다. 그것은deque, 목록과 유사한 데이터 구조이지만 빠른 수정에 최적화됨(pop,append)두 끝점에서.

from collections import deque
from itertools import islice

def sliding_avg(iterable, N):        
    it = iter(iterable)
    window = deque(islice(it, N))        
    num_vals = len(window)

    if num_vals < N:
        msg = 'window size {} exceeds total number of values {}'
        raise ValueError(msg.format(N, num_vals))

    N = float(N) # force floating point division if using Python 2
    s = sum(window)
    
    while True:
        yield s/N
        try:
            nxt = next(it)
        except StopIteration:
            break
        s = s - window.popleft() + nxt
        window.append(nxt)
        

작동 중인 기능은 다음과 같습니다.

>>> values = range(100)
>>> N = 5
>>> window_avg = sliding_avg(values, N)
>>> 
>>> next(window_avg) # (0 + 1 + 2 + 3 + 4)/5
>>> 2.0
>>> next(window_avg) # (1 + 2 + 3 + 4 + 5)/5
>>> 3.0
>>> next(window_avg) # (2 + 3 + 4 + 5 + 6)/5
>>> 4.0

<답변14>

파티에 조금 늦었지만 끝이나 패드를 0으로 감싸지 않고 평균을 찾는 데 사용되는 나만의 작은 기능을 만들었습니다. 추가 처리는 선형 간격 지점에서 신호를 다시 샘플링한다는 것입니다. 다른 기능을 사용하려면 마음대로 코드를 사용자 정의하십시오.

이 방법은 정규화된 가우시안 커널을 사용한 간단한 행렬 곱셈입니다.

def running_mean(y_in, x_in, N_out=101, sigma=1):
    '''
    Returns running mean as a Bell-curve weighted average at evenly spaced
    points. Does NOT wrap signal around, or pad with zeros.
    
    Arguments:
    y_in -- y values, the values to be smoothed and re-sampled
    x_in -- x values for array
    
    Keyword arguments:
    N_out -- NoOf elements in resampled array.
    sigma -- 'Width' of Bell-curve in units of param x .
    '''
    import numpy as np
    N_in = len(y_in)

    # Gaussian kernel
    x_out = np.linspace(np.min(x_in), np.max(x_in), N_out)
    x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out)
    gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2))
    # Normalize kernel, such that the sum is one along axis 1
    normalization = np.tile(np.reshape(np.sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in))
    gauss_kernel_normalized = gauss_kernel / normalization
    # Perform running average as a linear operation
    y_out = gauss_kernel_normalized @ y_in

    return y_out, x_out

정규 분포 잡음이 추가된 정현파 신호에 대한 간단한 사용법:enter image description here


<답변15>

또 다른이동 평균을 찾는 접근법없이사용numpy또는pandas

import itertools
sample = [2, 6, 10, 8, 11, 10]
list(itertools.starmap(
    lambda a,b: b/a, 
    enumerate(itertools.accumulate(sample), 1))
)

인쇄합니다[2.0, 4.0, 6.0, 6.5, 7.4, 7.833333333333333]

  • 2.0 = (2)/1
  • 4.0 = (2 + 6) / 2
  • 6.0 = (2 + 6 + 10) / 3
  • ...

<답변16>

실행 평균 계산에 대한 위에 많은 답변이 있습니다. 내 대답은 두 가지 추가 기능을 추가합니다.

  1. nan 값을 무시합니다.
  2. 관심 있는 값 자체를 포함하지 않는 N개의 인접 값에 대한 평균을 계산합니다.

이 두 번째 기능은 어떤 값이 일반 추세와 일정량 차이가 나는지 확인하는 데 특히 유용합니다.

가장 시간 효율적인 방법이므로 numpy.cumsum을 사용합니다(see Alleo's answer above).

N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x) 
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)

이 코드는 Ns에만 적용됩니다. padded_x 및 n_nan의 np.insert를 변경하여 홀수로 조정할 수 있습니다.

출력 예(검은색 원시, 파란색 movavg):raw data (black) and moving average (blue) of 10 points around each value, not including that value. nan values are ignored.

이 코드는 cutoff = 3 non-nan 값보다 적은 값에서 계산된 모든 이동 평균 값을 제거하도록 쉽게 조정할 수 있습니다.

window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values

raw data (black) and moving average (blue) while ignoring any window with fewer than 3 non-nan values


<답변17>

님의 댓글이 있습니다.mab중 하나에 묻혔다.answers위의 방법이 있습니다.bottleneck가지다move_mean간단한 이동 평균입니다.

import numpy as np
import bottleneck as bn

a = np.arange(10) + np.random.random(10)

mva = bn.move_mean(a, window=2, min_count=1)

min_count기본적으로 배열의 해당 지점까지 이동 평균을 취하는 편리한 매개변수입니다. 설정하지 않으면min_count, 같을 것이다window, 그리고 모든 것까지window포인트는nan.


<답변18>

@Aikude의 변수를 사용하여 한 줄로 작성했습니다.

import numpy as np

mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3

mean = [np.mean(mylist[x:x+N]) for x in range(len(mylist)-N+1)]
print(mean)

>>> [2.0, 3.0, 4.0, 5.0, 6.0]

<답변19>

앞서 언급한 모든 솔루션은 부족하기 때문에 좋지 않습니다.

  • numpy 벡터화 구현 대신 네이티브 파이썬으로 인한 속도,
  • 잘못된 사용으로 인한 수치적 안정성numpy.cumsum, 또는
  • 로 인한 속도O(len(x) * w)컨볼루션으로 구현.

주어진

import numpy
m = 10000
x = numpy.random.rand(m)
w = 1000

참고x_[:w].sum()같음x[:w-1].sum(). 그래서 첫 번째 평균은numpy.cumsum(...)추가x[w] / w(을 통해x_[w+1] / w), 빼기0(에서x_[0] / w). 이로 인해x[0:w].mean()

cumsum을 통해 추가로 두 번째 평균을 업데이트합니다.x[w+1] / w빼기x[0] / w, 를 야기하는x[1:w+1].mean().

이것은 까지 계속됩니다x[-w:].mean()도달했습니다.

x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w

이 솔루션은 벡터화되어 있습니다.O(m), 읽기 쉽고 수치적으로 안정적입니다.


<답변20>

이 질문은 지금더 오래된NeXuS가 지난 달에 그것에 대해 썼을 때보다, 하지만 나는 그의 코드가 엣지 케이스를 다루는 방식을 좋아합니다. 그러나 "단순 이동 평균"이기 때문에 결과가 적용되는 데이터보다 뒤떨어집니다. NumPy의 모드보다 더 만족스러운 방식으로 엣지 케이스를 처리하는 것이 좋다고 생각했습니다.valid,same, 그리고full에 유사한 접근 방식을 적용하여 달성할 수 있습니다.convolution()기반 방법.

내 기여는 중앙 실행 평균을 사용하여 결과를 데이터와 일치시킵니다. 전체 크기 창을 사용할 수 있는 포인트가 너무 적은 경우 배열 가장자리에 있는 연속적으로 작은 창에서 실행 평균이 계산됩니다. [실제로, 연속적으로 더 큰 창에서, 그러나 그것은 구현 세부 사항입니다.]

import numpy as np

def running_mean(l, N):
    # Also works for the(strictly invalid) cases when N is even.
    if (N//2)*2 == N:
        N = N - 1
    front = np.zeros(N//2)
    back = np.zeros(N//2)

    for i in range(1, (N//2)*2, 2):
        front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
    for i in range(1, (N//2)*2, 2):
        back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
    return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])

사용하기 때문에 상대적으로 느립니다.convolve(), 진정한 Pythonista에 의해 꽤 많이 꾸며질 수 있지만 아이디어가 유효하다고 믿습니다.


<답변21>

새로운convolve레시피는merged파이썬 3.10으로.

주어진


import collections, operator

from itertools import chain, repeat


size = 3 + 1
kernel = [1/size] * size                                              

암호

def convolve(signal, kernel):
    # See:  https://betterexplained.com/articles/intuitive-convolution/
    # convolve(data, [0.25, 0.25, 0.25, 0.25]) --> Moving average (blur)
    # convolve(data, [1, -1]) --> 1st finite difference (1st derivative)
    # convolve(data, [1, -2, 1]) --> 2nd finite difference (2nd derivative)
    kernel = list(reversed(kernel))
    n = len(kernel)
    window = collections.deque([0] * n, maxlen=n)
    for x in chain(signal, repeat(0, n-1)):
        window.append(x)
        yield sum(map(operator.mul, kernel, window))

데모

list(convolve(range(1, 6), kernel))
# [0.25, 0.75, 1.5, 2.5, 3.5, 3.0, 2.25, 1.25]

세부

convolution에 적용할 수 있는 일반적인 수학 연산입니다.moving averages. 아이디어는 일부 데이터가 주어지면 데이터의 하위 집합(창)을 데이터 전체에서 "마스크" 또는 "커널"로 슬라이드하여 각 창에 대해 특정 수학적 연산을 수행하는 것입니다. 이동 평균의 경우 커널은 평균입니다.

enter image description here

지금 이 구현을 다음을 통해 사용할 수 있습니다.more_itertools.convolve.more_itertools인기 있는 타사 패키지입니다. 통해 설치> pip install more_itertools.


<답변22>

다른 답변을 읽은 결과 이것이 질문에서 요구한 것이라고 생각하지 않지만 크기가 커지는 값 목록의 실행 평균을 유지할 필요가 있어 여기에 왔습니다.

따라서 어딘가(사이트, 측정 장치 등)에서 얻은 값 목록과 마지막 평균의 목록을 유지하려는 경우n값이 업데이트되면 새 요소를 추가하는 노력을 최소화하는 다음 코드를 사용할 수 있습니다.

class Running_Average(object):
    def __init__(self, buffer_size=10):
        """
        Create a new Running_Average object.

        This object allows the efficient calculation of the average of the last
        `buffer_size` numbers added to it.

        Examples
        --------
        >>> a = Running_Average(2)
        >>> a.add(1)
        >>> a.get()
        1.0
        >>> a.add(1)  # there are two 1 in buffer
        >>> a.get()
        1.0
        >>> a.add(2)  # there's a 1 and a 2 in the buffer
        >>> a.get()
        1.5
        >>> a.add(2)
        >>> a.get()  # now there's only two 2 in the buffer
        2.0
        """
        self._buffer_size = int(buffer_size)  # make sure it's an int
        self.reset()

    def add(self, new):
        """
        Add a new number to the buffer, or replaces the oldest one there.
        """
        new = float(new)  # make sure it's a float
        n = len(self._buffer)
        if n < self.buffer_size:  # still have to had numbers to the buffer.
            self._buffer.append(new)
            if self._average != self._average:  # ~ if isNaN().
                self._average = new  # no previous numbers, so it's new.
            else:
                self._average *= n  # so it's only the sum of numbers.
                self._average += new  # add new number.
                self._average /= (n+1)  # divide by new number of numbers.
        else:  # buffer full, replace oldest value.
            old = self._buffer[self._index]  # the previous oldest number.
            self._buffer[self._index] = new  # replace with new one.
            self._index += 1  # update the index and make sure it's...
            self._index %= self.buffer_size  # ... smaller than buffer_size.
            self._average -= old/self.buffer_size  # remove old one...
            self._average += new/self.buffer_size  # ...and add new one...
            # ... weighted by the number of elements.

    def __call__(self):
        """
        Return the moving average value, for the lazy ones who don't want
        to write .get .
        """
        return self._average

    def get(self):
        """
        Return the moving average value.
        """
        return self()

    def reset(self):
        """
        Reset the moving average.

        If for some reason you don't want to just create a new one.
        """
        self._buffer = []  # could use np.empty(self.buffer_size)...
        self._index = 0  # and use this to keep track of how many numbers.
        self._average = float('nan')  # could use np.NaN .

    def get_buffer_size(self):
        """
        Return current buffer_size.
        """
        return self._buffer_size

    def set_buffer_size(self, buffer_size):
        """
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]

        Decreasing buffer size:
        >>> a.buffer_size = 6
        >>> a._buffer  # should not access this!!
        [9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        >>> a.buffer_size = 2
        >>> a._buffer
        [13.0, 14.0]

        Increasing buffer size:
        >>> a.buffer_size = 5
        Warning: no older data available!
        >>> a._buffer
        [13.0, 14.0]

        Keeping buffer size:
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        >>> a.buffer_size = 10  # reorders buffer!
        >>> a._buffer
        [5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        """
        buffer_size = int(buffer_size)
        # order the buffer so index is zero again:
        new_buffer = self._buffer[self._index:]
        new_buffer.extend(self._buffer[:self._index])
        self._index = 0
        if self._buffer_size < buffer_size:
            print('Warning: no older data available!')  # should use Warnings!
        else:
            diff = self._buffer_size - buffer_size
            print(diff)
            new_buffer = new_buffer[diff:]
        self._buffer_size = buffer_size
        self._buffer = new_buffer

    buffer_size = property(get_buffer_size, set_buffer_size)

예를 들어 다음과 같이 테스트할 수 있습니다.

def graph_test(N=200):
    import matplotlib.pyplot as plt
    values = list(range(N))
    values_average_calculator = Running_Average(N/2)
    values_averages = []
    for value in values:
        values_average_calculator.add(value)
        values_averages.append(values_average_calculator())
    fig, ax = plt.subplots(1, 1)
    ax.plot(values, label='values')
    ax.plot(values_averages, label='averages')
    ax.grid()
    ax.set_xlim(0, N)
    ax.set_ylim(0, N)
    fig.show()

다음을 제공합니다.

Values and their average as a function of values #


<답변23>

교육 목적으로 Numpy 솔루션을 두 개 더 추가하겠습니다(cumsum 솔루션보다 느림).

import numpy as np
from numpy.lib.stride_tricks import as_strided

def ra_strides(arr, window):
    ''' Running average using as_strided'''
    n = arr.shape[0] - window + 1
    arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides)
    return arr_strided.mean(axis=1)

def ra_add(arr, window):
    ''' Running average using add.reduceat'''
    n = arr.shape[0] - window + 1
    indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2)
    arr = np.append(arr, 0)
    return np.add.reduceat(arr, indices )[::2]/window

사용된 기능:as_strided,add.reduceat


<답변24>

Python 표준 라이브러리만 사용(메모리 효율적)

표준 라이브러리를 사용하는 다른 버전을 제공하십시오.deque오직. 대부분의 답변이pandas또는numpy.

def moving_average(iterable, n=3):
    d = deque(maxlen=n)
    for i in iterable:
        d.append(i)
        if len(d) == n:
            yield sum(d)/n

r = moving_average([40, 30, 50, 46, 39, 44])
assert list(r) == [40.0, 42.0, 45.0, 43.0]

사실 나는 다른 것을 찾았다.implementation in python docs

def moving_average(iterable, n=3):
    # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable)
    d = deque(itertools.islice(it, n-1))
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

그러나 구현은 예상보다 약간 더 복잡해 보입니다. 그러나 이유 때문에 표준 파이썬 문서에 있어야 합니다. 누군가 내 구현과 표준 문서에 대해 언급할 수 있습니까?


<답변25>

어때이동 평균 필터? 또한 한 줄짜리이며 직사각형이 아닌 다른 것이 필요한 경우 창 유형을 쉽게 조작할 수 있다는 장점이 있습니다. 배열 a의 N-long 단순 이동 평균:

lfilter(np.ones(N)/N, [1], a)[N:]

삼각형 창이 적용된 경우:

lfilter(np.ones(N)*scipy.signal.triang(N)/N, [1], a)[N:]

참고: 나는 보통 처음 N개의 샘플을 가짜로 버립니다.[N:]결국에는 필요하지 않으며 개인적인 선택의 문제입니다.


<답변26>

내 솔루션은 Wikipedia의 "단순 이동 평균"을 기반으로 합니다.

from numba import jit
@jit
def sma(x, N):
    s = np.zeros_like(x)
    k = 1 / N
    s[0] = x[0] * k
    for i in range(1, N + 1):
        s[i] = s[i - 1] + x[i] * k
    for i in range(N, x.shape[0]):
        s[i] = s[i - 1] + (x[i] - x[i - N]) * k
    s = s[N - 1:]
    return s

이전에 제안된 솔루션과 비교하면 scipy에서 가장 빠른 솔루션인 "uniform_filter1d"보다 두 배 빠르며 동일한 오류 순서를 가짐을 알 수 있습니다. 속도 테스트:

import numpy as np    
x = np.random.random(10000000)
N = 1000

from scipy.ndimage.filters import uniform_filter1d
%timeit uniform_filter1d(x, size=N)
95.7 ms ± 9.34 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit sma(x, N)
47.3 ms ± 3.42 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

오류 비교:

np.max(np.abs(np.convolve(x, np.ones((N,))/N, mode='valid') - uniform_filter1d(x, size=N, mode='constant', origin=-(N//2))[:-(N-1)]))
8.604228440844963e-14
np.max(np.abs(np.convolve(x, np.ones((N,))/N, mode='valid') - sma(x, N)))
1.41886502547095e-13

<답변27>

여기에 이 질문에 대한 해결책이 있지만 제 해결책을 살펴보시기 바랍니다. 그것은 매우 간단하고 잘 작동합니다.

import numpy as np
dataset = np.asarray([1, 2, 3, 4, 5, 6, 7])
ma = list()
window = 3
for t in range(0, len(dataset)):
    if t+window <= len(dataset):
        indices = range(t, t+window)
        ma.append(np.average(np.take(dataset, indices)))
else:
    ma = np.asarray(ma)

<답변28>

표준 라이브러리와 deque를 사용하는 또 다른 솔루션:

from collections import deque
import itertools

def moving_average(iterable, n=3):
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable) 
    # create an iterable object from input argument
    d = deque(itertools.islice(it, n-1))  
    # create deque object by slicing iterable
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

# example on how to use it
for i in  moving_average([40, 30, 50, 46, 39, 44]):
    print(i)

# 40.0
# 42.0
# 45.0
# 43.0

<답변29>

매우 작은 배열(약 200개 미만의 요소)에 대해 이 작업을 반복적으로 수행해야 하는 경우 선형 대수를 사용하여 가장 빠른 결과를 찾았습니다. 가장 느린 부분은 곱셈 행렬 y를 설정하는 것입니다. 이 작업은 한 번만 수행하면 되지만 그 후에는 더 빠를 수 있습니다.

import numpy as np
import random 

N = 100      # window size
size =200     # array length

x = np.random.random(size)
y = np.eye(size, dtype=float)

# prepare matrix
for i in range(size):
  y[i,i:i+N] = 1./N
  
# calculate running mean
z = np.inner(x,y.T)[N-1:]


<답변30>

기존 라이브러리를 사용하지 않고 직접 롤링하기로 선택한 경우 부동 소수점 오류를 의식하고 그 영향을 최소화하십시오.

class SumAccumulator:
    def __init__(self):
        self.values = [0]
        self.count = 0

    def add( self, val ):
        self.values.append( val )
        self.count = self.count + 1
        i = self.count
        while i & 0x01:
            i = i >> 1
            v0 = self.values.pop()
            v1 = self.values.pop()
            self.values.append( v0 + v1 )

    def get_total(self):
        return sum( reversed(self.values) )

    def get_size( self ):
        return self.count
반응형