Bewegend gemiddelde of voortschrijdend gemiddelde

Is er een SciPy-functie of NumPy-functie of module voor Python die het lopende gemiddelde van een 1D-array berekent in een specifiek venster?


Antwoord 1, autoriteit 100%

Voor een korte, snelle oplossing die alles in één lus doet, zonder afhankelijkheden, werkt de onderstaande code prima.

mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []
for i, x in enumerate(mylist, 1):
    cumsum.append(cumsum[i-1] + x)
    if i>=N:
        moving_ave = (cumsum[i] - cumsum[i-N])/N
        #can do stuff with moving_ave here
        moving_aves.append(moving_ave)

Antwoord 2, autoriteit 96%

UPDATE:er zijn efficiëntere oplossingen voorgesteld, uniform_filter1dvan scipyis waarschijnlijk de beste onder de “standaard” bibliotheken van derden, en er zijn ook enkele nieuwere of gespecialiseerde bibliotheken beschikbaar.


U kunt np.convolvedaarvoor:

np.convolve(x, np.ones(N)/N, mode='valid')

Uitleg

Het lopende gemiddelde is een geval van de wiskundige bewerking van convolutie. Voor het lopende gemiddelde schuift u een venster langs de invoer en berekent u het gemiddelde van de inhoud van het venster. Voor discrete 1D-signalen is convolutie hetzelfde, behalve dat u in plaats van het gemiddelde een willekeurige lineaire combinatie berekent, d.w.z. elk element vermenigvuldigt met een overeenkomstige coëfficiënt en de resultaten bij elkaar optelt. Die coëfficiënten, één voor elke positie in het venster, worden soms de convolutie kernelgenoemd. Het rekenkundig gemiddelde van N-waarden is (x_1 + x_2 + ... + x_N) / N, dus de bijbehorende kernel is (1/N, 1/N, ..., 1/N), en dat is precies wat we krijgen door np.ones(N)/Nte gebruiken.

Randen

Het argument modevan np.convolvegeeft aan hoe de randen moeten worden afgehandeld. Ik heb hier de modus validgekozen omdat ik denk dat de meeste mensen zo verwachten dat de running mean werkt, maar je hebt misschien andere prioriteiten. Hier is een plot dat het verschil tussen de modi illustreert:

import numpy as np
import matplotlib.pyplot as plt
modes = ['full', 'same', 'valid']
for m in modes:
    plt.plot(np.convolve(np.ones(200), np.ones(50)/50, mode=m));
plt.axis([-10, 251, -.1, 1.1]);
plt.legend(modes, loc='lower center');
plt.show()


Antwoord 3, autoriteit 96%

Efficiënte oplossing

Convolutie is veel beter dan een eenvoudige benadering, maar (denk ik) gebruikt FFT en is dus vrij traag. Maar speciaal voor het berekenen van het lopende gemiddelde werkt de volgende aanpak prima

def running_mean(x, N):
    cumsum = numpy.cumsum(numpy.insert(x, 0, 0)) 
    return (cumsum[N:] - cumsum[:-N]) / float(N)

De te controleren code

In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop

Merk op dat numpy.allclose(result1, result2)Trueis, twee methoden zijn equivalent.
Hoe groter N, hoe groter het verschil in tijd.

waarschuwing: hoewel cumsum sneller is, zal er een grotere drijvende-kommafout optreden die ertoe kan leiden dat uw resultaten ongeldig/onjuist/onaanvaardbaar zijn

de opmerkingen wezen hier op dit probleem met zwevende komma’s, maar ik maak het duidelijker hier in het antwoord..

# demonstrate loss of precision with only 100,000 points
np.random.seed(42)
x = np.random.randn(100000)+1e6
y1 = running_mean_convolve(x, 10)
y2 = running_mean_cumsum(x, 10)
assert np.allclose(y1, y2, rtol=1e-12, atol=0)
  • hoe meer punten je verzamelt over hoe groter de drijvende-kommafout (dus 1e5 punten is merkbaar, 1e6 punten is belangrijker, meer dan 1e6 en misschien wil je de accumulatoren resetten)
  • je kunt vals spelen door np.longdoublemaar uw drijvende-kommafout wordt nog steeds significant voor een relatief groot aantal punten (ongeveer >1e5 maar hangt af van uw gegevens)
  • je kunt de fout plotten en deze relatief snel zien toenemen
  • de convolve-oplossingis langzamer maar heeft dit floating point-verlies aan precisie niet
  • de uniform_filter1d-oplossingis sneller dan deze cumsum-oplossing EN heeft dit zwevende-kommaverlies van precisie niet

