[PYTHON] Code für das selbstorganisierende Zustandsraummodell (unvollendet)

Motivation

Ich brauchte ein hochgenaues Zeitreihen-Vorhersagemodell für die Änderungspunkterkennung, also habe ich versucht, einen Code zu erstellen. Schreiben Sie zuerst das Ergebnis. Ich wollte sagen: "Ich konnte eine gute Genauigkeit erzielen!", Aber ich bekam einen halbfertigen Code mit seltsamem Verhalten ... orz In Zukunft werden wir es wieder erhöhen, sobald es korrigiert werden kann. ..

Verweise

"Vorreiter der Analyse finanzieller Zeitreihen durch ein selbstorganisiertes Zustandsraummodell mit anfänglicher Verteilungssuche: Anwendung auf das stochastische Volatilitätsschwankungsmodell mit t-Verteilung" fasst die Probleme des Monte-Carlo-Filters zusammen und sucht weiter nach der optimalen anfänglichen Verteilung. Die Methode wurde ebenfalls vorgeschlagen und war ein sehr interessantes Material. Ich wollte ein Produkt erstellen, das die Optimierung der Erstverteilung in Bezug darauf beinhaltete, aber am Ende hatte ich ein Produkt, das weit davon entfernt war. ..

Probleme mit dem Monte-Carlo-Filter

Das Monte-Carlo-Filter ist ein sehr flexibler Zustandsschätzungsalgorithmus, mit dem Sie instationäre nicht-Gaußsche Modelle erstellen können. Die folgenden Punkte werden jedoch als Probleme angesprochen.

In Bezug auf die Parameterschätzung im Monte-Carlo-Filter enthält (1) die geschätzte Menge der Wahrscheinlichkeitsfunktion für den Parameter den Fehler nach der Monte-Carlo-Methode, und (2) die Differenzierung der Wahrscheinlichkeitsfunktion ist schwer zu berechnen (Selbstorganisation mit anfänglicher Verteilungssuche). Vorreiter der Analyse finanzieller Zeitreihen nach dem Zustandsraummodell P.4)

Um dies zu lösen, wurde ein Verfahren unter Verwendung eines Optimierungsverfahrens vorgeschlagen, das als Nelder-Mead-Verfahren (nichtlineares Optimierungsverfahren vom linearen Suchtyp) bezeichnet wird. Dieses Verfahren lässt jedoch das Problem offen, dass, wenn die Konvergenzbedingungen nicht signifikant gelockert werden, die Konvergenz nicht auftritt und die Streuung der Parameterschätzungen zunimmt. Andererseits ist das selbstorganisierte Zustandsraummodell eine Methode zum Übertragen von Parametern in den Zustandsraum, um eine Bayes'sche Schätzung durchzuführen. In "Vorab der Analyse finanzieller Zeitreihen durch ein selbstorganisiertes Zustandsraummodell mit anfänglicher Verteilungssuche" wird eine Methode zur Konstruktion der anfänglichen Verteilung (wahrscheinlichste Schätzmethode) unter Verwendung der NM-Methode vorgeschlagen. Informationen zum Monte-Carlo-Filter finden Sie unter Implementierung eines einfachen Partikelfilters.

Modell-

Ich habe den Code für das folgende Modell erstellt. x_t=Fx_{t-1}+Gv_t y_t=Hx_{t}+w_t, w_t~C(0,σ_t^2) v_{t,t_t}~C(0,τ_t^2) v_{t,{τ_t^2}}~C(0,ν^2) v_{t,{σ_t^2}}~C(0,ξ^2) x_t=[t_t,log_{10}τ_t^2,log_{10}σ_t^2]^T v_{t}=[v_{t,t_t},v_{t,{τ_t^2}},v_{t,{σ_t^2}}]^T

Wie in "Statistisches Modell vom Typ Wissensentdeckung und Selbstorganisation" beschrieben, muss beispielsweise bei der Schätzung von $ σ ^ 2 $ ein positiver Wert garantiert werden. Schätzen Sie also $ log_ {10} σ ^ 2 $. gehen.

Dann werde ich den folgenden Code schreiben.

Referenzcode

python


# coding: utf-8

from math import log, pow, sqrt
import numpy as np
from scipy.stats import norm, cauchy
from numpy.random import uniform, multivariate_normal
from multiprocessing import Pool
import matplotlib.pyplot as plt


