Ich brauchte ein hochgenaues Zeitreihen-Vorhersagemodell für die Änderungspunkterkennung, also habe ich versucht, einen Code zu erstellen. Schreiben Sie zuerst das Ergebnis. Ich wollte sagen: "Ich konnte eine gute Genauigkeit erzielen!", Aber ich bekam einen halbfertigen Code mit seltsamem Verhalten ... orz In Zukunft werden wir es wieder erhöhen, sobald es korrigiert werden kann. ..
"Vorreiter der Analyse finanzieller Zeitreihen durch ein selbstorganisiertes Zustandsraummodell mit anfänglicher Verteilungssuche: Anwendung auf das stochastische Volatilitätsschwankungsmodell mit t-Verteilung" fasst die Probleme des Monte-Carlo-Filters zusammen und sucht weiter nach der optimalen anfänglichen Verteilung. Die Methode wurde ebenfalls vorgeschlagen und war ein sehr interessantes Material. Ich wollte ein Produkt erstellen, das die Optimierung der Erstverteilung in Bezug darauf beinhaltete, aber am Ende hatte ich ein Produkt, das weit davon entfernt war. ..
Das Monte-Carlo-Filter ist ein sehr flexibler Zustandsschätzungsalgorithmus, mit dem Sie instationäre nicht-Gaußsche Modelle erstellen können. Die folgenden Punkte werden jedoch als Probleme angesprochen.
In Bezug auf die Parameterschätzung im Monte-Carlo-Filter enthält (1) die geschätzte Menge der Wahrscheinlichkeitsfunktion für den Parameter den Fehler nach der Monte-Carlo-Methode, und (2) die Differenzierung der Wahrscheinlichkeitsfunktion ist schwer zu berechnen (Selbstorganisation mit anfänglicher Verteilungssuche). Vorreiter der Analyse finanzieller Zeitreihen nach dem Zustandsraummodell P.4)
Um dies zu lösen, wurde ein Verfahren unter Verwendung eines Optimierungsverfahrens vorgeschlagen, das als Nelder-Mead-Verfahren (nichtlineares Optimierungsverfahren vom linearen Suchtyp) bezeichnet wird. Dieses Verfahren lässt jedoch das Problem offen, dass, wenn die Konvergenzbedingungen nicht signifikant gelockert werden, die Konvergenz nicht auftritt und die Streuung der Parameterschätzungen zunimmt. Andererseits ist das selbstorganisierte Zustandsraummodell eine Methode zum Übertragen von Parametern in den Zustandsraum, um eine Bayes'sche Schätzung durchzuführen. In "Vorab der Analyse finanzieller Zeitreihen durch ein selbstorganisiertes Zustandsraummodell mit anfänglicher Verteilungssuche" wird eine Methode zur Konstruktion der anfänglichen Verteilung (wahrscheinlichste Schätzmethode) unter Verwendung der NM-Methode vorgeschlagen. Informationen zum Monte-Carlo-Filter finden Sie unter Implementierung eines einfachen Partikelfilters.
Ich habe den Code für das folgende Modell erstellt.
Wie in "Statistisches Modell vom Typ Wissensentdeckung und Selbstorganisation" beschrieben, muss beispielsweise bei der Schätzung von $ σ ^ 2 $ ein positiver Wert garantiert werden. Schätzen Sie also $ log_ {10} σ ^ 2 $. gehen.
Dann werde ich den folgenden Code schreiben.
python
# coding: utf-8
from math import log, pow, sqrt
import numpy as np
from scipy.stats import norm, cauchy
from numpy.random import uniform, multivariate_normal
from multiprocessing import Pool
import matplotlib.pyplot as plt
class ParticleFilter:
nu = 0.01 #Ultra-Super-Parameter der Systemrauschskala
xi = 0.03 #Ultra-Super-Parameter der Beobachtungsrauschskala
log_likelihood = 0.0 #Log-Wahrscheinlichkeit
TIME = 1
PR=8 # unmber of processing
def __init__(self, PARTICLES_NUM, k=1, ydim=1, pdim=2):
self.PARTICLES_NUM = PARTICLES_NUM #Anzahl der Partikel
self.TEETH_OF_COMB = np.arange(0, 1, float(1.0)/self.PARTICLES_NUM)
self.weights = np.zeros((ydim, self.PARTICLES_NUM))
self.particles = np.zeros((k*ydim+pdim ,self.PARTICLES_NUM))
self.predicted_particles = np.zeros((k*ydim+pdim , self.PARTICLES_NUM))
np.random.seed(555)
self.predicted_value = []
self.filtered_value = []
self.LSM = np.zeros(ydim) #Quadratischer Fehler
self.F, self.G, self.H = self.FGHset(k, ydim, pdim)
self.k = k
self.ydim = ydim
self.pdim = pdim
def init_praticles_distribution(self, P, r):
"""initialize particles
x_0|0
tau_0|0
sigma_0|0
"""
data_particles = multivariate_normal([1]*self.ydim*self.k,
np.eye(self.ydim*self.k)*10, self.PARTICLES_NUM).T
param_particles = np.zeros((self.pdim, self.PARTICLES_NUM))
for i in xrange(self.pdim):
param_particles[i,:] = uniform(P-r, P+r, self.PARTICLES_NUM)
self.particles = np.vstack((data_particles, param_particles))
def get_system_noise(self):
"""v_t vector"""
data_noise = cauchy.rvs(loc=[0]*self.PARTICLES_NUM, scale=np.power(10,self.particles[self.ydim])) #size=self.PARTICLES_NUM
data_noise[data_noise==float("-inf")] = -1e308
data_noise[data_noise==float("inf")] = 1e308
parameta_noise_sys = cauchy.rvs(loc=0, scale=self.xi, size=self.PARTICLES_NUM) # noise of hyper parameter in system model
parameta_noise_ob = cauchy.rvs(loc=0, scale=self.nu, size=self.PARTICLES_NUM) # noise of hyper parameter in observation model
#print np.vstack((data_noise, parameta_noise_sys, parameta_noise_ob))
return np.vstack((data_noise, parameta_noise_sys, parameta_noise_ob))
def calc_pred_particles(self):
"""calculate system function
x_t|t-1 = F*x_t-1 + Gv_t
"""
return self.F.dot(self.particles) + self.G.dot(self.get_system_noise()) # linear non-Gaussian
def calc_particles_weight(self,y):
"""calculate fitness probabilities between observation value and predicted value
w_t
"""
locs = self.calc_pred_particles()
self.predicted_particles = locs
scale=np.power(10,locs[-1])
scale[scale==0] = 1e-308
self.weights = cauchy.pdf( np.array([y]*self.PARTICLES_NUM) - self.H.dot(locs), loc=[0]*self.PARTICLES_NUM,
scale=scale ).flatten()
def calc_likelihood(self):
"""calculate likelihood at that point
p(y_t|y_1:t-1)
"""
res = np.sum(self.weights)/self.PARTICLES_NUM
#print res
self.log_likelihood += log(res)
def normalize_weights(self):
"""wtilda_t"""
self.weights = self.weights/np.sum(self.weights)
def resample(self,y):
"""x_t|t"""
self.normalize_weights()
self.memorize_predicted_value()
# accumulate weight
cum = np.cumsum(self.weights)
# create roulette pointer
base = uniform(0,float(1.0)/self.PARTICLES_NUM)
pointers = self.TEETH_OF_COMB + base
# select particles
selected_idx = [np.where(cum>=p)[0][0] for p in pointers]
"""
pool = Pool(processes=self.PR)
selected_idx = pool.map(get_slected_particles_idx, ((cum,p) for p in pointers))
pool.close()
pool.join()
"""
self.particles = self.predicted_particles[:,selected_idx]
self.memorize_filtered_value(selected_idx, y)
def memorize_predicted_value(self):
predicted_value = np.sum(self.predicted_particles*self.weights, axis=1)[0]
self.predicted_value.append(predicted_value)
def memorize_filtered_value(self, selected_idx, y):
filtered_value = np.sum(self.particles*self.weights[selected_idx] , axis=1) \
/np.sum(self.weights[selected_idx])
self.filtered_value.append(filtered_value[0])
self.calculate_LSM(y,filtered_value[:self.ydim])
def calculate_LSM(self,y,filterd_value):
self.LSM += pow(y-filterd_value,2)
def forward(self,y):
"""compute system model and observation model"""
print 'calculating time at %d' % self.TIME
self.calc_pred_particles()
self.calc_particles_weight(y)
self.calc_likelihood()
self.resample(y)
self.TIME += 1
def FGHset(self, k, vn_y, n_ss_parameters):
"""
vn_y:input dimension
n_ss_parameters:number of hyper parameter
k:difference
"""
G_upper_block = np.zeros((k*vn_y, vn_y+n_ss_parameters))
G_lower_block = np.zeros((n_ss_parameters, vn_y+n_ss_parameters))
G_lower_block[-n_ss_parameters:, -n_ss_parameters:] = np.eye(n_ss_parameters)
G_upper_block[:vn_y, :vn_y] = np.eye(vn_y)
G = np.vstack( (G_upper_block, G_lower_block) )
H = np.hstack( (np.eye(vn_y),
np.zeros((vn_y, vn_y*(k-1)+n_ss_parameters))
) )
F_upper_block = np.zeros((k*vn_y, k*vn_y+n_ss_parameters))
F_lower_block = np.zeros((n_ss_parameters, k*vn_y+n_ss_parameters))
F_lower_block[-n_ss_parameters:, -n_ss_parameters:] = np.eye(n_ss_parameters)
if k==1:
F_upper_block[:vn_y, :vn_y] = np.eye(vn_y)
elif k==2:
F_upper_block[:vn_y, :vn_y] = np.eye(vn_y)*2
F_upper_block[:vn_y, vn_y:k*vn_y] = np.eye(vn_y)*-1
F_upper_block[vn_y:k*vn_y, :vn_y] = np.eye(vn_y)
F = np.vstack((F_upper_block, F_lower_block))
return F, G, H
def get_slected_particles_idx((cum,p)):
"""multiprocessing function"""
try:
return np.where(cum>=p)[0][0]
except Exception, e:
import sys
import traceback
sys.stderr.write(traceback.format_exc())
if __name__=='__main__':
n_particle = 10000
pf = ParticleFilter(n_particle, k=1, ydim=1, pdim=2)
pf.init_praticles_distribution(0, 8) # P, r
data = np.hstack((norm.rvs(0,1,size=20),norm.rvs(10,1,size=60),norm.rvs(-30,0.5,size=20)))
for d in data:
pf.forward(d)
print 'log likelihood:', pf.log_likelihood
print 'LSM:', pf.LSM
rng = range(100)
plt.plot(rng,data,label=u"training data")
plt.plot(rng,pf.predicted_value,label=u"predicted data")
plt.plot(rng,pf.filtered_value,label=u"filtered data")
plt.xlabel('TIME',fontsize=18)
plt.ylabel('Value',fontsize=18)
plt.legend()
plt.show()
Es ist ein Code voller Fehler, der im resultierenden Diagramm nicht angezeigt werden muss ...
Schade, dass diesmal die Zeit zur Hälfte abgelaufen ist. Korrigieren Sie es, damit Sie in Zukunft anständige Ergebnisse erzielen können, und erhöhen Sie es erneut. (Es tut mir leid, eine halbfertige Sache zu veröffentlichen, die nicht hilfreich ist, aber diesmal habe ich sie als Memo für mich hochgeladen.)
Wir entschuldigen uns für die Unannehmlichkeiten, aber wenn Sie einen Fehler machen, würden wir uns freuen, wenn Sie darauf hinweisen könnten.
Recommended Posts