Automatisches Zakuzaku, Bitcoin. Eine Geschichte über einen Python-Anfänger, der ein 1-Minuten-Diagramm für Münzprüfungen erstellt

Überblick

Da der automatische Handel mit virtuellen Währungstransaktionen (Bitmünzen) an der Tagesordnung ist, generieren Python-Anfänger ihre eigenen 1-Minuten-Balken, um automatisch mit Münzschecks zu handeln. Ich konnte mit der von Coin Check veröffentlichten API keine Minute bekommen, also habe ich es selbst gemacht. Nachdem ich die Kernlogik kurz erklärt habe, werde ich den gesamten Quellcode und die Ausführungsergebnisse veröffentlichen

Vorbereitung

Überprüfen Sie die von Coincheck bereitgestellte API. Coincheck bietet API per REST, aber auch einige APIs, die WebSocket verwenden, werden bereitgestellt. Derzeit kann nur WebSocket erhalten

Verwenden Sie also ** Transaktionsverlauf **, um einen 1-Minuten-Balken zu erstellen. Betrachten Sie die Verwendung des Transaktionsverlaufs.

REQUEST


{
  "type": "subscribe",
  "channel": "[pair]-trades"
}

Ist das Anforderungsformat.

RESPONSE


[
  "ID",
  "Handelspaar",
  "Bestellpreis",
  "Menge auf Bestellung",
  "Wie man bestellt"
]

Kann bestätigt werden, dass dies das Antwortformat ist. Da wir diesmal einen 1-Minuten-Balken mit Bitcoin erstellen, ist [pair] "btc_jpy". Verwenden Sie daher "btc_jpn-trades", um die Anforderung anzugeben. Um Websocket mit Python zu verwenden, installieren Sie den Websocket-Client mit pip.

grundlegende Gestaltung

Gehen Sie wie folgt vor, um einen 1-Minuten-Balken zu erstellen.

  1. Empfangen Sie den Transaktionsverlauf nacheinander von WebSocket
  2. Trennen Sie sich um 1 Minute und legen Sie die Bestellrate des ersten zu öffnenden Transaktionsverlaufs fest.
  3. Vergleichen Sie während des Empfangs der Transaktionshistorie die Bestellraten und aktualisieren Sie hoch und niedrig. Und schließen wird ständig aktualisiert.
  4. Fügen Sie zusätzlich das Kauf- und Verkaufsvolumen entsprechend der Bestellmenge und der Bestellmethode hinzu.

Unten finden Sie ein Codefragment, das diese Prozedur implementiert

	def __update_candle_data(self, trade):
        last_candle = self.__candle[-1]        
        _dt = datetime.now().replace(second=0, microsecond=0) #Stellen Sie Sekunde und Mikrosekunde auf 0, um 1 Minute zu machen
        mark_ts = _dt.timestamp()
        last_ts = last_candle["timestamp"]
        if last_ts == mark_ts:       #Update innerhalb von 1 Minute
            last_candle["high"] = max(last_candle["high"], trade["price"])
            last_candle["low"] = min(last_candle["low"], trade["price"])
            last_candle["close"] = trade["price"]
            last_candle["volume"] += trade["volume"]
            last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
            last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0            
        else:                        #Fügen Sie ein neues Bein hinzu, wenn es außerhalb des Bereichs von 1 Minute liegt
            self.__candle.append(
                {
                    "timestamp": mark_ts,
                    "open": trade["price"],
                    "high": trade["price"],
                    "low": trade["price"],
                    "close": trade["price"],
                    "volume": trade["volume"],
                    "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                    "sell": trade["volume"] if trade["type"] == 'sell' else 0,
                }
            )

Diese Funktion wird jedes Mal aufgerufen, wenn der Transaktionsverlauf von WebSocket empfangen wird. Stellen Sie den Rückruf ein, um den Transaktionsverlauf zu erhalten. Unten ist ein Codefragment