Antwoord 4, autoriteit 95%

Update:het onderstaande voorbeeld toont de oude functie pandas.rolling_meandie in recente versies van panda’s is verwijderd. Een modern equivalent van de onderstaande functieaanroep zou zijn

In [8]: pd.Series(x).rolling(window=N).mean().iloc[N-1:].values
Out[8]: 
array([ 0.49815397,  0.49844183,  0.49840518, ...,  0.49488191,
        0.49456679,  0.49427121])

panda’sis hiervoor meer geschikt dan NumPy of SciPy. De functie rolling_meandoet het werk gemakkelijk . Het retourneert ook een NumPy-array wanneer de invoer een array is.

Het is moeilijk om rolling_meante verslaan in prestaties met een aangepaste pure Python-implementatie. Hier is een voorbeeldprestatie tegen twee van de voorgestelde oplossingen:

In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: def running_mean(x, N):
   ...:     cumsum = np.cumsum(np.insert(x, 0, 0)) 
   ...:     return (cumsum[N:] - cumsum[:-N]) / N
   ...:
In [4]: x = np.random.random(100000)
In [5]: N = 1000
In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop
In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop
In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop
In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True

Er zijn ook leuke opties om te gaan met de randwaarden.


5, Autoriteit 196%

U kunt scipy.ndimage.filters.uniform_filter1d :

import numpy as np
from scipy.ndimage.filters import uniform_filter1d
N = 1000
x = np.random.random(100000)
y = uniform_filter1d(x, size=N)

uniform_filter1d:

  • geeft de uitvoer met dezelfde numpy-vorm (d.w.z. aantal punten)
  • Hiermee kunnen meerdere manieren omgaan met de grens waarbij 'reflect'de standaard is, maar in mijn geval wilde ik liever 'nearest'

Het is ook eerder snel (bijna 50 keer sneller dan np.convolveen 2-5 keer sneller dan de bovenstaande cumsum-benadering ):

%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same')
100 loops, best of 3: 9.28 ms per loop
%timeit y2 = uniform_filter1d(x, size=N)
10000 loops, best of 3: 191 µs per loop

hier zijn 3 functies waarmee u de fout/snelheid van verschillende implementaties kunt vergelijken:

from __future__ import division
import numpy as np
import scipy.ndimage.filters as ndif
def running_mean_convolve(x, N):
    return np.convolve(x, np.ones(N) / float(N), 'valid')
def running_mean_cumsum(x, N):
    cumsum = np.cumsum(np.insert(x, 0, 0))
    return (cumsum[N:] - cumsum[:-N]) / float(N)
def running_mean_uniform_filter1d(x, N):
    return ndif.uniform_filter1d(x, N, mode='constant', origin=-(N//2))[:-(N-1)]

Antwoord 6, autoriteit 186%

Je kunt een lopend gemiddelde berekenen met:

import numpy as np
def runningMean(x, N):
    y = np.zeros((len(x),))
    for ctr in range(len(x)):
         y[ctr] = np.sum(x[ctr:(ctr+N)])
    return y/N

Maar het is traag.

Gelukkig bevat numpy een convolve-functie die we kunnen gebruiken om dingen te versnellen. Het lopende gemiddelde is gelijk aan het convolueren van xmet een vector die nlang is, waarbij alle leden gelijk zijn aan 1/N. De numpy implementatie van convolve omvat de beginnende transiënt, dus je moet de eerste N-1 punten verwijderen:

def runningMeanFast(x, N):
    return np.convolve(x, np.ones((N,))/N)[(N-1):]

Op mijn computer is de snelle versie 20-30 keer sneller, afhankelijk van de lengte van de invoervector en de grootte van het middelingsvenster.

Houd er rekening mee dat convolve een 'same'-modus bevat die het lijkt alsof het het tijdelijke probleem van de start zou moeten aanpakken, maar het verdeelt het tussen het begin en het einde.


Antwoord 7, autoriteit 96%

of module voor python die berekent

in mijn tests op Tradewave.net wint TA-lib altijd:

import talib as ta
import numpy as np
import pandas as pd
import scipy
from scipy import signal
import time as t
PAIR = info.primary_pair
PERIOD = 30
def initialize():
    storage.reset()
    storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0])
