Zakuzaku automatique, Bitcoin. Une histoire sur un débutant en Python faisant un tableau de contrôle de pièces en 1 minute

Aperçu

Depuis que le trading automatique de transactions en monnaie virtuelle (pièces de monnaie) est devenu monnaie courante, les débutants en Python génèrent eux-mêmes des barres d'une minute afin de négocier automatiquement avec des chèques de pièces. Je n'ai pas pu obtenir 1 minute avec l'API publiée par Coin Check, alors je l'ai faite moi-même. Après avoir brièvement expliqué la logique de base, je publierai l'intégralité du code source et les résultats de l'exécution

Préparation

Vérifiez l 'API fournie par Coincheck. Coincheck fournit une API par REST, mais certaines API utilisant WebSocket sont également fournies. Actuellement, seul WebSocket peut obtenir

Alors, utilisez ** l'historique des transactions ** pour créer une barre d'une minute. En regardant comment utiliser l'historique des transactions,

REQUEST


{
  "type": "subscribe",
  "channel": "[pair]-trades"
}

Est le format de la demande.

RESPONSE


[
  "ID",
  "Paire de négociation",
  "Taux de commande",
  "Quantité sur commande",
  "Comment commander"
]

Peut être confirmé que c'est le format de réponse. Cette fois, nous allons faire une barre de 1 minute de pièce de monnaie, donc [paire] est btc_jpy, alors utilisez" btc_jpn-trades "pour spécifier la demande. De plus, pour utiliser websocket avec python, installez websocket-client avec pip.

conception de base

Pour créer une barre d'une minute, suivez essentiellement la procédure ci-dessous.

  1. Recevoir l'historique des transactions de manière séquentielle depuis WebSocket
  2. Séparez par 1 minute et définissez le taux de commande du premier historique des transactions à ouvrir.
  3. Lors de la réception de l'historique des transactions, comparez les taux de commande et mettez à jour haut et bas. Et fermer est constamment mis à jour.
  4. De plus, ajoutez les volumes d'achat et de vente en fonction de la quantité et de la méthode de commande.

Vous trouverez ci-dessous un fragment de code qui implémente cette procédure

	def __update_candle_data(self, trade):
        last_candle = self.__candle[-1]        
        _dt = datetime.now().replace(second=0, microsecond=0) #Réglez la seconde et la microseconde à 0 pour faire 1 minute
        mark_ts = _dt.timestamp()
        last_ts = last_candle["timestamp"]
        if last_ts == mark_ts:       #Mettre à jour dans 1 minute
            last_candle["high"] = max(last_candle["high"], trade["price"])
            last_candle["low"] = min(last_candle["low"], trade["price"])
            last_candle["close"] = trade["price"]
            last_candle["volume"] += trade["volume"]
            last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
            last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0            
        else:                        #Ajouter une nouvelle étape si elle est en dehors de la plage de 1 minute
            self.__candle.append(
                {
                    "timestamp": mark_ts,
                    "open": trade["price"],
                    "high": trade["price"],
                    "low": trade["price"],
                    "close": trade["price"],
                    "volume": trade["volume"],
                    "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                    "sell": trade["volume"] if trade["type"] == 'sell' else 0,
                }
            )

Cette fonction est appelée à chaque fois que l'historique des transactions est reçu par WebSocket. Définissez le rappel pour recevoir l'historique des transactions. Ci-dessous un fragment de code


self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
                                        on_message = self.__on_message, #Rappel pour recevoir les informations de transaction
                                        on_close = self.__on_close,
                                        on_open = self.__on_open, #Rappel appelé immédiatement après l'ouverture de WebSocket
                                        on_error = self.__on_error)

def __on_open(self, ws): 
    self.ws.send(json.dumps({   #Envoyez la paire de devises que vous souhaitez recevoir dans cette méthode
        "type": "subscribe",
        "channel": "btc_jpy-trades"
    }))   

def __on_message(self, ws, message):
        if self.__connected == False:
            self.__connected = True

        trade = self.__make_trade(message)
        if self.__init_flg == False:
            self.__init_candle_data(trade)
            self.__init_flg = True
        else:
            self.__update_candle_data(trade) #Mettez à jour les données de pied ici

Achèvement des données

