If you search for Python parallel processing, you will find many articles about the `multiprocessing` module, so I tried applying it to an article I wrote earlier:

[Python] Try optimizing FX trading-system parameters with a genetic algorithm
A genetic algorithm evaluates many individuals and applies genetic operations such as selection and crossover according to their fitness. Since the evaluation of each individual is completely independent of the others, it is well suited to parallel processing. Here, too, I will parallelize the individual-evaluation part.
Below is the genetic-algorithm code before parallelization. It is not general purpose, as it was written to optimize the trading-system parameters from the article above; the code before and after this function is omitted.
```python
def Optimize(ohlc, Prange):
    def shift(x, n=1): return np.concatenate((np.zeros(n), x[:-n])) # shift function

    SlowMA = np.empty([len(Prange[0]), len(ohlc)]) # long-term moving average
    for i in range(len(Prange[0])):
        SlowMA[i] = ind.iMA(ohlc, Prange[0][i])

    FastMA = np.empty([len(Prange[1]), len(ohlc)]) # short-term moving average
    for i in range(len(Prange[1])):
        FastMA[i] = ind.iMA(ohlc, Prange[1][i])

    ExitMA = np.empty([len(Prange[2]), len(ohlc)]) # moving average for exits
    for i in range(len(Prange[2])):
        ExitMA[i] = ind.iMA(ohlc, Prange[2][i])

    Close = ohlc['Close'].values # closing prices

    M = 20 # population size
    Eval = np.zeros([M, 6]) # evaluation items
    Param = InitParam(Prange, M) # initialize the parameters
    gens = 0 # generation counter
    while gens < 100:
        for k in range(M):
            i0 = Param[k,0]
            i1 = Param[k,1]
            i2 = Param[k,2]
            # buy entry signal
            BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
            # sell entry signal
            SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
            # buy exit signal
            BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
            # sell exit signal
            SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
            # backtest
            Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit)
            Eval[k] = BacktestReport(Trade, PL)
        # generation change
        Param = Evolution(Param, Eval[:,0], Prange)
        gens += 1
        #print(gens, Eval[0,0])
    Slow = Prange[0][Param[:,0]]
    Fast = Prange[1][Param[:,1]]
    Exit = Prange[2][Param[:,2]]
    return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
                         'Average':Eval[:,2], 'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
                        columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
```
First, run it as is and measure the execution time.
```python
import time

start = time.perf_counter()
result = Optimize(ohlc, [SlowMAperiod, FastMAperiod, ExitMAperiod])
print("elapsed_time = {0} sec".format(time.perf_counter() - start))
```

```
elapsed_time = 11.180512751173708 sec
```
The part to parallelize is the body of the inner `for` loop in the original code: backtesting and evaluating each individual's parameters is where the time goes. With `multiprocessing`, the `map` method looks easiest to use, so first let's replace the `for` loop with the built-in `map` function (a toy example of the equivalence is below).
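As a reminder of what that replacement means, here is a toy example (my own illustration, not code from this article) showing that `map` computes the same thing as the loop, one element at a time:

```python
results_loop = []
for k in range(5):
    results_loop.append(k ** 2)        # the repeated work, done in a for loop

results_map = list(map(lambda k: k ** 2, range(5)))  # the same work via map
assert results_loop == results_map     # identical results: [0, 1, 4, 9, 16]
```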
To do that, the repeated part has to become a function, with one caveat. If you only ever call it through the built-in `map`, it is convenient to define that function inside `Optimize`, but with `multiprocessing` this raises an error: the worker function is pickled when it is sent to the child processes, and a function nested inside another function cannot be pickled. So I defined the function, named `evaluate`, outside of `Optimize`.
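A minimal sketch of that restriction (a standalone illustration with made-up names, not code from this article):

```python
import multiprocessing as mp

def top_level(x):
    return x * 2          # defined at module level, so it can be pickled

if __name__ == '__main__':
    def nested(x):
        return x * 2      # local function: cannot be pickled

    with mp.Pool(2) as pool:
        print(pool.map(top_level, range(4)))  # works: [0, 2, 4, 6]
        # pool.map(nested, range(4))          # error: can't pickle a local object
```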
For convenience of passing it to `map`, I want `k` to be the only per-call argument of `evaluate`. The technical-indicator arrays such as `SlowMA` and `FastMA` are therefore global variables, while `Param` remains an argument of the function.
```python
SlowMA = np.empty([len(SlowMAperiod), len(ohlc)]) # long-term moving average
for i in range(len(SlowMAperiod)):
    SlowMA[i] = ind.iMA(ohlc, SlowMAperiod[i])

FastMA = np.empty([len(FastMAperiod), len(ohlc)]) # short-term moving average
for i in range(len(FastMAperiod)):
    FastMA[i] = ind.iMA(ohlc, FastMAperiod[i])

ExitMA = np.empty([len(ExitMAperiod), len(ohlc)]) # moving average for exits
for i in range(len(ExitMAperiod)):
    ExitMA[i] = ind.iMA(ohlc, ExitMAperiod[i])

Close = ohlc['Close'].values # closing prices

# shift function
def shift(x, n=1):
    return np.concatenate((np.zeros(n), x[:-n]))

# function to be executed in parallel
def evaluate(k, Param):
    i0 = Param[k,0]
    i1 = Param[k,1]
    i2 = Param[k,2]
    # buy entry signal
    BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
    # sell entry signal
    SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
    # buy exit signal
    BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
    # sell exit signal
    SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
    # backtest
    Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit)
    return BacktestReport(Trade, PL)
```
The following code uses the `map` function in place of the `for` loop.
```python
import functools

def Optimize(ohlc, Prange):
    M = 20 # population size
    Eval = np.zeros([M, 6]) # evaluation items (overwritten by the map below)
    Param = InitParam(Prange, M) # initialize the parameters
    gens = 0 # generation counter
    while gens < 100:
        #for k in range(M): Eval[k] = evaluate(k,Param)
        Eval = np.array(list(map(functools.partial(evaluate, Param=Param), np.arange(M))))
        # generation change
        Param = Evolution(Param, Eval[:,0], Prange)
        gens += 1
        #print(gens, Eval[0,0])
    Slow = Prange[0][Param[:,0]]
    Fast = Prange[1][Param[:,1]]
    Exit = Prange[2][Param[:,2]]
    return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
                         'Average':Eval[:,2], 'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
                        columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
```
Actually, it could not be written with just `map`. The repeated function `evaluate` goes in the first argument of `map`, but since `evaluate` takes two arguments, the second argument has to be fixed to `Param`; that is what `functools.partial` is for.
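Here is a tiny standalone demonstration of what `functools.partial` does here (the names `evaluate_demo` and `Param_demo` are made up for illustration):

```python
import functools

def evaluate_demo(k, Param):       # stand-in for the real evaluate(k, Param)
    return Param[k] * 10

Param_demo = [1, 2, 3]
f = functools.partial(evaluate_demo, Param=Param_demo)  # fix Param, leave k free
print(list(map(f, range(3))))      # [10, 20, 30] -- only k varies per call
```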
Also, the return value of `map` is converted to a NumPy array, but it apparently has to be converted to a `list` first. (This seems to depend on the Python version; I used Python 3.5.1 this time.)
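The reason is that in Python 3 `map` returns a lazy iterator rather than a list, and `np.array` does not iterate over it. A quick standalone illustration:

```python
import numpy as np

rows = map(lambda k: [k, k * 2], range(3))
print(np.array(list(rows)))  # [[0 0] [1 2] [2 4]] -- the intended 2-D array

rows = map(lambda k: [k, k * 2], range(3))
print(np.array(rows))        # a 0-d object array wrapping the map iterator itself
```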
Running this gave the following result:

```
elapsed_time = 11.157917446009389 sec
```
Changing the `for` loop to `map` hardly changes the execution time, as expected: the built-in `map` still runs everything sequentially.
## multiprocessing
Once the loop is written with the `map` function, introducing `multiprocessing` is easy.
```python
import functools
import multiprocessing as mp

def Optimize(ohlc, Prange):
    M = 20 # population size
    Eval = np.zeros([M, 6]) # evaluation items (overwritten by the map below)
    Param = InitParam(Prange, M) # initialize the parameters
    pool = mp.Pool() # create a process pool
    gens = 0 # generation counter
    while gens < 100:
        #for k in range(M): Eval[k] = evaluate(k,Param)
        Eval = np.array(list(pool.map(functools.partial(evaluate, Param=Param), np.arange(M))))
        # generation change
        Param = Evolution(Param, Eval[:,0], Prange)
        gens += 1
        #print(gens, Eval[0,0])
    Slow = Prange[0][Param[:,0]]
    Fast = Prange[1][Param[:,1]]
    Exit = Prange[2][Param[:,2]]
    return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
                         'Average':Eval[:,2], 'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
                        columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
```
All we did was create a process pool with the `Pool` class and replace `map` with `pool.map`. The number of worker processes is given by the argument of `Pool`; with no argument, as many processes as there are CPU threads are created.

For simple workloads you can use all the threads, but since other code is running as well, using a little over half of the threads was fastest here.
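For reference, a minimal way to check the available thread count and size the pool accordingly (an illustrative snippet):

```python
import multiprocessing as mp

print(mp.cpu_count())  # number of logical CPUs (8 on the Core i7 used here)
pool = mp.Pool(5)      # a little over half of them was fastest in this experiment
```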
Since the CPU was a Core i7 with 8 threads, running with `Pool(5)` gave

```
elapsed_time = 5.766524394366197 sec
```

That is roughly twice as fast. I had expected a little more, but the genetic operations outside the per-individual evaluation presumably limit the speedup. For trading systems whose backtests take longer, parallelization should be somewhat more effective.
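As a rough sanity check on that explanation, Amdahl's law lets us estimate the serial fraction implied by these timings (my own back-of-the-envelope calculation, not from the original article):

```python
# Amdahl's law: T(p) = T1 * (s + (1 - s) / p), with serial fraction s and p processes
T1, Tp, p = 11.18, 5.77, 5
s = (Tp / T1 - 1 / p) / (1 - 1 / p)
print(s)                       # ~0.40: about 40% of the runtime is outside the parallel part
print(T1 * (s + (1 - s) / p))  # reproduces ~5.77 sec
```

This ignores inter-process overhead such as pickling `Param` to the workers every generation, so the true serial fraction is somewhat smaller, but it is consistent with the genetic operations limiting the speedup.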