def cumsum_sma(array, period):
    ret = np.cumsum(array, dtype=float)
    ret[period:] = ret[period:] - ret[:-period]
    return ret[period - 1:] / period
def pandas_sma(array, period):
    return pd.rolling_mean(array, period)
def api_sma(array, period):
    # this method is native to Tradewave and does NOT return an array
    return (data[PAIR].ma(PERIOD))
def talib_sma(array, period):
    return ta.MA(array, period)
def convolve_sma(array, period):
    return np.convolve(array, np.ones((period,))/period, mode='valid')
def fftconvolve_sma(array, period):    
    return scipy.signal.fftconvolve(
        array, np.ones((period,))/period, mode='valid')    
def tick():
    close = data[PAIR].warmup_period('close')
    t1 = t.time()
    sma_api = api_sma(close, PERIOD)
    t2 = t.time()
    sma_cumsum = cumsum_sma(close, PERIOD)
    t3 = t.time()
    sma_pandas = pandas_sma(close, PERIOD)
    t4 = t.time()
    sma_talib = talib_sma(close, PERIOD)
    t5 = t.time()
    sma_convolve = convolve_sma(close, PERIOD)
    t6 = t.time()
    sma_fftconvolve = fftconvolve_sma(close, PERIOD)
    t7 = t.time()
    storage.elapsed[-1] = storage.elapsed[-1] + t2-t1
    storage.elapsed[-2] = storage.elapsed[-2] + t3-t2
    storage.elapsed[-3] = storage.elapsed[-3] + t4-t3
    storage.elapsed[-4] = storage.elapsed[-4] + t5-t4
    storage.elapsed[-5] = storage.elapsed[-5] + t6-t5    
    storage.elapsed[-6] = storage.elapsed[-6] + t7-t6        
    plot('sma_api', sma_api)  
    plot('sma_cumsum', sma_cumsum[-5])
    plot('sma_pandas', sma_pandas[-10])
    plot('sma_talib', sma_talib[-15])
    plot('sma_convolve', sma_convolve[-20])    
    plot('sma_fftconvolve', sma_fftconvolve[-25])
def stop():
    log('ticks....: %s' % info.max_ticks)
    log('api......: %.5f' % storage.elapsed[-1])
    log('cumsum...: %.5f' % storage.elapsed[-2])
    log('pandas...: %.5f' % storage.elapsed[-3])
    log('talib....: %.5f' % storage.elapsed[-4])
    log('convolve.: %.5f' % storage.elapsed[-5])    
    log('fft......: %.5f' % storage.elapsed[-6])

resultaten:

[2015-01-31 23:00:00] ticks....: 744
[2015-01-31 23:00:00] api......: 0.16445
[2015-01-31 23:00:00] cumsum...: 0.03189
[2015-01-31 23:00:00] pandas...: 0.03677
[2015-01-31 23:00:00] talib....: 0.00700  # <<< Winner!
[2015-01-31 23:00:00] convolve.: 0.04871
[2015-01-31 23:00:00] fft......: 0.22306


Antwoord 8, autoriteit 79%

Zie https://scipy-cookbook voor een kant-en-klare oplossing. readthedocs.io/items/SignalSmooth.html.
Het biedt een lopend gemiddelde met het venstertype flat. Merk op dat dit een beetje geavanceerder is dan de eenvoudige doe-het-zelf-convolve-methode, omdat het de problemen aan het begin en het einde van de gegevens probeert op te lossen door deze weer te geven (wat in jouw geval al dan niet werkt. ..).

Om te beginnen kun je het volgende proberen:

a = np.random.random(100)
plt.plot(a)
b = smooth(a, window='flat')
plt.plot(b)

Antwoord 9, autoriteit 61%

Ik weet dat dit een oude vraag is, maar hier is een oplossing die geen extra datastructuren of bibliotheken gebruikt. Het is lineair in het aantal elementen van de invoerlijst en ik kan geen andere manier bedenken om het efficiënter te maken (eigenlijk als iemand een betere manier weet om het resultaat toe te wijzen, laat het me dan weten).

OPMERKING:dit zou veel sneller zijn met een numpy-array in plaats van een lijst, maar ik wilde alle afhankelijkheden elimineren. Het zou ook mogelijk zijn om de prestaties te verbeteren door multi-threaded uitvoering

De functie gaat ervan uit dat de invoerlijst eendimensionaal is, dus wees voorzichtig.

