Wenn Sie mit Python und Parallelverarbeitung googeln, werden Sie viele Artikel über das Modul Multiprocessing sehen. Deshalb habe ich es mit dem Artikel versucht, den ich zuvor geschrieben habe.
[Python] Versuchen Sie, die FX-Systolenparameter mit einem genetischen Algorithmus zu optimieren
Der genetische Algorithmus bewertet viele Individuen und führt genetische Verarbeitungen wie Selektion und Kreuzung entsprechend ihrer Anpassungsfähigkeit durch. Da jedoch die Bewertung jedes Individuums völlig unabhängig ist, ist er für die parallele Verarbeitung geeignet. .. Auch dieses Mal werde ich die parallele Verarbeitung auf den einzelnen Bewertungsteil anwenden.
Code für den genetischen Algorithmus vor der Parallelisierung. Dies ist kein allgemeiner Zweck, da er zur Optimierung der Systolenparameter im obigen Artikel verwendet wurde. Der Code davor und danach wird weggelassen.
def Optimize(ohlc, Prange):
def shift(x, n=1): return np.concatenate((np.zeros(n), x[:-n])) #Schaltfunktion
SlowMA = np.empty([len(Prange[0]), len(ohlc)]) #Langfristiger gleitender Durchschnitt
for i in range(len(Prange[0])):
SlowMA[i] = ind.iMA(ohlc, Prange[0][i])
FastMA = np.empty([len(Prange[1]), len(ohlc)]) #Kurzfristiger gleitender Durchschnitt
for i in range(len(Prange[1])):
FastMA[i] = ind.iMA(ohlc, Prange[1][i])
ExitMA = np.empty([len(Prange[2]), len(ohlc)]) #Mobiler Durchschnitt für die Zahlung
for i in range(len(Prange[2])):
ExitMA[i] = ind.iMA(ohlc, Prange[2][i])
Close = ohlc['Close'].values #Schlusskurs
M = 20 #Anzahl der Personen
Eval = np.zeros([M, 6]) #Bewertungsgegenstand
Param = InitParam(Prange, M) #Parameterinitialisierung
gens = 0 #Anzahl der Generationen
while gens < 100:
for k in range(M):
i0 = Param[k,0]
i1 = Param[k,1]
i2 = Param[k,2]
#Einstiegssignal kaufen
BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
#Eingangssignal verkaufen
SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
#Ausgangssignal kaufen
BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
#Ausgangssignal verkaufen
SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
#Backtest
Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit)
Eval[k] = BacktestReport(Trade, PL)
#Generationswechsel
Param = Evolution(Param, Eval[:,0], Prange)
gens += 1
#print(gens, Eval[0,0])
Slow = Prange[0][Param[:,0]]
Fast = Prange[1][Param[:,1]]
Exit = Prange[2][Param[:,2]]
return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
Lassen Sie es so laufen wie es ist und messen Sie die Ausführungszeit.
import time
start = time.perf_counter()
result = Optimize(ohlc, [SlowMAperiod, FastMAperiod, ExitMAperiod])
print("elapsed_time = {0} sec".format(time.perf_counter()-start))
elapsed_time = 11.180512751173708 sec
Im ursprünglichen Code werden die for-Anweisungen parallelisiert. Es dauert einige Zeit, um Backtesting und Evaluierung mit den Parametern jedes Einzelnen durchzuführen. Wenn Sie Multiprocessing verwenden, scheint es einfach zu sein, die "map" -Methode zu verwenden. Versuchen Sie daher zunächst, die "for" -Anweisung durch die "map" -Funktion zu ersetzen.
Dafür müssen wir den sich wiederholenden Teil zu einer Funktion machen, aber hier gibt es einen kleinen Hinweis. Wenn Sie es nur zu einer "Map" -Funktion machen möchten, ist es bequemer, die Funktion in der "Optimize" -Funktion zu definieren. Wenn Sie jedoch Multiprocessing verwenden, tritt ein Fehler auf. Also habe ich die Funktion mit dem Namen "evaluieren" außerhalb der Funktion "Optimieren" definiert.
Um die Übergabe an "map" zu vereinfachen, möchte ich nur "k" zum Argument der Funktion "evaluieren" machen. Daher sind die Variablen technischer Indikatoren wie "SlowMA" und "FastMA" globale Variablen. Param
ist jedoch ein Argument der Funktion.
SlowMA = np.empty([len(SlowMAperiod), len(ohlc)]) #Langfristiger gleitender Durchschnitt
for i in range(len(SlowMAperiod)):
SlowMA[i] = ind.iMA(ohlc, SlowMAperiod[i])
FastMA = np.empty([len(FastMAperiod), len(ohlc)]) #Kurzfristiger gleitender Durchschnitt
for i in range(len(FastMAperiod)):
FastMA[i] = ind.iMA(ohlc, FastMAperiod[i])
ExitMA = np.empty([len(ExitMAperiod), len(ohlc)]) #Mobiler Durchschnitt für die Zahlung
for i in range(len(ExitMAperiod)):
ExitMA[i] = ind.iMA(ohlc, ExitMAperiod[i])
Close = ohlc['Close'].values #Schlusskurs
#Schaltfunktion
def shift(x, n=1):
return np.concatenate((np.zeros(n), x[:-n]))
#Funktion zur parallelen Verarbeitung
def evaluate(k,Param):
i0 = Param[k,0]
i1 = Param[k,1]
i2 = Param[k,2]
#Einstiegssignal kaufen
BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
#Eingangssignal verkaufen
SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
#Ausgangssignal kaufen
BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
#Ausgangssignal verkaufen
SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
#Backtest
Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit)
return BacktestReport(Trade, PL)
Der folgende Code wird durch die Funktion "map" anstelle der Anweisung "for" ersetzt.
import functools
def Optimize(ohlc, Prange):
M = 20 #Anzahl der Personen
Eval = np.zeros([M, 4]) #Bewertungsgegenstand
Param = InitParam(Prange, M) #Parameterinitialisierung
gens = 0 #Anzahl der Generationen
while gens < 100:
#for k in range(M): Eval[k] = evaluate(k,Param)
Eval = np.array(list(map(functools.partial(evaluate, Param=Param), np.arange(M))))
#Generationswechsel
Param = Evolution(Param, Eval[:,0], Prange)
gens += 1
#print(gens, Eval[0,0])
Slow = Prange[0][Param[:,0]]
Fast = Prange[1][Param[:,1]]
Exit = Prange[2][Param[:,2]]
return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
Eigentlich war es nicht einfach, nur mit "map" zu schreiben. Ich habe die Wiederholungsfunktion "evaluieren" in das erste Argument der Funktion "map" eingefügt, aber da es zwei Argumente der Funktion "evaluieren" gibt, sollte das zweite Argument "Param" auf "Param" festgelegt werden Ich benutze functools.partial`.
Auch der Rückgabewert von "map" wird in ein NumPy-Array konvertiert, aber es scheint, dass er vorher in eine Liste konvertiert werden muss. (Es scheint, dass es von der Version von Python abhängt. Ich habe diesmal Python 3.5.1 ausprobiert.)
Als ich das tat, bekam ich die folgenden Ergebnisse:
elapsed_time = 11.157917446009389 sec
Selbst wenn Sie die Anweisung "for" in "map" ändern, ändert sich die Ausführungszeit nicht wesentlich.
multiprocessing
Es ist einfach, Multiprocessing einzuführen, wenn es durch die Map-Funktion ersetzt wird.
import functools
import multiprocessing as mp
def Optimize(ohlc, Prange):
M = 20 #Anzahl der Personen
Eval = np.zeros([M, 4]) #Bewertungsgegenstand
Param = InitParam(Prange, M) #Parameterinitialisierung
pool = mp.Pool() #Prozesspool erstellen
gens = 0 #Anzahl der Generationen
while gens < 100:
#for k in range(M): Eval[k] = evaluate(k,Param)
Eval = np.array(list(pool.map(functools.partial(evaluate, Param=Param), np.arange(M))))
#Generationswechsel
Param = Evolution(Param, Eval[:,0], Prange)
gens += 1
#print(gens, Eval[0,0])
Slow = Prange[0][Param[:,0]]
Fast = Prange[1][Param[:,1]]
Exit = Prange[2][Param[:,2]]
return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
Erstellen Sie einfach einen Prozesspool mit der Klasse "Pool" und ersetzen Sie den Teil "map" durch "pool.map". Geben Sie die Anzahl der Prozesse mit dem Argument "Pool" an. Wenn kein Argument geschrieben wird, werden alle CPU-Threads verwendet.
Sie können alle Threads für die einfache Verarbeitung verwenden, aber da es anderen Code gibt, war diesmal die Verwendung von etwas mehr als der Hälfte der Threads am schnellsten.
Da es sich um 8 Threads von Corei7 handelte, war das Ergebnis der Ausführung mit "Pool (5)"
elapsed_time = 5.766524394366197 sec
ist. Es ist ungefähr doppelt so schnell. Ich hatte erwartet, dass es etwas schneller sein würde, aber wahrscheinlich, weil es eine andere genetische Verarbeitung gab als die Wiederholung des Individuums. Bei Systemen, deren Backtest länger dauert, ist die Parallelisierung möglicherweise etwas effektiver.
Recommended Posts