self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
                                        on_message = self.__on_message, #Rückruf zum Empfang von Transaktionsinformationen
                                        on_close = self.__on_close,
                                        on_open = self.__on_open, #Rückruf wird sofort nach dem Öffnen von WebSocket aufgerufen
                                        on_error = self.__on_error)

def __on_open(self, ws): 
    self.ws.send(json.dumps({   #Senden Sie das Währungspaar, das Sie innerhalb dieser Methode erhalten möchten
        "type": "subscribe",
        "channel": "btc_jpy-trades"
    }))   

def __on_message(self, ws, message):
        if self.__connected == False:
            self.__connected = True

        trade = self.__make_trade(message)
        if self.__init_flg == False:
            self.__init_candle_data(trade)
            self.__init_flg = True
        else:
            self.__update_candle_data(trade) #Fußdaten hier aktualisieren

Datenvervollständigung

Mit der bisherigen Implementierung können grundsätzlich 1-Minuten-Daten erstellt werden, aber ohne Transaktionsverlauf können nicht einmal 1-Minuten-Daten erstellt werden. Wenn 1 Minute lang keine Transaktion stattfindet, gehen die Daten 1 Minute lang verloren. Überprüfen Sie daher die Daten alle 30 Sekunden, um einen Verlust zu vermeiden. Unten ist das Codefragment. Da dieser Prozess regelmäßig ausgeführt werden muss, wird er in einem separaten Thread ausgeführt.


	while self.ws.sock and self.ws.sock.connected:
            
            time.sleep(1)
            if _timer_count < 30:   #Warten Sie 30 Sekunden
                print("wait until 30")
                _timer_count += 1
                continue

            self.__thread_lock()

            _dt = datetime.now().replace(second=0, microsecond=0)
            mark_ts = _dt.timestamp()
            last_candle = self.__candle[-1]
            last_ts = last_candle["timestamp"]
            #Nicht innerhalb von 1 Minute nach der aktuellen Zeit
            if last_ts != mark_ts:
                self.__candle.append(  #Erstellen Sie neue 1-Minuten-Daten mit den letzten 1-Minuten-Daten
                    {
                        "timestamp": mark_ts,
                        "open": last_candle["close"],
                        "high": last_candle["close"],
                        "low": last_candle["close"],
                        "close": last_candle["close"],
                        "volume": 0,
                        "buy": 0,
                        "sell": 0,
                    }
                )
            self.__thread_unlock()
            _timer_count = 0

Korrespondenz zur Trennung

Wenn ich es tatsächlich versuche, wird die Münzprüfung WebSocket oft getrennt. Wenn die Verbindung getrennt ist, wird vorher "on_error" aufgerufen. Bei mehrmaliger Beobachtung des Ablaufzeitpunkts wurde außerdem festgestellt, dass "on_open" möglicherweise nicht an erster Stelle aufgerufen wird und möglicherweise eine Zeitüberschreitung aufweist. Es ist ein Problem, dass Sie keinen 1-Minuten-Fuß machen können, sobald er abgeschnitten ist. Wenn Sie also nicht verbunden sind, möchten Sie die Verbindung zu den bisher erstellten Fußdaten wieder herstellen. Wenn on_open nicht aufgerufen wird, wird es zu 100% abgeschnitten, daher möchte ich versuchen, die Verbindung innerhalb von 3 Sekunden wieder herzustellen. Fügen Sie also die folgende Verarbeitung hinzu


    def __on_error(self, ws, error):
        self.__reconnect()  #Stellen Sie die Verbindung erneut her, wenn ein Fehler auftritt

    def __candle_thread_terminate(self):
        self.__connected = True #Wenn sich der Datenvervollständigungsthread in einer Besetztschleife befindet und auf eine Verbindung wartet, beenden Sie die Schleife.
        time.sleep(1.5)


    def __exit(self):
        self.ws.close()    #Trennen Sie WebSocket
        self.__candle_thread_terminate() #Beenden Sie den Thread zur Datenvervollständigung
        

    def __reconnect(self):
        self.__exit()  #Explizit trennen
        time.sleep(2)
        self.__connect() #Stellen Sie eine Verbindung mit Coincheck WebSocket her