### Running mean/Moving average
def running_mean(l, N):
    sum = 0
    result = list( 0 for x in l)
    for i in range( 0, N ):
        sum = sum + l[i]
        result[i] = sum / (i+1)
    for i in range( N, len(l) ):
        sum = sum - l[i-N] + l[i]
        result[i] = sum / N
    return result

Voorbeeld

Stel dat we een lijst hebben data = [ 1, 2, 3, 4, 5, 6 ]waarop we een voortschrijdend gemiddelde willen berekenen met een periode van 3, en dat u ook een uitvoerlijst die even groot is als de invoerlijst (dat is meestal het geval).

Het eerste element heeft index 0, dus het voortschrijdend gemiddelde moet worden berekend op elementen van index -2, -1 en 0. Uiteraard hebben we geen data[-2] en data[-1] (tenzij je wilt om speciale randvoorwaarden te gebruiken), dus we nemen aan dat die elementen 0 zijn. Dit komt overeen met het opvullen van de lijst met nul, behalve dat we het niet echt opvullen, alleen de indices bijhouden die opvulling vereisen (van 0 tot N- 1).

Dus, voor de eerste N elementen blijven we de elementen in een accumulator optellen.

result[0] = (0 + 0 + 1) / 3  = 0.333    ==   (sum + 1) / 3
result[1] = (0 + 1 + 2) / 3  = 1        ==   (sum + 2) / 3
result[2] = (1 + 2 + 3) / 3  = 2        ==   (sum + 3) / 3

Van elementen N+1 naar voren werkt eenvoudige accumulatie niet. we verwachten result[3] = (2 + 3 + 4)/3 = 3maar dit is anders dan (sum + 4)/3 = 3.333.

De manier om de juiste waarde te berekenen is door data[0] = 1af te trekken van sum+4, waardoor sum + 4 - 1 = 9.

Dit gebeurt omdat momenteel sum = data[0] + data[1] + data[2], maar het geldt ook voor elke i >= Nomdat, vóór de aftrekking, sumdata[i-N] + ... + data[i-2] + data[i-1]is.


Antwoord 10, autoriteit 54%

Ik denk dat dit elegant kan worden opgelost met behulp van knelpunt

Bekijk het basisvoorbeeld hieronder:

import numpy as np
import bottleneck as bn
a = np.random.randint(4, 1000, size=100)
mm = bn.move_mean(a, window=5, min_count=1)
  • “mm” is het bewegende gemiddelde voor “a”.

  • “window” is het maximale aantal items dat in aanmerking moet worden genomen voor voortschrijdend gemiddelde.

  • “min_count” is het minimale aantal vermeldingen dat in aanmerking moet worden genomen voor voortschrijdend gemiddelde (bijvoorbeeld voor de eerste paar elementen of als de array nan-waarden heeft).

Het goede is dat bottleneck helpt om met nan-waarden om te gaan en het is ook erg efficiënt.


Antwoord 11, autoriteit 32%

Ik heb nog niet gecontroleerd hoe snel dit is, maar je zou het kunnen proberen:

from collections import deque
cache = deque() # keep track of seen values
n = 10          # window size
A = xrange(100) # some dummy iterable
cum_sum = 0     # initialize cumulative sum
for t, val in enumerate(A, 1):
    cache.append(val)
    cum_sum += val
    if t < n:
        avg = cum_sum / float(t)
    else:                           # if window is saturated,
        cum_sum -= cache.popleft()  # subtract oldest value
        avg = cum_sum / float(n)

12, gezag 25%

In plaats van numpy of scipy, ik zou panda’s aanraden om dit sneller te doen:

df['data'].rolling(3).mean()

Dit is van voortschrijdend gemiddelde (MA) 3 perioden van de kolom “data”. U kunt berekenen verschoven versies, bijvoorbeeld degene die uitsluit de huidige cel (verschoven achterstand) kan gemakkelijk worden berekend:

df['data'].shift(periods=1).rolling(3).mean()

13, gezag 25%

Python standaard bibliotheek oplossing

Deze generator-functie neemt een iterable en venstergrootte nen geeft het gemiddelde van de actuele waarden binnen het venster. Het maakt gebruik van een deque, wat een datastructuur vergelijkbaar met een lijst, maar geoptimaliseerd voor snelle aanpassingen (popappend) aan beide eindpunten .

