Da der automatische Handel mit virtuellen Währungstransaktionen (Bitmünzen) an der Tagesordnung ist, generieren Python-Anfänger ihre eigenen 1-Minuten-Balken, um automatisch mit Münzschecks zu handeln. Ich konnte mit der von Coin Check veröffentlichten API keine Minute bekommen, also habe ich es selbst gemacht. Nachdem ich die Kernlogik kurz erklärt habe, werde ich den gesamten Quellcode und die Ausführungsergebnisse veröffentlichen
Überprüfen Sie die von Coincheck bereitgestellte API. Coincheck bietet API per REST, aber auch einige APIs, die WebSocket verwenden, werden bereitgestellt. Derzeit kann nur WebSocket erhalten
Verwenden Sie also ** Transaktionsverlauf **, um einen 1-Minuten-Balken zu erstellen. Betrachten Sie die Verwendung des Transaktionsverlaufs.
REQUEST
{
"type": "subscribe",
"channel": "[pair]-trades"
}
Ist das Anforderungsformat.
RESPONSE
[
"ID",
"Handelspaar",
"Bestellpreis",
"Menge auf Bestellung",
"Wie man bestellt"
]
Kann bestätigt werden, dass dies das Antwortformat ist. Da wir diesmal einen 1-Minuten-Balken mit Bitcoin erstellen, ist [pair] "btc_jpy". Verwenden Sie daher "btc_jpn-trades", um die Anforderung anzugeben. Um Websocket mit Python zu verwenden, installieren Sie den Websocket-Client mit pip.
Gehen Sie wie folgt vor, um einen 1-Minuten-Balken zu erstellen.
Unten finden Sie ein Codefragment, das diese Prozedur implementiert
def __update_candle_data(self, trade):
last_candle = self.__candle[-1]
_dt = datetime.now().replace(second=0, microsecond=0) #Stellen Sie Sekunde und Mikrosekunde auf 0, um 1 Minute zu machen
mark_ts = _dt.timestamp()
last_ts = last_candle["timestamp"]
if last_ts == mark_ts: #Update innerhalb von 1 Minute
last_candle["high"] = max(last_candle["high"], trade["price"])
last_candle["low"] = min(last_candle["low"], trade["price"])
last_candle["close"] = trade["price"]
last_candle["volume"] += trade["volume"]
last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0
else: #Fügen Sie ein neues Bein hinzu, wenn es außerhalb des Bereichs von 1 Minute liegt
self.__candle.append(
{
"timestamp": mark_ts,
"open": trade["price"],
"high": trade["price"],
"low": trade["price"],
"close": trade["price"],
"volume": trade["volume"],
"buy": trade["volume"] if trade["type"] == 'buy' else 0,
"sell": trade["volume"] if trade["type"] == 'sell' else 0,
}
)
Diese Funktion wird jedes Mal aufgerufen, wenn der Transaktionsverlauf von WebSocket empfangen wird. Stellen Sie den Rückruf ein, um den Transaktionsverlauf zu erhalten. Unten ist ein Codefragment
self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
on_message = self.__on_message, #Rückruf zum Empfang von Transaktionsinformationen
on_close = self.__on_close,
on_open = self.__on_open, #Rückruf wird sofort nach dem Öffnen von WebSocket aufgerufen
on_error = self.__on_error)
def __on_open(self, ws):
self.ws.send(json.dumps({ #Senden Sie das Währungspaar, das Sie innerhalb dieser Methode erhalten möchten
"type": "subscribe",
"channel": "btc_jpy-trades"
}))
def __on_message(self, ws, message):
if self.__connected == False:
self.__connected = True
trade = self.__make_trade(message)
if self.__init_flg == False:
self.__init_candle_data(trade)
self.__init_flg = True
else:
self.__update_candle_data(trade) #Fußdaten hier aktualisieren
Mit der bisherigen Implementierung können grundsätzlich 1-Minuten-Daten erstellt werden, aber ohne Transaktionsverlauf können nicht einmal 1-Minuten-Daten erstellt werden. Wenn 1 Minute lang keine Transaktion stattfindet, gehen die Daten 1 Minute lang verloren. Überprüfen Sie daher die Daten alle 30 Sekunden, um einen Verlust zu vermeiden. Unten ist das Codefragment. Da dieser Prozess regelmäßig ausgeführt werden muss, wird er in einem separaten Thread ausgeführt.
while self.ws.sock and self.ws.sock.connected:
time.sleep(1)
if _timer_count < 30: #Warten Sie 30 Sekunden
print("wait until 30")
_timer_count += 1
continue
self.__thread_lock()
_dt = datetime.now().replace(second=0, microsecond=0)
mark_ts = _dt.timestamp()
last_candle = self.__candle[-1]
last_ts = last_candle["timestamp"]
#Nicht innerhalb von 1 Minute nach der aktuellen Zeit
if last_ts != mark_ts:
self.__candle.append( #Erstellen Sie neue 1-Minuten-Daten mit den letzten 1-Minuten-Daten
{
"timestamp": mark_ts,
"open": last_candle["close"],
"high": last_candle["close"],
"low": last_candle["close"],
"close": last_candle["close"],
"volume": 0,
"buy": 0,
"sell": 0,
}
)
self.__thread_unlock()
_timer_count = 0
Wenn ich es tatsächlich versuche, wird die Münzprüfung WebSocket oft getrennt. Wenn die Verbindung getrennt ist, wird vorher "on_error" aufgerufen. Bei mehrmaliger Beobachtung des Ablaufzeitpunkts wurde außerdem festgestellt, dass "on_open" möglicherweise nicht an erster Stelle aufgerufen wird und möglicherweise eine Zeitüberschreitung aufweist. Es ist ein Problem, dass Sie keinen 1-Minuten-Fuß machen können, sobald er abgeschnitten ist. Wenn Sie also nicht verbunden sind, möchten Sie die Verbindung zu den bisher erstellten Fußdaten wieder herstellen. Wenn on_open
nicht aufgerufen wird, wird es zu 100% abgeschnitten, daher möchte ich versuchen, die Verbindung innerhalb von 3 Sekunden wieder herzustellen. Fügen Sie also die folgende Verarbeitung hinzu
def __on_error(self, ws, error):
self.__reconnect() #Stellen Sie die Verbindung erneut her, wenn ein Fehler auftritt
def __candle_thread_terminate(self):
self.__connected = True #Wenn sich der Datenvervollständigungsthread in einer Besetztschleife befindet und auf eine Verbindung wartet, beenden Sie die Schleife.
time.sleep(1.5)
def __exit(self):
self.ws.close() #Trennen Sie WebSocket
self.__candle_thread_terminate() #Beenden Sie den Thread zur Datenvervollständigung
def __reconnect(self):
self.__exit() #Explizit trennen
time.sleep(2)
self.__connect() #Stellen Sie eine Verbindung mit Coincheck WebSocket her
#Methode, die vom Datenvervollständigungsthread ausgeführt wird
def __check_candle(self, args):
_error_count = 0
while True:
if not self.__connected: #Die WebSocket-Verbindung wurde nicht hergestellt(on_Nachricht wird nie aufgerufen)Wenn warten
_error_count += 1
self._logger.debug("wait 1 sec")
time.sleep(1)
if not self.__opened and _error_count > 3: #Auch wenn Sie 3 Sekunden warten_Stellen Sie die Verbindung wieder her, wenn open nicht aufgerufen wird
self.ws.on_error = None #Ein beim erneuten Verbinden_Verhindern Sie, dass Fehler aufgerufen werden und zweimal eine Verbindung hergestellt wird
term_thread = threading.Thread(target=lambda: self.__reconnect()) #Sie können diesen Thread nur beenden, wenn Sie ihn in einem anderen Thread ausführen
term_thread.start()
break
else:
break
Grundsätzlich wird es gemäß dem Kommentar im Quellcode verarbeitet, aber wenn Sie auch nach 3 Sekunden Wartezeit keine Verbindung herstellen können, wenn Sie "self .__ reconnect ()" direkt vom Thread zur Datenvervollständigung aufrufen, wird dieser Thread Ich war süchtig nach der Tatsache, dass ich es in einem separaten Thread ausführen musste, weil ich mich nicht selbst beenden konnte. Wenn Sie es nicht in einem anderen Thread ausführen, verdoppelt sich die Anzahl der Datenvervollständigungsthreads jedes Mal, wenn Sie die Verbindung trennen. Dies ist eine schreckliche Situation.
coincheck_websocket
import websocket
import json
import threading
import time
from datetime import datetime
class CoinCheckWebSocket:
URL = "wss://ws-api.coincheck.com/"
MAX_CANDLE_LEN = 24 * 60
def __init__(self):
self.__candle = []
self.__init_flg = False
self.__f = open("coin1m.log", 'w')
self.__lock = threading.Lock()
self.__connect()
def __connect(self):
self.__connected = False
self.__opened = False
self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
on_message = self.__on_message,
on_close = self.__on_close,
on_open = self.__on_open,
on_error = self.__on_error)
self.wst = threading.Thread(target=lambda: self.ws.run_forever())
self.wst.daemon = True
self.wst.start()
# Candle Thread
self._check_candle_thread = threading.Thread(
target = self.__check_candle, args=("check_candle",)
)
self._check_candle_thread.daemon = True
self._check_candle_thread.start()
print("check candle thread start")
def __on_open(self, ws):
print("open")
self.ws.send(json.dumps({
"type": "subscribe",
"channel": "btc_jpy-trades"
}))
self.__opened = True
def __on_error(self, ws, error):
print("error")
print(error)
self.__reconnect()
def __on_close(self, ws):
print("close, why close")
def __on_message(self, ws, message):
#print(message)
if self.__connected == False:
self.__connected = True
trade = self.__make_trade(message)
#print(trade)
if self.__init_flg == False:
self.__init_candle_data(trade)
self.__init_flg = True
else:
self.__update_candle_data(trade)
def __candle_thread_terminate(self):
print("__candle_thread_terminate invoked")
self.__connected = True #Weil es durch Warten gestoppt werden kann
time.sleep(1.5)
def __exit(self):
print("__exit invoked")
self.ws.close()
self.__candle_thread_terminate()
def __reconnect(self):
print("__reconnect invoked")
self.__exit()
time.sleep(2)
self.__connect()
def __make_trade(self, message):
elements = message.split(',')
trade = {
"id": int(elements[0][1:len(elements[0])-1]),
"price": float(elements[2][1:len(elements[2])-1]),
"volume": float(elements[3][1:len(elements[3])-1]),
"type": elements[4][1:len(elements[4])-2]
}
return trade
def __thread_lock(self):
_count = 0
while self.__lock.acquire(blocking=True, timeout=1) == False:
_count += 1
if _count > 3:
print("lock acquite timeout")
return False
return True
def __thread_unlock(self):
try:
self.__lock.release()
except Exception as e:
print("lock release a {}".format(e))
return False
return True
def __format_candle(self, candle):
dt = datetime.fromtimestamp(candle["timestamp"])
s_str = "{0:%Y-%m-%d %H:%M:%S}".format(dt)
fmt_str = "%s %.1f %.1f %.1f %.1f %.6f %.6f %.6f" % (s_str,
candle["open"],
candle["high"],
candle["low"],
candle["close"],
candle["volume"],
candle["buy"],
candle["sell"],
)
return fmt_str
def _write_log(self, candle):
fmt_str = self.__format_candle(candle)
fmt_str += '\r\n'
self.__f.write(fmt_str)
self.__f.flush()
def _print_log(self, candle):
fmt_str = self.__format_candle(candle)
print(fmt_str)
def __init_candle_data(self, trade):
_dt = datetime.now().replace(second=0, microsecond=0)
_stamp = _dt.timestamp()
self.__candle.append(
{
"timestamp": _stamp,
"open": trade["price"],
"high": trade["price"],
"low": trade["price"],
"close": trade["price"],
"volume": trade["volume"],
"buy": trade["volume"] if trade["type"] == 'buy' else 0,
"sell": trade["volume"] if trade["type"] == 'sell' else 0,
}
)
def __update_candle_data(self, trade):
last_candle = self.__candle[-1]
_dt = datetime.now().replace(second=0, microsecond=0)
mark_ts = _dt.timestamp()
last_ts = last_candle["timestamp"]
if last_ts == mark_ts:
print("append")
last_candle["high"] = max(last_candle["high"], trade["price"])
last_candle["low"] = min(last_candle["low"], trade["price"])
last_candle["close"] = trade["price"]
last_candle["volume"] += trade["volume"]
last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0
self._print_log(last_candle)
else:
print("add new")
self._write_log(last_candle)
self.__candle.append(
{
"timestamp": mark_ts,
"open": trade["price"],
"high": trade["price"],
"low": trade["price"],
"close": trade["price"],
"volume": trade["volume"],
"buy": trade["volume"] if trade["type"] == 'buy' else 0,
"sell": trade["volume"] if trade["type"] == 'sell' else 0,
}
)
def get_candle(self, type=0):
self.__thread_lock()
if type == 0:
candle = self.__candle[:-1]
else:
candle = self.__candle[:]
self.__thread_unlock()
return candle
def __check_candle(self, args):
_error_count = 0
while True:
if not self.__connected:
_error_count += 1
print("wait 1 sec")
time.sleep(1)
if not self.__opened and _error_count > 3:
#print("nonono reconnect!!!")
self.ws.on_error = None #Vermeiden Sie es, zweimal angerufen zu werden
term_thread = threading.Thread(target=lambda: self.__reconnect()) #Sie können diesen Thread nur beenden, wenn Sie ihn in einem anderen Thread ausführen
term_thread.start()
break
else:
break
_timer_count = 0
while self.ws.sock and self.ws.sock.connected:
time.sleep(1)
if _timer_count < 30:
print("wait until 30")
_timer_count += 1
continue
print(">>>>>>check candle")
self.__thread_lock()
_dt = datetime.now().replace(second=0, microsecond=0)
mark_ts = _dt.timestamp()
last_candle = self.__candle[-1]
last_ts = last_candle["timestamp"]
#Nicht innerhalb von 1 Minute nach der aktuellen Zeit
if last_ts != mark_ts:
print("---->>>>>>> new in check candle")
self._write_log(last_candle)
self.__candle.append(
{
"timestamp": mark_ts,
"open": last_candle["close"],
"high": last_candle["close"],
"low": last_candle["close"],
"close": last_candle["close"],
"volume": 0,
"buy": 0,
"sell": 0,
}
)
if len(self.__candle) > (CoinCheckWebSocket.MAX_CANDLE_LEN * 1.5):
self.__candle = self.__candle[-CoinCheckWebSocket.MAX_CANDLE_LEN:]
self.__thread_unlock()
_timer_count = 0
print("check candle end")
if __name__ == "__main__":
chs = CoinCheckWebSocket()
while True:
time.sleep(60)
Bei folgender Ausführung wird eine Datei mit dem Namen coin1m.log generiert und 1-Minuten-Daten werden darin geschrieben.
>python3 coincheck_websocket.py
>cat coin1m.log
2019-12-19 03:30:00 750169.0 750169.0 749685.0 749714.0 1.265700 0.000000 1.265700
2019-12-19 03:31:00 749685.0 750428.0 749685.0 750415.0 0.348400 0.169315 0.179085
2019-12-19 03:32:00 750481.0 750481.0 750152.0 750152.0 0.347950 0.050000 0.297950
Ich habe die ausführliche Erklärung zur Verarbeitung oben weggelassen, aber wenn Sie Fehler oder Fragen haben, zögern Sie bitte nicht, uns zu kontaktieren. Allerdings bin ich neu in Python. Und ** Investition ist Eigenverantwortung w **
Ich möchte einen expliziten Typ ヽ (´ ´ ω ・) ノ
Recommended Posts