#Methode, die vom Datenvervollständigungsthread ausgeführt wird
def __check_candle(self, args):
    _error_count = 0
    while True:
        if not self.__connected: #Die WebSocket-Verbindung wurde nicht hergestellt(on_Nachricht wird nie aufgerufen)Wenn warten
            _error_count += 1 
            self._logger.debug("wait 1 sec")
            time.sleep(1)

            if not self.__opened and _error_count > 3:  #Auch wenn Sie 3 Sekunden warten_Stellen Sie die Verbindung wieder her, wenn open nicht aufgerufen wird
                self.ws.on_error = None #Ein beim erneuten Verbinden_Verhindern Sie, dass Fehler aufgerufen werden und zweimal eine Verbindung hergestellt wird
                term_thread = threading.Thread(target=lambda: self.__reconnect()) #Sie können diesen Thread nur beenden, wenn Sie ihn in einem anderen Thread ausführen
                term_thread.start()
                break           

        else:
            break        

Grundsätzlich wird es gemäß dem Kommentar im Quellcode verarbeitet, aber wenn Sie auch nach 3 Sekunden Wartezeit keine Verbindung herstellen können, wenn Sie "self .__ reconnect ()" direkt vom Thread zur Datenvervollständigung aufrufen, wird dieser Thread Ich war süchtig nach der Tatsache, dass ich es in einem separaten Thread ausführen musste, weil ich mich nicht selbst beenden konnte. Wenn Sie es nicht in einem anderen Thread ausführen, verdoppelt sich die Anzahl der Datenvervollständigungsthreads jedes Mal, wenn Sie die Verbindung trennen. Dies ist eine schreckliche Situation.

Alle Quellcode- und Ausführungsergebnisse

coincheck_websocket


import websocket
import json
import threading
import time

from datetime import datetime