from collections import deque
from itertools import islice
def sliding_avg(iterable, N):        
    it = iter(iterable)
    window = deque(islice(it, N))        
    num_vals = len(window)
    if num_vals < N:
        msg = 'window size {} exceeds total number of values {}'
        raise ValueError(msg.format(N, num_vals))
    N = float(N) # force floating point division if using Python 2
    s = sum(window)
    while True:
        yield s/N
        try:
            nxt = next(it)
        except StopIteration:
            break
        s = s - window.popleft() + nxt
        window.append(nxt)

Hier is de functie in actie:

>>> values = range(100)
>>> N = 5
>>> window_avg = sliding_avg(values, N)
>>> 
>>> next(window_avg) # (0 + 1 + 2 + 3 + 4)/5
>>> 2.0
>>> next(window_avg) # (1 + 2 + 3 + 4 + 5)/5
>>> 3.0
>>> next(window_avg) # (2 + 3 + 4 + 5 + 6)/5
>>> 4.0

Antwoord 14, autoriteit 25%

Een beetje laat voor het feest, maar ik heb mijn eigen kleine functie gemaakt die NIET rond de uiteinden of pads met nullen wikkelt die dan ook worden gebruikt om het gemiddelde te vinden. Als een verdere traktatie is dat het ook het signaal opnieuw bemonstert op lineair uit elkaar geplaatste punten. Pas de code naar wens aan om andere functies te krijgen.

De methode is een eenvoudige matrixvermenigvuldiging met een genormaliseerde Gauss-kernel.

def running_mean(y_in, x_in, N_out=101, sigma=1):
    '''
    Returns running mean as a Bell-curve weighted average at evenly spaced
    points. Does NOT wrap signal around, or pad with zeros.
    Arguments:
    y_in -- y values, the values to be smoothed and re-sampled
    x_in -- x values for array
    Keyword arguments:
    N_out -- NoOf elements in resampled array.
    sigma -- 'Width' of Bell-curve in units of param x .
    '''
    import numpy as np
    N_in = len(y_in)
    # Gaussian kernel
    x_out = np.linspace(np.min(x_in), np.max(x_in), N_out)
    x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out)
    gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2))
    # Normalize kernel, such that the sum is one along axis 1
    normalization = np.tile(np.reshape(np.sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in))
    gauss_kernel_normalized = gauss_kernel / normalization
    # Perform running average as a linear operation
    y_out = gauss_kernel_normalized @ y_in
    return y_out, x_out

Een eenvoudig gebruik op een sinusvormig signaal met toegevoegde normaal verdeelde ruis:


Antwoord 15, autoriteit 18%

Er zijn hierboven veel antwoorden over het berekenen van een lopend gemiddelde. Mijn antwoord voegt twee extra functies toe:

  1. neegt nan-waarden
  2. berekent het gemiddelde voor de N aangrenzende waarden NIET inclusief de waarde van belang zelf

Deze tweede functie is vooral handig om te bepalen welke waarden in een bepaald opzicht afwijken van de algemene trend.

Ik gebruik numpy.cumsum omdat dit de meest tijdbesparende methode is (zie Alleo’s antwoord hierboven).

N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x) 
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)

Deze code werkt alleen voor even Ns. Het kan worden aangepast voor oneven getallen door de np.insert van padding_x en n_nan te wijzigen.

Voorbeelduitvoer (ruwe in zwart, movavg in blauw):

Deze code kan eenvoudig worden aangepast om alle voortschrijdende gemiddelde waarden te verwijderen die zijn berekend op basis van minder dan cutoff = 3 niet-nan-waarden.

window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)


Antwoord 16, autoriteit 18%

Er is een opmerking van mabbegraven in een van de antwoordenhierboven met deze methode. bottleneckheeft move_meanwat een eenvoudig voortschrijdend gemiddelde is :

import numpy as np
import bottleneck as bn
a = np.arange(10) + np.random.random(10)
mva = bn.move_mean(a, window=2, min_count=1)

min_countis een handige parameter die in feite het voortschrijdend gemiddelde naar dat punt in uw array zal brengen. Als u min_countniet instelt, is dit gelijk aan windowen is alles tot aan window-punten nan.


Antwoord 17, autoriteit 18%

Met de variabelen van @Aikude heb ik one-liner geschreven.

import numpy as np
mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
mean = [np.mean(mylist[x:x+N]) for x in range(len(mylist)-N+1)]
print(mean)
>>> [2.0, 3.0, 4.0, 5.0, 6.0]