class ParticleFilter:
    nu = 0.01 #Ultra-Super-Parameter der Systemrauschskala
    xi = 0.03 #Ultra-Super-Parameter der Beobachtungsrauschskala
    log_likelihood = 0.0 #Log-Wahrscheinlichkeit
    TIME = 1
    PR=8 # unmber of processing
    
    def __init__(self, PARTICLES_NUM, k=1, ydim=1, pdim=2):
        self.PARTICLES_NUM = PARTICLES_NUM #Anzahl der Partikel
        self.TEETH_OF_COMB = np.arange(0, 1, float(1.0)/self.PARTICLES_NUM)
        self.weights = np.zeros((ydim, self.PARTICLES_NUM))
        self.particles = np.zeros((k*ydim+pdim ,self.PARTICLES_NUM))
        self.predicted_particles = np.zeros((k*ydim+pdim , self.PARTICLES_NUM))
        np.random.seed(555)
        self.predicted_value = []
        self.filtered_value = []
        self.LSM = np.zeros(ydim) #Quadratischer Fehler

        self.F, self.G, self.H = self.FGHset(k, ydim, pdim)
        self.k = k
        self.ydim = ydim
        self.pdim = pdim
    
    def init_praticles_distribution(self, P, r):
        """initialize particles
        x_0|0
        tau_0|0
        sigma_0|0
        """
        data_particles = multivariate_normal([1]*self.ydim*self.k,
                            np.eye(self.ydim*self.k)*10, self.PARTICLES_NUM).T
        param_particles = np.zeros((self.pdim, self.PARTICLES_NUM))
        for i in xrange(self.pdim):
            param_particles[i,:] = uniform(P-r, P+r, self.PARTICLES_NUM)
        self.particles = np.vstack((data_particles, param_particles))
        
    def get_system_noise(self):
        """v_t vector"""
        data_noise = cauchy.rvs(loc=[0]*self.PARTICLES_NUM, scale=np.power(10,self.particles[self.ydim])) #size=self.PARTICLES_NUM
        data_noise[data_noise==float("-inf")] = -1e308
        data_noise[data_noise==float("inf")] = 1e308
        parameta_noise_sys = cauchy.rvs(loc=0, scale=self.xi, size=self.PARTICLES_NUM) # noise of hyper parameter in system model
        parameta_noise_ob = cauchy.rvs(loc=0, scale=self.nu, size=self.PARTICLES_NUM) # noise of hyper parameter in observation model

        #print np.vstack((data_noise, parameta_noise_sys, parameta_noise_ob))
        return np.vstack((data_noise, parameta_noise_sys, parameta_noise_ob))
        
    def calc_pred_particles(self):
        """calculate system function
        x_t|t-1 = F*x_t-1 + Gv_t
        """
        return self.F.dot(self.particles) + self.G.dot(self.get_system_noise()) # linear non-Gaussian  
        
    def calc_particles_weight(self,y):
        """calculate fitness probabilities between observation value and predicted value
        w_t
        """
        locs = self.calc_pred_particles()
        self.predicted_particles = locs
        scale=np.power(10,locs[-1])
        scale[scale==0] = 1e-308
        
        self.weights = cauchy.pdf( np.array([y]*self.PARTICLES_NUM) - self.H.dot(locs), loc=[0]*self.PARTICLES_NUM,
                                scale=scale ).flatten()
        
    def calc_likelihood(self):
        """calculate likelihood at that point
        p(y_t|y_1:t-1)
        """
        res = np.sum(self.weights)/self.PARTICLES_NUM
        #print res
        self.log_likelihood += log(res)
      
    def normalize_weights(self):
        """wtilda_t"""
        self.weights = self.weights/np.sum(self.weights)
      
    def resample(self,y):
        """x_t|t"""
        self.normalize_weights()

        self.memorize_predicted_value()

        # accumulate weight
        cum = np.cumsum(self.weights)
        
        # create roulette pointer 
        base = uniform(0,float(1.0)/self.PARTICLES_NUM)
        pointers = self.TEETH_OF_COMB + base

        # select particles
        selected_idx = [np.where(cum>=p)[0][0] for p in pointers]
        """
        pool = Pool(processes=self.PR)
        selected_idx = pool.map(get_slected_particles_idx, ((cum,p) for p in pointers))
        pool.close()
        pool.join()     
        """

        self.particles = self.predicted_particles[:,selected_idx]
        self.memorize_filtered_value(selected_idx, y)
    
    def memorize_predicted_value(self):
        predicted_value = np.sum(self.predicted_particles*self.weights, axis=1)[0]
        self.predicted_value.append(predicted_value)

    def memorize_filtered_value(self, selected_idx, y):
        filtered_value = np.sum(self.particles*self.weights[selected_idx] , axis=1) \
                            /np.sum(self.weights[selected_idx])
        self.filtered_value.append(filtered_value[0])
        self.calculate_LSM(y,filtered_value[:self.ydim])

    def calculate_LSM(self,y,filterd_value):
        self.LSM += pow(y-filterd_value,2)

    def forward(self,y):
        """compute system model and observation model"""
        print 'calculating time at %d' % self.TIME
        self.calc_pred_particles()
        self.calc_particles_weight(y)
        self.calc_likelihood()
        self.resample(y)
        self.TIME += 1

    def FGHset(self, k, vn_y, n_ss_parameters):
        """
        vn_y:input dimension
        n_ss_parameters:number of hyper parameter 
        k:difference
        """
        G_upper_block = np.zeros((k*vn_y, vn_y+n_ss_parameters))
        G_lower_block = np.zeros((n_ss_parameters, vn_y+n_ss_parameters))
        G_lower_block[-n_ss_parameters:, -n_ss_parameters:] = np.eye(n_ss_parameters)
        G_upper_block[:vn_y, :vn_y] = np.eye(vn_y)
        G = np.vstack( (G_upper_block, G_lower_block) )
        
        H = np.hstack( (np.eye(vn_y), 
                            np.zeros((vn_y, vn_y*(k-1)+n_ss_parameters))
                        ) )
              
        F_upper_block = np.zeros((k*vn_y, k*vn_y+n_ss_parameters))
        F_lower_block = np.zeros((n_ss_parameters, k*vn_y+n_ss_parameters))
        F_lower_block[-n_ss_parameters:, -n_ss_parameters:] = np.eye(n_ss_parameters)
        if k==1:
            F_upper_block[:vn_y, :vn_y] = np.eye(vn_y)
        elif k==2:
            F_upper_block[:vn_y, :vn_y] = np.eye(vn_y)*2
            F_upper_block[:vn_y, vn_y:k*vn_y] = np.eye(vn_y)*-1
            F_upper_block[vn_y:k*vn_y, :vn_y] = np.eye(vn_y)
        F = np.vstack((F_upper_block, F_lower_block))
        
        return F, G, H