Avec la mise en œuvre jusqu'à présent, des données d'une minute peuvent être créées, mais sans historique des transactions, même des données d'une minute ne peuvent pas être créées. Par conséquent, s'il n'y a pas de transaction pendant 1 minute, les données seront perdues pendant 1 minute. Par conséquent, vérifiez les données toutes les 30 secondes pour éviter toute perte. Voici le fragment de code. Puisque ce processus doit être effectué régulièrement, il est exécuté dans un thread séparé.


	while self.ws.sock and self.ws.sock.connected:
            
            time.sleep(1)
            if _timer_count < 30:   #Attendez 30 secondes
                print("wait until 30")
                _timer_count += 1
                continue

            self.__thread_lock()

            _dt = datetime.now().replace(second=0, microsecond=0)
            mark_ts = _dt.timestamp()
            last_candle = self.__candle[-1]
            last_ts = last_candle["timestamp"]
            #Pas à moins d'une minute de l'heure actuelle
            if last_ts != mark_ts:
                self.__candle.append(  #Créez de nouvelles données d'une minute en utilisant les dernières données d'une minute
                    {
                        "timestamp": mark_ts,
                        "open": last_candle["close"],
                        "high": last_candle["close"],
                        "low": last_candle["close"],
                        "close": last_candle["close"],
                        "volume": 0,
                        "buy": 0,
                        "sell": 0,
                    }
                )
            self.__thread_unlock()
            _timer_count = 0

Correspondance à la déconnexion

Lorsque je l'essaye, le contrôle des pièces WebSocket est souvent déconnecté. Lorsqu'il est déconnecté, ʻon_error est appelé avant cela. De plus, en observant le moment de l'expiration à plusieurs reprises, il a été constaté que ʻon_open ne peut pas être appelé en premier lieu et qu'il peut expirer tel quel. C'est un problème que vous ne pouvez pas faire un pied de 1 minute dès qu'il est coupé, donc si vous êtes déconnecté, vous voulez vous reconnecter avec les données de pied créées jusqu'à présent. De plus, si ʻon_open` n'est pas appelé, il sera coupé à 100%, donc j'aimerais essayer de me reconnecter dans les 3 secondes. Alors, ajoutez le traitement suivant


    def __on_error(self, ws, error):
        self.__reconnect()  #Reconnectez-vous si une erreur se produit

    def __candle_thread_terminate(self):
        self.__connected = True #Si le thread d'achèvement de données est dans une boucle occupée en attente d'une connexion, quittez la boucle.
        time.sleep(1.5)


    def __exit(self):
        self.ws.close()    #Déconnecter WebSocket
        self.__candle_thread_terminate() #Terminer le thread pour l'achèvement des données
        

    def __reconnect(self):
        self.__exit()  #Déconnecter explicitement
        time.sleep(2)
        self.__connect() #Connectez-vous avec Coincheck WebSocket

#Méthode exécutée par le thread de complétion de données
def __check_candle(self, args):
    _error_count = 0
    while True:
        if not self.__connected: #La connexion WebSocket n'est pas établie(on_le message n'est jamais appelé)Si attendre
            _error_count += 1 
            self._logger.debug("wait 1 sec")
            time.sleep(1)

            if not self.__opened and _error_count > 3:  #Même si vous attendez 3 secondes_Se reconnecter si open n'est pas appelé
                self.ws.on_error = None #Allumé lors de la reconnexion_Empêcher l'erreur d'être appelée et de se connecter deux fois
                term_thread = threading.Thread(target=lambda: self.__reconnect()) #Vous ne pouvez pas terminer ce fil à moins de le faire dans un autre fil
                term_thread.start()
                break           

        else:
            break        

Fondamentalement, il traite selon le commentaire dans le code source, mais si vous ne pouvez pas vous connecter même après avoir attendu 3 secondes, si vous appelez self .__ reconnect () directement à partir du thread pour la complétion des données, ce thread J'étais accro au fait que je devais l'exécuter dans un thread séparé parce que je ne pouvais pas me terminer. Si vous ne l'exécutez pas dans un autre thread, le nombre de threads d'achèvement de données doublera à chaque fois que vous serez déconnecté, ce qui est une situation terrifiante.

Tout le code source et les résultats d'exécution

coincheck_websocket


import websocket
import json
import threading
import time

from datetime import datetime

