Thanks for reading. The other day I had promised to go to a public bath with my juniors, but one of them backed out with "Sorry, I was with a girl and missed the last train...", so I poured my anger into work and finished this article.
In the previous article, Forecasting the Nikkei 225 with Pytorch ~ Intermission ~, I introduced the paper
Using Deep Learning Neural Networks and Candlestick Chart Representation to Predict Stock Market
I built the dataset for it, wrote the code, and trained a model, so I am recording the process here as a memo.
The dataset used here looks like a chart, but it is a bit special. (For details, please see the paper!)
An image I created with reference to the paper

As shown, each image is 48x48 pixels, and rising and falling candles are drawn in green and red. Below is the code.
candle_make.ipynb
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import csv
import glob
import pandas as pd
import numpy as np
import sys
import matplotlib.pyplot as plt
import datetime
import openpyxl
 
# Candlestick drawing (matplotlib.pyplot is already imported above)
import matplotlib.dates as mdates
path = "/content/drive/My Drive/Colab/pytorch_pre/"
sys.path.append("/content/drive/My Drive/Colab/pytorch_pre")
import mpl_finance
These are the library imports. Since I am using Google Colaboratory, path and similar variables point to where the files are stored on Google Drive. The one unusual library is probably mpl_finance, which can be downloaded from GitHub. It is a handy library for drawing candlesticks. (mpl_finance is now deprecated; its successor mplfinance keeps the same functions under mplfinance.original_flavor.)
candle_make.ipynb
future_num = 1
fname = path + "data/nikkei_heikin_x3.csv"
flist = glob.glob(fname)
for file in flist:
  dt = pd.read_csv(file, header=0, encoding="utf-8_sig", index_col='Datetime')
dt = dt.sort_values('Datetime')
print(dt)
future_price = dt.iloc[future_num:-18]['closing price'].values
curr_price = dt.iloc[:-18-future_num]['closing price'].values
    
# The ratio of the price future_num days later to the current price is the basis for the correct label
y_data_tmp = future_price / curr_price
#Prepare a list for correct labels
y_data = np.zeros_like(y_data_tmp)
y_data_nam = np.zeros(len(y_data_tmp)+1, dtype=object)
Y_data = np.zeros((len(y_data_tmp)+1, 2), dtype=object)
Y_columns = ["label","ImageName"]
# Label 1 (correct answer) if the price future_num days later has risen or stayed the same
for i in range(len(y_data_tmp)):
    Y_data[i,0] = 0
    if y_data_tmp[i] >= 1.0:
        y_data[i] = 1
        Y_data[i,0] = 1
count = 0
for i in y_data:
  if i == 1:
    count += 1
print(count)
for i in range(1,len(y_data_tmp)+1):
    y_data_nam[i] = "{:04}.png".format(i-1)  # no trailing space, so the name matches the file name on disk
    Y_data[i,1] = y_data_nam[i]
Y_data[0,0] = "label"
Y_data[0,1] = "ImageName"
dn = pd.DataFrame(Y_data)
dn.columns = Y_columns
np.savetxt(path+"debug/y_data.csv", dn, delimiter=",", fmt='%s') #Write to csv file
This part handles the labeling. The loaded data is binarized: 1 if the closing price is at or above the previous day's, and 0 if it is lower. So that a label can be looked up from an image name later, the file name (ImageName) and the label are placed in the same row of a CSV file, like a table.
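As a minimal sketch of the labeling rule described above (plain Python, not the notebook's own code): `make_label_rows` is a hypothetical helper that compares each closing price with the one `future_num` rows later and pairs the resulting 0/1 label with a zero-padded image name. It skips the trimming of the last 18 rows that the notebook performs, and it assumes that label i belongs to image i.

```python
def make_label_rows(closes, future_num=1):
    """Return (label, image_name) rows.

    The label is 1 if the close future_num rows later is greater than
    or equal to the current close, otherwise 0.
    """
    rows = []
    for i in range(len(closes) - future_num):
        ratio = closes[i + future_num] / closes[i]
        label = 1 if ratio >= 1.0 else 0
        rows.append((label, "{:04}.png".format(i)))
    return rows


if __name__ == "__main__":
    # Hypothetical closing prices, for illustration only
    for label, name in make_label_rows([100.0, 101.0, 100.0, 100.0]):
        print(label, name)
```

Keeping the label and the image name in one row is what lets the dataset class look up a label by file name later.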
candle_make.ipynb
img_create = 1
if img_create == 1:
    seq_len = 20
    df = pd.read_csv(fname, parse_dates=True, index_col=0)
    df = df.sort_values('Datetime')
    df = df.rename(columns={'Open price':'Open','High price':'High','Low price':'Low','closing price':'Close','Volume':'Volume'})
    df = df.fillna(0)  # fillna returns a new DataFrame, so assign the result back
    plt.style.use('dark_background')
    df.reset_index(inplace=True)
    df['Datetime'] = df['Datetime'].map(mdates.date2num)
    for i in range(0, len(df)):
        c = df.iloc[i:i + int(seq_len) - 1, :]
        if len(c) == int(seq_len-1):
            # Date,Open,High,Low,Adj Close,Volume
            ohlc = zip(c['Datetime'], c['Open'], c['High'],
                       c['Low'], c['Close'], c['Volume'])
            my_dpi = 96
            fig = plt.figure(figsize=(48 / my_dpi, 48 / my_dpi), dpi=my_dpi)
            ax1 = plt.subplot2grid((1, 1), (0, 0))
            # candlestick2_ohlc(ax1, c['Open'],c['High'],c['Low'],c['Close'], width=0.4, colorup='#77d879', colordown='#db3f3f')
            mpl_finance.candlestick_ohlc(ax1, ohlc, width=0.4,
                             colorup='#77d879', colordown='#db3f3f')
            ax1.grid(False)
            ax1.set_xticklabels([])
            ax1.set_yticklabels([])
            ax1.xaxis.set_visible(False)
            ax1.yaxis.set_visible(False)
            ax1.axis('off')
#            pngfile = 'datasets/{}_{}_{}.png'.format(
#                i, seq_len, fname[11:-4])
            pngfile = 'datasets/{:04}.png'.format(i)
            fig.savefig(path+pngfile,  pad_inches=0, transparent=False)
            plt.close(fig)
    print("Converting OHLC to candlestick finished.")
This is almost the same as the paper's code. I could not get the volume to display, so please let me know if you know how. My numerical data is not in the same format as the paper's, so I adapted it. This is the data I used.

Renaming the columns brings the data into the expected format. This completes the images for the dataset.
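The windowing and image-size arithmetic in the code above can be checked with a small sketch. Both helpers are hypothetical names introduced for illustration: `window_starts` lists the start rows that give a full window of `seq_len - 1` rows (the slice length the notebook uses), and `figure_pixels` shows why `figsize=(48 / my_dpi, 48 / my_dpi)` at `dpi=my_dpi` gives a nominal 48x48 image.

```python
def window_starts(n_rows, seq_len=20):
    """Start indices i for which rows[i : i + seq_len - 1] is a full window."""
    window = seq_len - 1  # the notebook slices seq_len - 1 rows per image
    return list(range(0, max(n_rows - window + 1, 0)))


def figure_pixels(size_px=48, dpi=96):
    """Nominal pixel size of a square figure with figsize = size_px / dpi inches."""
    inches = size_px / dpi
    return round(inches * dpi)


if __name__ == "__main__":
    # 25 rows of price data (a made-up number) give 7 full windows
    print(len(window_starts(25)))
    print(figure_pixels())
```

Consecutive windows overlap in all but one row, so neighboring images look very similar.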
The model to train is ResNet. The paper reports state-of-the-art results with its own model architecture, but ResNet also gave good enough results, so I use it here.
resnet.ipynb
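import os
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset
from torchvision import transforms

path = "/content/drive/My Drive/Colab/pytorch_pre/"  # same Drive folder as in candle_make.ipynb

class MyDataSet(Dataset):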
    def __init__(self, csv_path, root_dir):
        self.train_df = pd.read_csv(csv_path)
        self.root_dir = root_dir
        self.images = os.listdir(self.root_dir)
        self.transform = transforms.Compose([transforms.ToTensor()])
        self.y_columns = ["label","ImageName"]
        
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        #Image loading
        # print(self.images)
        image_name = self.images[idx]
        image = Image.open(os.path.join(self.root_dir, image_name) )
        image = image.convert('RGB') # PyTorch 0.4 or later
        # label (0 or 1)
        self.train_df.columns = self.y_columns
        label = self.train_df.query('ImageName=="'+image_name+'"')['label'].iloc[0]
        return self.transform(image), int(label)
def conv3x3(in_channels, out_channels, stride=1, groups=1, dilation=1):
     return nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride,
                      padding=dilation, groups=groups, bias=True,
                      dilation=dilation)
def conv1x1(in_channels, out_channels, stride=1):
     return nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=True)
class BasicBlock(nn.Module):
 #  Implementation of Basic Building Block
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
     super(BasicBlock, self).__init__()
    
     self.conv1 = conv3x3(in_channels, out_channels, stride)
     self.bn1 = nn.BatchNorm2d(out_channels)
     self.relu = nn.ReLU(inplace=True)
     self.conv2 = conv3x3(out_channels, out_channels)
     self.bn2 = nn.BatchNorm2d(out_channels)
     self.downsample = downsample
    def forward(self, x):
     identity_x = x  # hold input for shortcut connection
    
     out = self.conv1(x)
     out = self.bn1(out)
     out = self.relu(out)
    
     out = self.conv2(out)
     out = self.bn2(out)
    
     if self.downsample is not None:
         identity_x = self.downsample(x)
    
     out += identity_x  # shortcut connection
     return self.relu(out)
class ResidualLayer(nn.Module):
     def __init__(self, num_blocks, in_channels, out_channels, block=BasicBlock):
         super(ResidualLayer, self).__init__()
         downsample = None
         if in_channels != out_channels:
             downsample = nn.Sequential(
                 conv1x1(in_channels, out_channels),
                 nn.BatchNorm2d(out_channels)
         )
         self.first_block = block(in_channels, out_channels, downsample=downsample)
         self.blocks = nn.ModuleList(block(out_channels, out_channels) for _ in range(num_blocks))
     def forward(self, x):
         out = self.first_block(x)
         for block in self.blocks:
             out = block(out)
         return out
class ResNet18(nn.Module):
   def __init__(self, num_classes):
       super(ResNet18, self).__init__()
       self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
       self.bn1 = nn.BatchNorm2d(64)
       self.relu = nn.ReLU(inplace=True)
       self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
       self.layer1 = ResidualLayer(2, in_channels=64, out_channels=64)
       self.layer2 = ResidualLayer(2, in_channels=64, out_channels=128)
       self.layer3 = ResidualLayer(
           2, in_channels=128, out_channels=256)
       self.layer4 = ResidualLayer(
           2, in_channels=256, out_channels=512)
       self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
       self.fc = nn.Linear(512, num_classes)
   def forward(self, x):
       out = self.conv1(x)
       out = self.bn1(out)
       out = self.relu(out)
       out = self.maxpool(out)
       out = self.layer1(out)
       out = self.layer2(out)
       out = self.layer3(out)
       out = self.layer4(out)
       out = self.avg_pool(out)
       out = out.view(out.size(0), -1)
       out = self.fc(out)
       return out
   
class Trainer:
    def __init__(self, model, optimizer, criterion):
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model = model.to(self.device)
        self.optimizer = optimizer
        self.criterion = criterion
    def epoch_train(self, train_loader):
        self.model.train()
        epoch_loss = 0
        correct = 0
        total = 0
        for batch_idx, (inputs, targets) in enumerate(train_loader):
            inputs = inputs.to(self.device)
            targets = targets.to(self.device).long()
            self.optimizer.zero_grad()
            outputs = self.model(inputs)
            loss = self.criterion(outputs, targets)
            loss.backward()
            self.optimizer.step()
            epoch_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += targets.size(0)
            correct += predicted.eq(targets.data).cpu().sum().item()
        epoch_loss /= len(train_loader)
        acc = 100 * correct / total
        return epoch_loss, acc
    def epoch_valid(self, valid_loader):
        self.model.eval()
        epoch_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():  # gradients are not needed during validation
            for batch_idx, (inputs, targets) in enumerate(valid_loader):
                inputs = inputs.to(self.device)
                targets = targets.to(self.device).long()
                outputs = self.model(inputs)
                loss = self.criterion(outputs, targets)
                epoch_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total += targets.size(0)
                correct += predicted.eq(targets.data).cpu().sum().item()
        epoch_loss /= len(valid_loader)
        acc = 100 * correct / total
        return epoch_loss, acc
    @property
    def params(self):
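        # Return a copy: state_dict() refers to the live tensors, so without
        # cloning, the saved "best" parameters would keep changing during training
        return {k: v.detach().clone() for k, v in self.model.state_dict().items()}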
This defines the ResNet, the dataset loading, and the training loop. MyDataSet reads the labels and file names from the CSV file and attaches the matching label to each image.
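To see what the network above does to a 48x48 image, here is a small sketch that traces the spatial size through the layers. It uses the standard output-size formula for convolution and pooling layers (with floor division); `conv_out` and `feature_map_size` are hypothetical helper names, and the layer settings are copied from the ResNet18 class above.

```python
def conv_out(size, kernel, stride=1, padding=0, dilation=1):
    """Output length of a conv/pool layer along one spatial dimension."""
    return (size + 2 * padding - dilation * (kernel - 1) - 1) // stride + 1


def feature_map_size(size=48):
    """Spatial size just before the adaptive average pooling."""
    size = conv_out(size, kernel=7, stride=2, padding=3)  # conv1
    size = conv_out(size, kernel=3, stride=2, padding=1)  # maxpool
    # Every conv3x3 in the residual layers uses stride 1 and padding 1,
    # and every conv1x1 uses stride 1, so the size no longer changes.
    size = conv_out(size, kernel=3, stride=1, padding=1)
    return size


if __name__ == "__main__":
    print(feature_map_size(48))
```

Since the residual layers here never use a stride of 2, only the first convolution and the max pooling reduce the resolution; the adaptive average pooling then collapses whatever remains to 1x1 before the fully connected layer.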
resnet.ipynb
if __name__ == '__main__':
    model = ResNet18(2)
    
    epoch_n = 100
    
    optimizer = optim.Adam(model.parameters(),
                        lr=0.001,
                        weight_decay=1e-4)
    
    criterion = nn.CrossEntropyLoss()
    trainer = Trainer(model, optimizer, criterion)
    transform = transforms.Compose([
        transforms.Resize((50, 50)),
        transforms.ToTensor(),
    ])
    tmp_data = MyDataSet(path+'debug/y_data.csv', path+'datasets/')
    dtrain, dtest = train_test_split(tmp_data, test_size=0.2)  # one input yields two outputs (train, test); four come back only when X and y are passed separately
    # x_train, y_train, x_test, y_test = train_test_split(tmp_data)
    
    train_loader = torch.utils.data.DataLoader(dtrain, batch_size=43, shuffle=True,
                              drop_last=True)
    valid_loader = torch.utils.data.DataLoader(dtest, batch_size=43, shuffle=True,
                              drop_last=True)
    best_acc = -1
    for epoch in range(1, 1 + epoch_n):
        train_loss, train_acc = trainer.epoch_train(train_loader)
        valid_loss, valid_acc = trainer.epoch_valid(valid_loader)
        if valid_acc > best_acc:
            best_acc = valid_acc
            best_params = trainer.params
        print(f'EPOCH: {epoch} / {epoch_n}')
        print(f'TRAIN LOSS: {train_loss:.3f}, TRAIN ACC: {train_acc:.3f}')
        print(f'VALID LOSS: {valid_loss:.3f}, VALID ACC: {valid_acc:.3f}')
    torch.save(best_params, path + 'models/resnet.pth')
This is the training part. I think it is almost the same as training an ordinary ResNet. The training results are as follows.
EPOCH: 1 / 100
TRAIN LOSS: 0.700, TRAIN ACC: 62.114
VALID LOSS: 1.145, VALID ACC: 52.442
EPOCH: 2 / 100
TRAIN LOSS: 0.502, TRAIN ACC: 76.391
VALID LOSS: 0.476, VALID ACC: 78.023
EPOCH: 3 / 100
TRAIN LOSS: 0.426, TRAIN ACC: 81.248
VALID LOSS: 0.467, VALID ACC: 77.907
EPOCH: 4 / 100
TRAIN LOSS: 0.368, TRAIN ACC: 83.633
VALID LOSS: 0.357, VALID ACC: 85.000
EPOCH: 5 / 100
TRAIN LOSS: 0.324, TRAIN ACC: 86.488
VALID LOSS: 0.648, VALID ACC: 76.395
EPOCH: 6 / 100
TRAIN LOSS: 0.305, TRAIN ACC: 87.018
VALID LOSS: 0.365, VALID ACC: 84.884
EPOCH: 7 / 100
TRAIN LOSS: 0.277, TRAIN ACC: 88.284
VALID LOSS: 0.480, VALID ACC: 79.884
.
.
.
EPOCH: 92 / 100
TRAIN LOSS: 0.006, TRAIN ACC: 99.853
VALID LOSS: 0.874, VALID ACC: 85.349
EPOCH: 93 / 100
TRAIN LOSS: 0.025, TRAIN ACC: 99.058
VALID LOSS: 0.960, VALID ACC: 78.953
EPOCH: 94 / 100
TRAIN LOSS: 0.028, TRAIN ACC: 99.058
VALID LOSS: 0.992, VALID ACC: 85.698
EPOCH: 95 / 100
TRAIN LOSS: 0.021, TRAIN ACC: 99.264
VALID LOSS: 0.744, VALID ACC: 84.884
EPOCH: 96 / 100
TRAIN LOSS: 0.007, TRAIN ACC: 99.735
VALID LOSS: 1.000, VALID ACC: 85.349
EPOCH: 97 / 100
TRAIN LOSS: 0.008, TRAIN ACC: 99.735
VALID LOSS: 0.834, VALID ACC: 86.279
EPOCH: 98 / 100
TRAIN LOSS: 0.020, TRAIN ACC: 99.470
VALID LOSS: 0.739, VALID ACC: 85.930
EPOCH: 99 / 100
TRAIN LOSS: 0.005, TRAIN ACC: 99.853
VALID LOSS: 0.846, VALID ACC: 86.395
EPOCH: 100 / 100
TRAIN LOSS: 0.016, TRAIN ACC: 99.382
VALID LOSS: 1.104, VALID ACC: 83.953
The training accuracy reached nearly 100%, and the test accuracy reached about 87%. Compared with the roughly 60% I got with the LSTM last time, the accuracy is much better.
It turns out that using images improves accuracy when simply predicting whether the price goes up or down. I don't know whether it would be profitable in actual use, though... Since this is a replication of the paper, next I'm thinking of incorporating deep metric learning and predicting a 3% rise.
By the way, of the roughly 4,300 samples, only 70 rose by 3% or more from the previous day. That is about 1.6% of the total, which makes for a quite imbalanced dataset.
Next time I would like to do something about this.
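One common way to handle this kind of imbalance is to weight the loss by inverse class frequency. This is only a sketch of that option, not something tried in this article: `inverse_frequency_weights` is a hypothetical helper, and the counts come from the approximate figures above (70 positives out of about 4,300).

```python
def inverse_frequency_weights(class_counts):
    """Weight each class by total / (n_classes * count)."""
    total = sum(class_counts)
    n_classes = len(class_counts)
    return [total / (n_classes * count) for count in class_counts]


if __name__ == "__main__":
    # Approximate counts from the text: [no 3% rise, 3% rise]
    weights = inverse_frequency_weights([4300 - 70, 70])
    print(weights)
```

Weights like these could be converted to a tensor and passed as the `weight` argument of `nn.CrossEntropyLoss`, so that mistakes on the rare class cost more.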