class CoinCheckWebSocket:

    URL = "wss://ws-api.coincheck.com/"
    MAX_CANDLE_LEN = 24 * 60

    def __init__(self):
        self.__candle = []
        self.__init_flg = False

        self.__f = open("coin1m.log", 'w')

        self.__lock = threading.Lock()

        self.__connect()
    

    def __connect(self):
        self.__connected = False
        self.__opened = False
        self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
                                        on_message = self.__on_message,
                                        on_close = self.__on_close,
                                        on_open = self.__on_open,
                                        on_error = self.__on_error)

        self.wst = threading.Thread(target=lambda: self.ws.run_forever())
        self.wst.daemon = True
        self.wst.start()


        # Candle Thread
        self._check_candle_thread = threading.Thread(
            target = self.__check_candle, args=("check_candle",)
        )
        self._check_candle_thread.daemon = True
        self._check_candle_thread.start()
        print("check candle thread start")
    

        

    def __on_open(self, ws):
        print("open")
        
        self.ws.send(json.dumps({
            "type": "subscribe",
            "channel": "btc_jpy-trades"
        }))        

        self.__opened = True

    def __on_error(self, ws, error):
        print("error")
        print(error)
        self.__reconnect()

    def __on_close(self, ws):
        print("close, why close")

    def __on_message(self, ws, message):
        #print(message)
        if self.__connected == False:
            self.__connected = True

        trade = self.__make_trade(message)
        #print(trade)
        if self.__init_flg == False:
            self.__init_candle_data(trade)
            self.__init_flg = True
        else:
            self.__update_candle_data(trade)

    def __candle_thread_terminate(self):
        print("__candle_thread_terminate invoked")
        self.__connected = True #Weil es durch Warten gestoppt werden kann
        time.sleep(1.5)


    def __exit(self):
        print("__exit invoked")
        self.ws.close()
        self.__candle_thread_terminate()
        

    def __reconnect(self):
        print("__reconnect invoked")
        self.__exit()
        time.sleep(2)
        self.__connect()

    def __make_trade(self, message):
        elements = message.split(',')
        trade = {
            "id": int(elements[0][1:len(elements[0])-1]),
            "price": float(elements[2][1:len(elements[2])-1]),
            "volume": float(elements[3][1:len(elements[3])-1]),
            "type": elements[4][1:len(elements[4])-2]
        }    
        return trade

    def __thread_lock(self):
        _count = 0
        while self.__lock.acquire(blocking=True, timeout=1) == False:
            _count += 1
            if _count > 3:
                print("lock acquite timeout")
                return False
        return True

    def __thread_unlock(self):
        try:
            self.__lock.release()
        except Exception as e:
            print("lock release a {}".format(e))
            return False
        return True

    def __format_candle(self, candle):
        dt = datetime.fromtimestamp(candle["timestamp"])
        s_str = "{0:%Y-%m-%d %H:%M:%S}".format(dt)
        fmt_str = "%s  %.1f  %.1f  %.1f  %.1f   %.6f   %.6f   %.6f" % (s_str, 
                                candle["open"],
                                candle["high"],
                                candle["low"],
                                candle["close"],
                                candle["volume"],
                                candle["buy"],
                                candle["sell"],
                                ) 
        return fmt_str

    def _write_log(self, candle):
        fmt_str = self.__format_candle(candle)
        fmt_str += '\r\n'        
        self.__f.write(fmt_str)
        self.__f.flush()

    def _print_log(self, candle):
        fmt_str = self.__format_candle(candle)
        print(fmt_str)
        

    def __init_candle_data(self, trade):
        _dt = datetime.now().replace(second=0, microsecond=0)
        _stamp = _dt.timestamp()
        self.__candle.append(
            {
                "timestamp": _stamp,
                "open": trade["price"],
                "high": trade["price"],
                "low": trade["price"],
                "close": trade["price"],
                "volume": trade["volume"],
                "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                "sell": trade["volume"] if trade["type"] == 'sell' else 0,
            }
        )                

    def __update_candle_data(self, trade):
        last_candle = self.__candle[-1]        
        _dt = datetime.now().replace(second=0, microsecond=0)
        mark_ts = _dt.timestamp()
        last_ts = last_candle["timestamp"]
        if last_ts == mark_ts:
            print("append")
            last_candle["high"] = max(last_candle["high"], trade["price"])
            last_candle["low"] = min(last_candle["low"], trade["price"])
            last_candle["close"] = trade["price"]
            last_candle["volume"] += trade["volume"]
            last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
            last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0 
            
            self._print_log(last_candle)
        else:
            print("add new")
            self._write_log(last_candle)
            self.__candle.append(
                {
                    "timestamp": mark_ts,
                    "open": trade["price"],
                    "high": trade["price"],
                    "low": trade["price"],
                    "close": trade["price"],
                    "volume": trade["volume"],
                    "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                    "sell": trade["volume"] if trade["type"] == 'sell' else 0,
                }
            )
    def get_candle(self, type=0):
        self.__thread_lock()
        if type == 0:
            candle = self.__candle[:-1]
        else:
            candle = self.__candle[:]
        self.__thread_unlock()
        return candle

    def __check_candle(self, args):
        _error_count = 0
        while True:
            if not self.__connected:
                _error_count += 1                
                print("wait 1 sec")
                time.sleep(1)

                if not self.__opened and _error_count > 3:
                    #print("nonono reconnect!!!")
                    self.ws.on_error = None #Vermeiden Sie es, zweimal angerufen zu werden
                    term_thread = threading.Thread(target=lambda: self.__reconnect()) #Sie können diesen Thread nur beenden, wenn Sie ihn in einem anderen Thread ausführen
                    term_thread.start()
                    break           

            else:
                break        
        
                
        _timer_count = 0
        while self.ws.sock and self.ws.sock.connected:
            
            time.sleep(1)
            if _timer_count < 30:
                print("wait until 30")
                _timer_count += 1
                continue

            print(">>>>>>check candle")


            self.__thread_lock()

            _dt = datetime.now().replace(second=0, microsecond=0)
            mark_ts = _dt.timestamp()
            last_candle = self.__candle[-1]
            last_ts = last_candle["timestamp"]
            #Nicht innerhalb von 1 Minute nach der aktuellen Zeit
            if last_ts != mark_ts:
                print("---->>>>>>>  new in check candle")
                self._write_log(last_candle)
                self.__candle.append(
                    {
                        "timestamp": mark_ts,
                        "open": last_candle["close"],
                        "high": last_candle["close"],
                        "low": last_candle["close"],
                        "close": last_candle["close"],
                        "volume": 0,
                        "buy": 0,
                        "sell": 0,
                    }
                )
            if len(self.__candle) > (CoinCheckWebSocket.MAX_CANDLE_LEN * 1.5):
                self.__candle = self.__candle[-CoinCheckWebSocket.MAX_CANDLE_LEN:]

            self.__thread_unlock()
            _timer_count = 0

        print("check candle end")