class CoinCheckWebSocket:

    URL = "wss://ws-api.coincheck.com/"
    MAX_CANDLE_LEN = 24 * 60

    def __init__(self):
        self.__candle = []
        self.__init_flg = False

        self.__f = open("coin1m.log", 'w')

        self.__lock = threading.Lock()

        self.__connect()
    

    def __connect(self):
        self.__connected = False
        self.__opened = False
        self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
                                        on_message = self.__on_message,
                                        on_close = self.__on_close,
                                        on_open = self.__on_open,
                                        on_error = self.__on_error)

        self.wst = threading.Thread(target=lambda: self.ws.run_forever())
        self.wst.daemon = True
        self.wst.start()


        # Candle Thread
        self._check_candle_thread = threading.Thread(
            target = self.__check_candle, args=("check_candle",)
        )
        self._check_candle_thread.daemon = True
        self._check_candle_thread.start()
        print("check candle thread start")
    

        

    def __on_open(self, ws):
        print("open")
        
        self.ws.send(json.dumps({
            "type": "subscribe",
            "channel": "btc_jpy-trades"
        }))        

        self.__opened = True

    def __on_error(self, ws, error):
        print("error")
        print(error)
        self.__reconnect()

    def __on_close(self, ws):
        print("close, why close")

    def __on_message(self, ws, message):
        #print(message)
        if self.__connected == False:
            self.__connected = True

        trade = self.__make_trade(message)
        #print(trade)
        if self.__init_flg == False:
            self.__init_candle_data(trade)
            self.__init_flg = True
        else:
            self.__update_candle_data(trade)

    def __candle_thread_terminate(self):
        print("__candle_thread_terminate invoked")
        self.__connected = True #Parce qu'il peut être arrêté par l'attente
        time.sleep(1.5)


    def __exit(self):
        print("__exit invoked")
        self.ws.close()
        self.__candle_thread_terminate()
        

    def __reconnect(self):
        print("__reconnect invoked")
        self.__exit()
        time.sleep(2)
        self.__connect()

    def __make_trade(self, message):
        elements = message.split(',')
        trade = {
            "id": int(elements[0][1:len(elements[0])-1]),
            "price": float(elements[2][1:len(elements[2])-1]),
            "volume": float(elements[3][1:len(elements[3])-1]),
            "type": elements[4][1:len(elements[4])-2]
        }    
        return trade

    def __thread_lock(self):
        _count = 0
        while self.__lock.acquire(blocking=True, timeout=1) == False:
            _count += 1
            if _count > 3:
                print("lock acquite timeout")
                return False
        return True

    def __thread_unlock(self):
        try:
            self.__lock.release()
        except Exception as e:
            print("lock release a {}".format(e))
            return False
        return True

    def __format_candle(self, candle):
        dt = datetime.fromtimestamp(candle["timestamp"])
        s_str = "{0:%Y-%m-%d %H:%M:%S}".format(dt)
        fmt_str = "%s  %.1f  %.1f  %.1f  %.1f   %.6f   %.6f   %.6f" % (s_str, 
                                candle["open"],
                                candle["high"],
                                candle["low"],
                                candle["close"],
                                candle["volume"],
                                candle["buy"],
                                candle["sell"],
                                ) 
        return fmt_str

    def _write_log(self, candle):
        fmt_str = self.__format_candle(candle)
        fmt_str += '\r\n'        
        self.__f.write(fmt_str)
        self.__f.flush()

    def _print_log(self, candle):
        fmt_str = self.__format_candle(candle)
        print(fmt_str)
        

    def __init_candle_data(self, trade):
        _dt = datetime.now().replace(second=0, microsecond=0)
        _stamp = _dt.timestamp()
        self.__candle.append(
            {
                "timestamp": _stamp,
                "open": trade["price"],
                "high": trade["price"],
                "low": trade["price"],
                "close": trade["price"],
                "volume": trade["volume"],
                "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                "sell": trade["volume"] if trade["type"] == 'sell' else 0,
            }
        )                

    def __update_candle_data(self, trade):
        last_candle = self.__candle[-1]        
        _dt = datetime.now().replace(second=0, microsecond=0)
        mark_ts = _dt.timestamp()
        last_ts = last_candle["timestamp"]
        if last_ts == mark_ts:
            print("append")
            last_candle["high"] = max(last_candle["high"], trade["price"])
            last_candle["low"] = min(last_candle["low"], trade["price"])
            last_candle["close"] = trade["price"]
            last_candle["volume"] += trade["volume"]
            last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
            last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0 
            
            self._print_log(last_candle)
        else:
            print("add new")
            self._write_log(last_candle)
            self.__candle.append(
                {
                    "timestamp": mark_ts,
                    "open": trade["price"],
                    "high": trade["price"],
                    "low": trade["price"],
                    "close": trade["price"],
                    "volume": trade["volume"],
                    "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                    "sell": trade["volume"] if trade["type"] == 'sell' else 0,
                }
            )
    def get_candle(self, type=0):
        self.__thread_lock()
        if type == 0:
            candle = self.__candle[:-1]
        else:
            candle = self.__candle[:]
        self.__thread_unlock()
        return candle

    def __check_candle(self, args):
        _error_count = 0
        while True:
            if not self.__connected:
                _error_count += 1                
                print("wait 1 sec")
                time.sleep(1)

                if not self.__opened and _error_count > 3:
                    #print("nonono reconnect!!!")
                    self.ws.on_error = None #Évitez d'être appelé deux fois
                    term_thread = threading.Thread(target=lambda: self.__reconnect()) #Vous ne pouvez pas terminer ce fil à moins de le faire dans un autre fil
                    term_thread.start()
                    break           

            else:
                break        
        
                
        _timer_count = 0
        while self.ws.sock and self.ws.sock.connected:
            
            time.sleep(1)
            if _timer_count < 30:
                print("wait until 30")
                _timer_count += 1
                continue

            print(">>>>>>check candle")


            self.__thread_lock()

            _dt = datetime.now().replace(second=0, microsecond=0)
            mark_ts = _dt.timestamp()
            last_candle = self.__candle[-1]
            last_ts = last_candle["timestamp"]
            #Pas à moins d'une minute de l'heure actuelle
            if last_ts != mark_ts:
                print("---->>>>>>>  new in check candle")
                self._write_log(last_candle)
                self.__candle.append(
                    {
                        "timestamp": mark_ts,
                        "open": last_candle["close"],
                        "high": last_candle["close"],
                        "low": last_candle["close"],
                        "close": last_candle["close"],
                        "volume": 0,
                        "buy": 0,
                        "sell": 0,
                    }
                )
            if len(self.__candle) > (CoinCheckWebSocket.MAX_CANDLE_LEN * 1.5):
                self.__candle = self.__candle[-CoinCheckWebSocket.MAX_CANDLE_LEN:]

            self.__thread_unlock()
            _timer_count = 0

        print("check candle end")