Antwoord 18, autoriteit 18%

Alle bovengenoemde oplossingen zijn slecht omdat ze ontbreken

  • snelheid vanwege een native python in plaats van een numpy gevectoriseerde implementatie,
  • numerieke stabiliteit door slecht gebruik van numpy.cumsum, of
  • snelheid door O(len(x) * w)implementaties als windingen.

Gegeven

import numpy
m = 10000
x = numpy.random.rand(m)
w = 1000

Merk op dat x_[:w].sum()gelijk is aan x[:w-1].sum(). Dus voor het eerste gemiddelde voegt numpy.cumsum(...)x[w] / wtoe (via x_[w+1] / w), en trekt 0af (van x_[0] / w). Dit resulteert in x[0:w].mean()

Via cumsum werk je het tweede gemiddelde bij door x[w+1] / wtoe te voegen en x[0] / waf te trekken, wat resulteert in x[1:w+1].mean().

Dit gaat door totdat x[-w:].mean()is bereikt.

x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w

Deze oplossing is vectorized, O(m), leesbaar en numeriek stabiel.


19, Autoriteit 14%

Deze vraag is nu zelfs ouder dan wanneer Nexus vorige maand over heeft geschreven, maar ik hou van hoe zijn code deals met randcases. Omdat het echter een “eenvoudig bewegend gemiddelde is”, vertragen de resultaten achter de gegevens die ze van toepassing zijn op. Ik dacht dat het op een meer bevredigende manier omgaan met randcases dan Numpy’s modi valid, same, en fullkan worden bereikt door een vergelijkbare aanpak toe te passen naar een convolution()gebaseerde methode.

Mijn bijdrage gebruikt een centraal hardingsgemiddelde om zijn resultaten met hun gegevens uit te lijnen. Wanneer er te weinig punten beschikbaar zijn voor het gebruikte venster met een groot formaat, worden het gebruik van gemiddelden berekend van achtereenvolgens kleinere ramen aan de randen van de array. [Eigenlijk, van achtereenvolgens grotere ramen, maar dat is een implementatiedetail.]