def get_slected_particles_idx((cum,p)):
    """multiprocessing function"""
    try:
        return np.where(cum>=p)[0][0]
    
    except Exception, e:
        import sys
        import traceback
        sys.stderr.write(traceback.format_exc())    

if __name__=='__main__':
    n_particle = 10000
    pf = ParticleFilter(n_particle, k=1, ydim=1, pdim=2)
    pf.init_praticles_distribution(0, 8) # P, r
    
    data = np.hstack((norm.rvs(0,1,size=20),norm.rvs(10,1,size=60),norm.rvs(-30,0.5,size=20)))
    
    for d in data:
        pf.forward(d)
    print 'log likelihood:', pf.log_likelihood
    print 'LSM:', pf.LSM
    
    rng = range(100)
    plt.plot(rng,data,label=u"training data")
    plt.plot(rng,pf.predicted_value,label=u"predicted data")
    plt.plot(rng,pf.filtered_value,label=u"filtered data")
    plt.xlabel('TIME',fontsize=18)
    plt.ylabel('Value',fontsize=18)    
    plt.legend() 
    plt.show()

Versuchsergebnis

Es ist ein Code voller Fehler, der im resultierenden Diagramm nicht angezeigt werden muss ...

Kommentar

Schade, dass diesmal die Zeit zur Hälfte abgelaufen ist. Korrigieren Sie es, damit Sie in Zukunft anständige Ergebnisse erzielen können, und erhöhen Sie es erneut. (Es tut mir leid, eine halbfertige Sache zu veröffentlichen, die nicht hilfreich ist, aber diesmal habe ich sie als Memo für mich hochgeladen.)

Wir entschuldigen uns für die Unannehmlichkeiten, aber wenn Sie einen Fehler machen, würden wir uns freuen, wenn Sie darauf hinweisen könnten.

Recommended Posts

Code für das selbstorganisierende Zustandsraummodell (unvollendet)
Code für das selbstorganisierte Zustandsraummodell (modifizierte Version)
Techniken zum Testen von Code?
Persönliches Python-Code-Memo
Testcode zur Bewertung von Dekorateuren
[Python] Beispielcode für die Python-Grammatik