if __name__ == "__main__":    
    chs = CoinCheckWebSocket()

    while True:
        time.sleep(60)

Lorsqu'il est exécuté comme suit, un fichier appelé coin1m.log est généré et des données d'une minute y sont écrites.

>python3 coincheck_websocket.py
>cat coin1m.log
2019-12-19 03:30:00  750169.0  750169.0  749685.0  749714.0   1.265700   0.000000   1.265700
2019-12-19 03:31:00  749685.0  750428.0  749685.0  750415.0   0.348400   0.169315   0.179085
2019-12-19 03:32:00  750481.0  750481.0  750152.0  750152.0   0.347950   0.050000   0.297950

J'ai omis l'explication détaillée du traitement ci-dessus, mais si vous avez des erreurs ou des questions, n'hésitez pas à nous contacter. Cependant, ** je suis nouveau sur Python **. Et ** l'investissement est une responsabilité personnelle w **

Impressions

Je veux un type explicite ヽ (´ ・ ω ・) ノ

Recommended Posts

Zakuzaku automatique, Bitcoin. Une histoire sur un débutant en Python faisant un tableau de contrôle de pièces en 1 minute
L'histoire de la création d'une partition de type Hanon avec Python
Une histoire sur un amateur faisant une rupture de bloc avec python (kivy) ②
Une histoire sur un amateur faisant une rupture de bloc avec python (kivy) ①
Une histoire à propos d'un débutant en python coincé avec aucun module nommé'ttp.server '
Une histoire sur Python pop and append
Une histoire sur l'exécution de Python sur PHP sur Heroku
Une histoire sur la modification de Python et l'ajout de fonctions
Une histoire sur un débutant Python essayant d'obtenir des résultats de recherche Google à l'aide de l'API
Une histoire d'essayer un monorepo (Golang +) Python avec Bazel
Une histoire sur un débutant Linux passant LPIC101 en une semaine
Une histoire sur un débutant Linux mettant Linux sur une tablette Windows
Une histoire à propos d'un débutant en Python qui était sur le point d'être écrasé par ModuleNotFoundError: Aucun module nommé 'weepy'
Une histoire sur la façon de spécifier un chemin relatif en python.
Une histoire sur la création d'une courte chanson par hasard avec Sudachi Py
Une histoire sur la tentative d'implémentation de variables privées en Python.
Une histoire sur un débutant de GCP essayant de créer un serveur Micra avec GCE