import numpy as np
def running_mean(l, N):
    # Also works for the(strictly invalid) cases when N is even.
    if (N//2)*2 == N:
        N = N - 1
    front = np.zeros(N//2)
    back = np.zeros(N//2)
    for i in range(1, (N//2)*2, 2):
        front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
    for i in range(1, (N//2)*2, 2):
        back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
    return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])

Het is relatief traag omdat het convolve()gebruikt, en het kan waarschijnlijk behoorlijk worden opgeknapt door een echte Pythonista, maar ik geloof dat het idee standhoudt.


Antwoord 20, autoriteit 11%

Laat me voor educatieve doeleinden nog twee Numpy-oplossingen toevoegen (die langzamer zijn dan de cumsum-oplossing):

import numpy as np
from numpy.lib.stride_tricks import as_strided
def ra_strides(arr, window):
    ''' Running average using as_strided'''
    n = arr.shape[0] - window + 1
    arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides)
    return arr_strided.mean(axis=1)
def ra_add(arr, window):
    ''' Running average using add.reduceat'''
    n = arr.shape[0] - window + 1
    indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2)
    arr = np.append(arr, 0)
    return np.add.reduceat(arr, indices )[::2]/window

Gebruikte functies: as_strided, toevoegen. verminderen op


Antwoord 21, autoriteit 11%

Gebruik alleen de standaardbibliotheek van Python (efficiënt geheugen)

Geef gewoon een andere versie van het gebruik van de standaardbibliotheek deque. Het is nogal een verrassing voor mij dat de meeste antwoorden gebruiken pandasof numpy.

def moving_average(iterable, n=3):
    d = deque(maxlen=n)
    for i in iterable:
        d.append(i)
        if len(d) == n:
            yield sum(d)/n
r = moving_average([40, 30, 50, 46, 39, 44])
assert list(r) == [40.0, 42.0, 45.0, 43.0]

Eigenlijk vond ik een andere implementatie in Python Docs

def moving_average(iterable, n=3):
    # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable)
    d = deque(itertools.islice(it, n-1))
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

De implementatie lijkt me echter een beetje complexer dan het zou moeten zijn. Maar het moet om een ​​reden in de standaard Python-documenten zijn, kan iemand commentaar geven op de implementatie van de mijne en de standaard DOC?


22, Autoriteit 7%

Hoewel er oplossingen voor deze vraag zijn, kijk dan eens naar mijn oplossing. Het is heel eenvoudig en goed werken.

import numpy as np
dataset = np.asarray([1, 2, 3, 4, 5, 6, 7])
ma = list()
window = 3
for t in range(0, len(dataset)):
    if t+window <= len(dataset):
        indices = range(t, t+window)
        ma.append(np.average(np.take(dataset, indices)))
else:
    ma = np.asarray(ma)

Antwoord 23, autoriteit 7%

Na het lezen van de andere antwoorden denk ik niet dat de vraag hierom vroeg, maar ik kwam hier met de behoefte om een lopend gemiddelde bij te houden van een lijst met waarden die in omvang groeide.

Dus als u een lijst met waarden wilt bijhouden die u ergens vandaan haalt (een site, een meetapparaat, enz.) en het gemiddelde van de laatste n-waarden bijgewerkt, kunt u de onderstaande code, die de moeite van het toevoegen van nieuwe elementen minimaliseert:

class Running_Average(object):
    def __init__(self, buffer_size=10):
        """
        Create a new Running_Average object.
        This object allows the efficient calculation of the average of the last
        `buffer_size` numbers added to it.
        Examples
        --------
        >>> a = Running_Average(2)
        >>> a.add(1)
        >>> a.get()
        1.0
        >>> a.add(1)  # there are two 1 in buffer
        >>> a.get()
        1.0
        >>> a.add(2)  # there's a 1 and a 2 in the buffer
        >>> a.get()
        1.5
        >>> a.add(2)
        >>> a.get()  # now there's only two 2 in the buffer
        2.0
        """
        self._buffer_size = int(buffer_size)  # make sure it's an int
        self.reset()
    def add(self, new):
        """
        Add a new number to the buffer, or replaces the oldest one there.
        """
        new = float(new)  # make sure it's a float
        n = len(self._buffer)
        if n < self.buffer_size:  # still have to had numbers to the buffer.
            self._buffer.append(new)
            if self._average != self._average:  # ~ if isNaN().
                self._average = new  # no previous numbers, so it's new.
            else:
                self._average *= n  # so it's only the sum of numbers.
                self._average += new  # add new number.
                self._average /= (n+1)  # divide by new number of numbers.
        else:  # buffer full, replace oldest value.
            old = self._buffer[self._index]  # the previous oldest number.
            self._buffer[self._index] = new  # replace with new one.
            self._index += 1  # update the index and make sure it's...
            self._index %= self.buffer_size  # ... smaller than buffer_size.
            self._average -= old/self.buffer_size  # remove old one...
            self._average += new/self.buffer_size  # ...and add new one...
            # ... weighted by the number of elements.
    def __call__(self):
        """
        Return the moving average value, for the lazy ones who don't want
        to write .get .
        """
        return self._average
    def get(self):
        """
        Return the moving average value.
        """
        return self()
    def reset(self):
        """
        Reset the moving average.
        If for some reason you don't want to just create a new one.
        """
        self._buffer = []  # could use np.empty(self.buffer_size)...
        self._index = 0  # and use this to keep track of how many numbers.
        self._average = float('nan')  # could use np.NaN .
    def get_buffer_size(self):
        """
        Return current buffer_size.
        """
        return self._buffer_size
    def set_buffer_size(self, buffer_size):
        """
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        Decreasing buffer size:
        >>> a.buffer_size = 6
        >>> a._buffer  # should not access this!!
        [9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        >>> a.buffer_size = 2
        >>> a._buffer
        [13.0, 14.0]
        Increasing buffer size:
        >>> a.buffer_size = 5
        Warning: no older data available!
        >>> a._buffer
        [13.0, 14.0]
        Keeping buffer size:
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        >>> a.buffer_size = 10  # reorders buffer!
        >>> a._buffer
        [5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        """
        buffer_size = int(buffer_size)
        # order the buffer so index is zero again:
        new_buffer = self._buffer[self._index:]
        new_buffer.extend(self._buffer[:self._index])
        self._index = 0
        if self._buffer_size < buffer_size:
            print('Warning: no older data available!')  # should use Warnings!
        else:
            diff = self._buffer_size - buffer_size
            print(diff)
            new_buffer = new_buffer[diff:]
        self._buffer_size = buffer_size
        self._buffer = new_buffer
    buffer_size = property(get_buffer_size, set_buffer_size)

En je kunt het testen met bijvoorbeeld:

def graph_test(N=200):
    import matplotlib.pyplot as plt
    values = list(range(N))
    values_average_calculator = Running_Average(N/2)
    values_averages = []
    for value in values:
        values_average_calculator.add(value)
        values_averages.append(values_average_calculator())
    fig, ax = plt.subplots(1, 1)
    ax.plot(values, label='values')
    ax.plot(values_averages, label='averages')
    ax.grid()
    ax.set_xlim(0, N)
    ax.set_ylim(0, N)
    fig.show()

Wat geeft:


Antwoord 24, autoriteit 7%

Wat dacht je van een voortschrijdend gemiddelde filter? Het is ook een one-liner en heeft het voordeel, dat je het venstertype gemakkelijk kunt manipuleren als je iets anders nodig hebt dan de rechthoek, bijv. een N-lang eenvoudig voortschrijdend gemiddelde van een array a:

lfilter(np.ones(N)/N, [1], a)[N:]

En met het driehoekige venster toegepast:

lfilter(np.ones(N)*scipy.signal.triang(N)/N, [1], a)[N:]

Opmerking: ik gooi meestal de eerste N-samples weg als nep, vandaar [N:]aan het einde, maar het is niet nodig en het is alleen een kwestie van een persoonlijke keuze.


Antwoord 25, autoriteit 7%

Een nieuw convolve-recept is samengevoegdin Python 3.10.

Gegeven


import collections, operator
from itertools import chain, repeat
size = 3 + 1
kernel = [1/size] * size                                              

Code

def convolve(signal, kernel):
    # See:  https://betterexplained.com/articles/intuitive-convolution/
    # convolve(data, [0.25, 0.25, 0.25, 0.25]) --> Moving average (blur)
    # convolve(data, [1, -1]) --> 1st finite difference (1st derivative)
    # convolve(data, [1, -2, 1]) --> 2nd finite difference (2nd derivative)
    kernel = list(reversed(kernel))
    n = len(kernel)
    window = collections.deque([0] * n, maxlen=n)
    for x in chain(signal, repeat(0, n-1)):
        window.append(x)
        yield sum(map(operator.mul, kernel, window))

Demo

list(convolve(range(1, 6), kernel))
# [0.25, 0.75, 1.5, 2.5, 3.5, 3.0, 2.25, 1.25]

Details

Een convolutieis een algemene wiskundige bewerking die kan worden toegepast op voortschrijdende gemiddelden. Dit idee is dat je, gegeven sommige gegevens, een subset van gegevens (een venster) als een “masker” of “kernel” over de gegevens schuift, waarbij je een bepaalde wiskundige bewerking over elk venster uitvoert. In het geval van voortschrijdende gemiddelden is de kern het gemiddelde:

Je kunt deze implementatie nu gebruiken via more_itertools.convolve.
more_itertoolsis een populair pakket van derden; installeren via > pip install more_itertools.


Antwoord 26, autoriteit 4%

Een andere oplossing die gewoon een standaardbibliotheek en deque gebruikt:

from collections import deque
import itertools
def moving_average(iterable, n=3):
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable) 
    # create an iterable object from input argument
    d = deque(itertools.islice(it, n-1))  
    # create deque object by slicing iterable
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n
# example on how to use it
for i in  moving_average([40, 30, 50, 46, 39, 44]):
    print(i)
# 40.0
# 42.0
# 45.0
# 43.0

Antwoord 27

Als u ervoor kiest om uw eigen bibliotheek te gebruiken in plaats van een bestaande bibliotheek te gebruiken, houd dan rekening met drijvende-kommafouten en probeer de effecten ervan te minimaliseren:

class SumAccumulator:
    def __init__(self):
        self.values = [0]
        self.count = 0
    def add( self, val ):
        self.values.append( val )
        self.count = self.count + 1
        i = self.count
        while i & 0x01:
            i = i >> 1
            v0 = self.values.pop()
            v1 = self.values.pop()
            self.values.append( v0 + v1 )
    def get_total(self):
        return sum( reversed(self.values) )
    def get_size( self ):
        return self.count

Als al uw waarden ongeveer van dezelfde grootteorde zijn, helpt dit om de precisie te behouden door altijd waarden van ongeveer dezelfde grootte toe te voegen.

Other episodes