Automatic Zakuzaku, Bitcoin. A story about a Python beginner making a Coincheck 1-minute chart

Overview

Now that automated trading of virtual currency (Bitcoin) has become commonplace, I, a Python beginner, generated my own 1-minute bars in order to buy and sell automatically on Coincheck. The API published by Coincheck does not provide 1-minute bars, so I built them myself. After briefly explaining the core logic, I post the entire source code and the execution results.

Preparation

Check the API provided by Coincheck. Coincheck provides its API over REST, but part of it is also offered over WebSocket. The transaction history is among the data currently available over WebSocket.

Therefore, the transaction history is used to create the 1-minute bars. The usage of the transaction history channel is as follows.

REQUEST


{
  "type": "subscribe",
  "channel": "[pair]-trades"
}

This is the request format.

RESPONSE


[
  "ID",
  "Trading pair",
  "Order rate",
  "Order quantity",
  "Order type (buy or sell)"
]

This is the response format. Since we are making 1-minute bars for Bitcoin this time, [pair] is btc_jpy, so specify "btc_jpy-trades" as the channel in the request. Also, since WebSocket is used from Python, install websocket-client with pip.
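
As a minimal sketch, the request and response formats above can be handled with the standard json module. The sample message is made up for illustration, parse_trade is a hypothetical helper name, and the code assumes the response is a JSON array with the five elements in the order shown above.

```python
import json

# Subscribe request for the btc_jpy pair, in the format shown above.
subscribe_request = json.dumps({
    "type": "subscribe",
    "channel": "btc_jpy-trades",
})


def parse_trade(message):
    """Turn one response message into a dict (hypothetical helper).

    Assumes a JSON array: [ID, pair, order rate, quantity, order type].
    """
    elements = json.loads(message)
    return {
        "id": int(elements[0]),
        "pair": elements[1],
        "price": float(elements[2]),
        "volume": float(elements[3]),
        "type": elements[4],
    }


# Made-up sample message, for illustration only.
sample = '["12345", "btc_jpy", "750169.0", "0.05", "sell"]'
trade = parse_trade(sample)
print(trade["price"], trade["type"])
```

Parsing with json.loads avoids depending on the exact position of quotes and brackets in the raw string.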

Basic design

To create a 1-minute bar, basically follow the procedure below.

  1. Receive transaction history sequentially from WebSocket
  2. Split the stream into 1-minute intervals and set open to the order rate of the first trade in each interval.
  3. While receiving the transaction history, compare the order rates and update high and low. Update close on every trade.
  4. In addition, add to the buy and sell volumes according to the order quantity and order type.

Below is a code fragment that implements this procedure.

    def __update_candle_data(self, trade):
        last_candle = self.__candle[-1]
        _dt = datetime.now().replace(second=0, microsecond=0)  # Zero out seconds and microseconds to get the start of the current minute
        mark_ts = _dt.timestamp()
        last_ts = last_candle["timestamp"]
        if last_ts == mark_ts:       # Still within the same minute: update the current bar
            last_candle["high"] = max(last_candle["high"], trade["price"])
            last_candle["low"] = min(last_candle["low"], trade["price"])
            last_candle["close"] = trade["price"]
            last_candle["volume"] += trade["volume"]
            last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
            last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0
        else:                        # A new minute has started: add a new bar
            self.__candle.append(
                {
                    "timestamp": mark_ts,
                    "open": trade["price"],
                    "high": trade["price"],
                    "low": trade["price"],
                    "close": trade["price"],
                    "volume": trade["volume"],
                    "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                    "sell": trade["volume"] if trade["type"] == 'sell' else 0,
                }
            )
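
To see the procedure in isolation, here is a small self-contained sketch that builds bars from a list of trades that carry their own timestamps. The ts field and the names floor_to_minute and build_candles are assumptions for illustration; the fragment above instead uses the local clock at the moment each message arrives.

```python
def floor_to_minute(ts):
    """Round a Unix timestamp (seconds) down to the start of its minute."""
    return ts - (ts % 60)


def build_candles(trades):
    """Aggregate trades (dicts with ts, price, volume, type) into 1-minute bars."""
    candles = []
    for trade in trades:
        mark_ts = floor_to_minute(trade["ts"])
        buy = trade["volume"] if trade["type"] == "buy" else 0
        sell = trade["volume"] if trade["type"] == "sell" else 0
        if candles and candles[-1]["timestamp"] == mark_ts:
            # Same minute: update high, low, close and the volumes.
            last = candles[-1]
            last["high"] = max(last["high"], trade["price"])
            last["low"] = min(last["low"], trade["price"])
            last["close"] = trade["price"]
            last["volume"] += trade["volume"]
            last["buy"] += buy
            last["sell"] += sell
        else:
            # New minute: the first trade sets open, high, low and close.
            candles.append({
                "timestamp": mark_ts,
                "open": trade["price"],
                "high": trade["price"],
                "low": trade["price"],
                "close": trade["price"],
                "volume": trade["volume"],
                "buy": buy,
                "sell": sell,
            })
    return candles


# Made-up trades: three in the minute starting at 120, one in the next minute.
trades = [
    {"ts": 120, "price": 100.0, "volume": 1.0, "type": "buy"},
    {"ts": 150, "price": 105.0, "volume": 2.0, "type": "sell"},
    {"ts": 179, "price": 98.0, "volume": 0.5, "type": "buy"},
    {"ts": 181, "price": 99.0, "volume": 1.0, "type": "sell"},
]
candles = build_candles(trades)
print(candles[0])
```

Keeping the aggregation as a pure function like this makes it easy to test the bar logic without a live connection.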

This function is called every time a trade is received over WebSocket. To receive the transaction history, register a callback. Below is a code fragment.


self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
                                 on_message=self.__on_message,  # Callback that receives trade information
                                 on_close=self.__on_close,
                                 on_open=self.__on_open,        # Callback called immediately after the WebSocket is opened
                                 on_error=self.__on_error)

def __on_open(self, ws):
    self.ws.send(json.dumps({   # Subscribe to the currency pair you want to receive inside this method
        "type": "subscribe",
        "channel": "btc_jpy-trades"
    }))

def __on_message(self, ws, message):
    if not self.__connected:
        self.__connected = True

    trade = self.__make_trade(message)
    if not self.__init_flg:
        self.__init_candle_data(trade)
        self.__init_flg = True
    else:
        self.__update_candle_data(trade)  # Update the bar data here

Data completion

With the implementation so far, 1-minute bars can basically be created, but a bar is only created when a trade arrives. If there is no trade for a whole minute, the bar for that minute is missing. To prevent this, the data is checked every 30 seconds. Since this check has to run periodically, it is executed in a separate thread. Below is the code fragment.


        _timer_count = 0
        while self.ws.sock and self.ws.sock.connected:

            time.sleep(1)
            if _timer_count < 30:   # Wait 30 seconds
                print("wait until 30")
                _timer_count += 1
                continue

            self.__thread_lock()

            _dt = datetime.now().replace(second=0, microsecond=0)
            mark_ts = _dt.timestamp()
            last_candle = self.__candle[-1]
            last_ts = last_candle["timestamp"]
            # The last bar does not belong to the current minute
            if last_ts != mark_ts:
                self.__candle.append(  # Create a new 1-minute bar from the close of the last bar
                    {
                        "timestamp": mark_ts,
                        "open": last_candle["close"],
                        "high": last_candle["close"],
                        "low": last_candle["close"],
                        "close": last_candle["close"],
                        "volume": 0,
                        "buy": 0,
                        "sell": 0,
                    }
                )
            self.__thread_unlock()
            _timer_count = 0
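
The loop above adds at most one bar per check. If a check were delayed by more than a minute, several bars could be filled in one pass, as in the following sketch. fill_missing_candles is a hypothetical helper, not part of the article's class, and it assumes minute-aligned timestamps in Unix seconds.

```python
def fill_missing_candles(candles, mark_ts):
    """Append flat, zero-volume bars up to and including mark_ts.

    candles must be non-empty and hold minute-aligned timestamps (seconds).
    Every filled bar repeats the close of the last real bar.
    """
    close = candles[-1]["close"]
    ts = candles[-1]["timestamp"] + 60
    while ts <= mark_ts:
        candles.append({
            "timestamp": ts,
            "open": close,
            "high": close,
            "low": close,
            "close": close,
            "volume": 0,
            "buy": 0,
            "sell": 0,
        })
        ts += 60
    return candles


# Made-up example: one real bar at t=0, and the current minute starts at t=180.
candles = [{
    "timestamp": 0, "open": 100.0, "high": 102.0, "low": 99.0,
    "close": 101.0, "volume": 1.5, "buy": 1.0, "sell": 0.5,
}]
fill_missing_candles(candles, 180)
print([c["timestamp"] for c in candles])
```

If the last bar already belongs to the current minute, the while loop does not run and the list is left unchanged.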

Handling disconnection

When I actually tried it, the Coincheck WebSocket was disconnected quite often. When that happens, `on_error` is called first. Also, after observing the timing of the disconnections several times, I found that `on_open` is sometimes never called in the first place and the connection simply times out. It is a problem if 1-minute bars can no longer be built as soon as the connection drops, so on disconnection I want to reconnect while keeping the bar data built so far. Furthermore, if `on_open` is not called, the connection is certain to be dropped, so I want to retry the connection once `on_open` has still not been called after about 3 seconds. So, the following processing is added.


    def __on_error(self, ws, error):
        self.__reconnect()  #Reconnect if an error occurs

    def __candle_thread_terminate(self):
        self.__connected = True  # If the data completion thread is waiting in a loop for the connection, let it leave the loop
        time.sleep(1.5)


    def __exit(self):
        self.ws.close()    #Disconnect WebSocket
        self.__candle_thread_terminate() #End the thread for data completion
        

    def __reconnect(self):
        self.__exit()  #Explicitly disconnect
        time.sleep(2)
        self.__connect() #Connect with Coincheck WebSocket

    # Method executed by the data completion thread
    def __check_candle(self, args):
        _error_count = 0
        while True:
            if not self.__connected:  # Wait while the WebSocket connection is not established (on_message has not been called yet)
                _error_count += 1
                print("wait 1 sec")
                time.sleep(1)

                if not self.__opened and _error_count > 3:  # Reconnect if on_open has not been called even after waiting 3 seconds
                    self.ws.on_error = None  # Keep on_error from firing during the reconnect, which would connect twice
                    term_thread = threading.Thread(target=lambda: self.__reconnect())  # Must run in another thread, otherwise this thread cannot end
                    term_thread.start()
                    break

            else:
                break

Basically, the processing follows the comments in the source code. One thing tripped me up: when the connection cannot be established even after waiting 3 seconds, calling self.__reconnect() directly from the data completion thread does not work, because that thread cannot terminate itself, so the call has to be made from a separate thread. If it is not run in another thread, the number of data completion threads doubles each time the connection drops, which is a terrifying situation.
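
The handoff to another thread can be illustrated with a toy example. This is only a sketch of the idea, not the article's class: Watcher, its methods, and self_join_fails are hypothetical names, and the "timeout" is simulated. It also shows that Python raises RuntimeError when a thread tries to join itself.

```python
import threading


class Watcher:
    """Toy stand-in for the data completion thread (not the article's class)."""

    def __init__(self):
        self.generation = 0                  # how many watcher threads were started
        self.restarted = threading.Event()   # set once the second watcher runs

    def start(self):
        self.generation += 1
        threading.Thread(target=self._watch, daemon=True).start()

    def _watch(self):
        if self.generation == 1:
            # Pretend the connection timed out. Hand the restart to a helper
            # thread so that this watcher can return immediately.
            helper = threading.Thread(target=self._restart,
                                      args=(threading.current_thread(),))
            helper.start()
            return
        self.restarted.set()

    def _restart(self, old_thread):
        old_thread.join()   # allowed here because the helper is a different thread
        self.start()        # the next watcher starts only after the old one ended


def self_join_fails():
    """Return True if joining the current thread raises RuntimeError."""
    try:
        threading.current_thread().join()
    except RuntimeError:
        return True
    return False


watcher = Watcher()
watcher.start()
finished = watcher.restarted.wait(timeout=5)
print(finished, watcher.generation, self_join_fails())
```

Because the helper waits for the old watcher to finish before starting the next one, only one watcher is alive at a time.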

All source code and execution results

coincheck_websocket.py


import websocket
import json
import threading
import time

from datetime import datetime

class CoinCheckWebSocket:

    URL = "wss://ws-api.coincheck.com/"
    MAX_CANDLE_LEN = 24 * 60

    def __init__(self):
        self.__candle = []
        self.__init_flg = False

        self.__f = open("coin1m.log", 'w')

        self.__lock = threading.Lock()

        self.__connect()
    

    def __connect(self):
        self.__connected = False
        self.__opened = False
        self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
                                        on_message = self.__on_message,
                                        on_close = self.__on_close,
                                        on_open = self.__on_open,
                                        on_error = self.__on_error)

        self.wst = threading.Thread(target=lambda: self.ws.run_forever())
        self.wst.daemon = True
        self.wst.start()


        # Candle Thread
        self._check_candle_thread = threading.Thread(
            target = self.__check_candle, args=("check_candle",)
        )
        self._check_candle_thread.daemon = True
        self._check_candle_thread.start()
        print("check candle thread start")
    

        

    def __on_open(self, ws):
        print("open")
        
        self.ws.send(json.dumps({
            "type": "subscribe",
            "channel": "btc_jpy-trades"
        }))        

        self.__opened = True

    def __on_error(self, ws, error):
        print("error")
        print(error)
        self.__reconnect()

    def __on_close(self, ws, *args):  # Newer websocket-client versions also pass a close status code and message
        print("close, why close")

    def __on_message(self, ws, message):
        #print(message)
        if not self.__connected:
            self.__connected = True

        trade = self.__make_trade(message)
        #print(trade)
        if not self.__init_flg:
            self.__init_candle_data(trade)
            self.__init_flg = True
        else:
            self.__update_candle_data(trade)

    def __candle_thread_terminate(self):
        print("__candle_thread_terminate invoked")
        self.__connected = True  # The check thread may be blocked in its wait loop, so let it exit
        time.sleep(1.5)


    def __exit(self):
        print("__exit invoked")
        self.ws.close()
        self.__candle_thread_terminate()
        

    def __reconnect(self):
        print("__reconnect invoked")
        self.__exit()
        time.sleep(2)
        self.__connect()

    def __make_trade(self, message):
        # The message is a JSON array: [ID, pair, order rate, quantity, order type]
        elements = json.loads(message)
        trade = {
            "id": int(elements[0]),
            "price": float(elements[2]),
            "volume": float(elements[3]),
            "type": elements[4]
        }
        return trade

    def __thread_lock(self):
        _count = 0
        while self.__lock.acquire(blocking=True, timeout=1) == False:
            _count += 1
            if _count > 3:
                print("lock acquire timeout")
                return False
        return True

    def __thread_unlock(self):
        try:
            self.__lock.release()
        except Exception as e:
            print("lock release a {}".format(e))
            return False
        return True

    def __format_candle(self, candle):
        dt = datetime.fromtimestamp(candle["timestamp"])
        s_str = "{0:%Y-%m-%d %H:%M:%S}".format(dt)
        fmt_str = "%s  %.1f  %.1f  %.1f  %.1f   %.6f   %.6f   %.6f" % (s_str, 
                                candle["open"],
                                candle["high"],
                                candle["low"],
                                candle["close"],
                                candle["volume"],
                                candle["buy"],
                                candle["sell"],
                                ) 
        return fmt_str

    def _write_log(self, candle):
        fmt_str = self.__format_candle(candle)
        fmt_str += '\r\n'        
        self.__f.write(fmt_str)
        self.__f.flush()

    def _print_log(self, candle):
        fmt_str = self.__format_candle(candle)
        print(fmt_str)
        

    def __init_candle_data(self, trade):
        _dt = datetime.now().replace(second=0, microsecond=0)
        _stamp = _dt.timestamp()
        self.__candle.append(
            {
                "timestamp": _stamp,
                "open": trade["price"],
                "high": trade["price"],
                "low": trade["price"],
                "close": trade["price"],
                "volume": trade["volume"],
                "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                "sell": trade["volume"] if trade["type"] == 'sell' else 0,
            }
        )                

    def __update_candle_data(self, trade):
        last_candle = self.__candle[-1]        
        _dt = datetime.now().replace(second=0, microsecond=0)
        mark_ts = _dt.timestamp()
        last_ts = last_candle["timestamp"]
        if last_ts == mark_ts:
            print("append")
            last_candle["high"] = max(last_candle["high"], trade["price"])
            last_candle["low"] = min(last_candle["low"], trade["price"])
            last_candle["close"] = trade["price"]
            last_candle["volume"] += trade["volume"]
            last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
            last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0 
            
            self._print_log(last_candle)
        else:
            print("add new")
            self._write_log(last_candle)
            self.__candle.append(
                {
                    "timestamp": mark_ts,
                    "open": trade["price"],
                    "high": trade["price"],
                    "low": trade["price"],
                    "close": trade["price"],
                    "volume": trade["volume"],
                    "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                    "sell": trade["volume"] if trade["type"] == 'sell' else 0,
                }
            )

    def get_candle(self, type=0):
        self.__thread_lock()
        if type == 0:
            candle = self.__candle[:-1]
        else:
            candle = self.__candle[:]
        self.__thread_unlock()
        return candle

    def __check_candle(self, args):
        _error_count = 0
        while True:
            if not self.__connected:
                _error_count += 1                
                print("wait 1 sec")
                time.sleep(1)

                if not self.__opened and _error_count > 3:
                    #print("nonono reconnect!!!")
                    self.ws.on_error = None #Avoid being called twice
                    term_thread = threading.Thread(target=lambda: self.__reconnect()) #You can't end this thread unless you do it in another thread
                    term_thread.start()
                    break           

            else:
                break        
        
                
        _timer_count = 0
        while self.ws.sock and self.ws.sock.connected:
            
            time.sleep(1)
            if _timer_count < 30:
                print("wait until 30")
                _timer_count += 1
                continue

            print(">>>>>>check candle")


            self.__thread_lock()

            _dt = datetime.now().replace(second=0, microsecond=0)
            mark_ts = _dt.timestamp()
            last_candle = self.__candle[-1]
            last_ts = last_candle["timestamp"]
            # The last bar does not belong to the current minute
            if last_ts != mark_ts:
                print("---->>>>>>>  new in check candle")
                self._write_log(last_candle)
                self.__candle.append(
                    {
                        "timestamp": mark_ts,
                        "open": last_candle["close"],
                        "high": last_candle["close"],
                        "low": last_candle["close"],
                        "close": last_candle["close"],
                        "volume": 0,
                        "buy": 0,
                        "sell": 0,
                    }
                )
            if len(self.__candle) > (CoinCheckWebSocket.MAX_CANDLE_LEN * 1.5):
                self.__candle = self.__candle[-CoinCheckWebSocket.MAX_CANDLE_LEN:]

            self.__thread_unlock()
            _timer_count = 0

        print("check candle end")

if __name__ == "__main__":    
    chs = CoinCheckWebSocket()

    while True:
        time.sleep(60)

When executed as follows, a file called coin1m.log is generated, and 1-minute data is written in it.

>python3 coincheck_websocket.py
>cat coin1m.log
2019-12-19 03:30:00  750169.0  750169.0  749685.0  749714.0   1.265700   0.000000   1.265700
2019-12-19 03:31:00  749685.0  750428.0  749685.0  750415.0   0.348400   0.169315   0.179085
2019-12-19 03:32:00  750481.0  750481.0  750152.0  750152.0   0.347950   0.050000   0.297950
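
The log lines can be read back into Python for later analysis. The following is a minimal sketch that assumes the whitespace-separated column order written by __format_candle (time, open, high, low, close, volume, buy, sell); parse_log_line is a hypothetical helper name.

```python
from datetime import datetime


def parse_log_line(line):
    """Parse one line of coin1m.log into a dict (hypothetical helper)."""
    date_s, time_s, *values = line.split()
    open_, high, low, close, volume, buy, sell = (float(v) for v in values)
    return {
        "time": datetime.strptime(date_s + " " + time_s, "%Y-%m-%d %H:%M:%S"),
        "open": open_,
        "high": high,
        "low": low,
        "close": close,
        "volume": volume,
        "buy": buy,
        "sell": sell,
    }


# One of the lines from the execution result above.
line = "2019-12-19 03:31:00  749685.0  750428.0  749685.0  750415.0   0.348400   0.169315   0.179085"
bar = parse_log_line(line)
print(bar["time"], bar["high"] - bar["low"])
```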

I have omitted the detailed explanation of the processing above, but if you find any mistakes or have questions, please let me know. However, I'm new to Python. And invest at your own risk.

Impressions

I want explicit types ヽ (´ ・ ω ・) ノ