if __name__ == "__main__":    
    chs = CoinCheckWebSocket()

    while True:
        time.sleep(60)

Bei folgender Ausführung wird eine Datei mit dem Namen coin1m.log generiert und 1-Minuten-Daten werden darin geschrieben.

>python3 coincheck_websocket.py
>cat coin1m.log
2019-12-19 03:30:00  750169.0  750169.0  749685.0  749714.0   1.265700   0.000000   1.265700
2019-12-19 03:31:00  749685.0  750428.0  749685.0  750415.0   0.348400   0.169315   0.179085
2019-12-19 03:32:00  750481.0  750481.0  750152.0  750152.0   0.347950   0.050000   0.297950

Ich habe die ausführliche Erklärung zur Verarbeitung oben weggelassen, aber wenn Sie Fehler oder Fragen haben, zögern Sie bitte nicht, uns zu kontaktieren. Allerdings bin ich neu in Python. Und ** Investition ist Eigenverantwortung w **

Impressionen

Ich möchte einen expliziten Typ ヽ (´ ´ ω ・) ノ

Recommended Posts

Automatisches Zakuzaku, Bitcoin. Eine Geschichte über einen Python-Anfänger, der ein 1-Minuten-Diagramm für Münzprüfungen erstellt
Die Geschichte, mit Python eine Hanon-ähnliche Partitur zu machen
Eine Geschichte über einen Amateur, der mit Python (Kivy) einen Blockbruch macht ②
Eine Geschichte über einen Amateur, der mit Python (Kivy) einen Blockbruch macht ①
Eine Geschichte über einen Python-Anfänger, der mit dem No-Modul'http.server 'feststeckt.
Eine Geschichte über Python Pop und Append
Eine Geschichte über das Ausführen von Python auf PHP auf Heroku
Eine Geschichte über das Ändern von Python und das Hinzufügen von Funktionen
Eine Geschichte über einen Python-Anfänger, der versucht, Google-Suchergebnisse mithilfe der API abzurufen
Eine Geschichte über das Ausprobieren eines (Golang +) Python-Monorepo mit Bazel
Eine Geschichte über einen Linux-Anfänger, der in einer Woche LPIC101 bestanden hat
Eine Geschichte über einen Linux-Anfänger, der Linux auf ein Windows-Tablet bringt
Eine Geschichte über einen Python-Anfänger, der von ModuleNotFoundError vernichtet werden sollte: Kein Modul namens 'Wewey'
Eine Geschichte darüber, wie man einen relativen Pfad in Python angibt.
Eine Geschichte über das zufällige Erstellen eines kurzen Songs mit Sudachi Py
Eine Geschichte über den Versuch, private Variablen in Python zu implementieren.
Eine Geschichte über einen GCP-Anfänger, der versucht, mit GCE einen Micra-Server aufzubauen