Depuis que le trading automatique de transactions en monnaie virtuelle (pièces de monnaie) est devenu monnaie courante, les débutants en Python génèrent eux-mêmes des barres d'une minute afin de négocier automatiquement avec des chèques de pièces. Je n'ai pas pu obtenir 1 minute avec l'API publiée par Coin Check, alors je l'ai faite moi-même. Après avoir brièvement expliqué la logique de base, je publierai l'intégralité du code source et les résultats de l'exécution
Vérifiez l 'API fournie par Coincheck. Coincheck fournit une API par REST, mais certaines API utilisant WebSocket sont également fournies. Actuellement, seul WebSocket peut obtenir
Alors, utilisez ** l'historique des transactions ** pour créer une barre d'une minute. En regardant comment utiliser l'historique des transactions,
REQUEST
{
"type": "subscribe",
"channel": "[pair]-trades"
}
Est le format de la demande.
RESPONSE
[
"ID",
"Paire de négociation",
"Taux de commande",
"Quantité sur commande",
"Comment commander"
]
Peut être confirmé que c'est le format de réponse. Cette fois, nous allons faire une barre de 1 minute de pièce de monnaie, donc [paire] est btc_jpy
, alors utilisez" btc_jpn-trades "pour spécifier la demande.
De plus, pour utiliser websocket avec python, installez websocket-client avec pip.
Pour créer une barre d'une minute, suivez essentiellement la procédure ci-dessous.
Vous trouverez ci-dessous un fragment de code qui implémente cette procédure
def __update_candle_data(self, trade):
last_candle = self.__candle[-1]
_dt = datetime.now().replace(second=0, microsecond=0) #Réglez la seconde et la microseconde à 0 pour faire 1 minute
mark_ts = _dt.timestamp()
last_ts = last_candle["timestamp"]
if last_ts == mark_ts: #Mettre à jour dans 1 minute
last_candle["high"] = max(last_candle["high"], trade["price"])
last_candle["low"] = min(last_candle["low"], trade["price"])
last_candle["close"] = trade["price"]
last_candle["volume"] += trade["volume"]
last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0
else: #Ajouter une nouvelle étape si elle est en dehors de la plage de 1 minute
self.__candle.append(
{
"timestamp": mark_ts,
"open": trade["price"],
"high": trade["price"],
"low": trade["price"],
"close": trade["price"],
"volume": trade["volume"],
"buy": trade["volume"] if trade["type"] == 'buy' else 0,
"sell": trade["volume"] if trade["type"] == 'sell' else 0,
}
)
Cette fonction est appelée à chaque fois que l'historique des transactions est reçu par WebSocket. Définissez le rappel pour recevoir l'historique des transactions. Ci-dessous un fragment de code
self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
on_message = self.__on_message, #Rappel pour recevoir les informations de transaction
on_close = self.__on_close,
on_open = self.__on_open, #Rappel appelé immédiatement après l'ouverture de WebSocket
on_error = self.__on_error)
def __on_open(self, ws):
self.ws.send(json.dumps({ #Envoyez la paire de devises que vous souhaitez recevoir dans cette méthode
"type": "subscribe",
"channel": "btc_jpy-trades"
}))
def __on_message(self, ws, message):
if self.__connected == False:
self.__connected = True
trade = self.__make_trade(message)
if self.__init_flg == False:
self.__init_candle_data(trade)
self.__init_flg = True
else:
self.__update_candle_data(trade) #Mettez à jour les données de pied ici
Avec la mise en œuvre jusqu'à présent, des données d'une minute peuvent être créées, mais sans historique des transactions, même des données d'une minute ne peuvent pas être créées. Par conséquent, s'il n'y a pas de transaction pendant 1 minute, les données seront perdues pendant 1 minute. Par conséquent, vérifiez les données toutes les 30 secondes pour éviter toute perte. Voici le fragment de code. Puisque ce processus doit être effectué régulièrement, il est exécuté dans un thread séparé.
while self.ws.sock and self.ws.sock.connected:
time.sleep(1)
if _timer_count < 30: #Attendez 30 secondes
print("wait until 30")
_timer_count += 1
continue
self.__thread_lock()
_dt = datetime.now().replace(second=0, microsecond=0)
mark_ts = _dt.timestamp()
last_candle = self.__candle[-1]
last_ts = last_candle["timestamp"]
#Pas à moins d'une minute de l'heure actuelle
if last_ts != mark_ts:
self.__candle.append( #Créez de nouvelles données d'une minute en utilisant les dernières données d'une minute
{
"timestamp": mark_ts,
"open": last_candle["close"],
"high": last_candle["close"],
"low": last_candle["close"],
"close": last_candle["close"],
"volume": 0,
"buy": 0,
"sell": 0,
}
)
self.__thread_unlock()
_timer_count = 0
Lorsque je l'essaye, le contrôle des pièces WebSocket est souvent déconnecté. Lorsqu'il est déconnecté, ʻon_error est appelé avant cela. De plus, en observant le moment de l'expiration à plusieurs reprises, il a été constaté que ʻon_open
ne peut pas être appelé en premier lieu et qu'il peut expirer tel quel. C'est un problème que vous ne pouvez pas faire un pied de 1 minute dès qu'il est coupé, donc si vous êtes déconnecté, vous voulez vous reconnecter avec les données de pied créées jusqu'à présent. De plus, si ʻon_open` n'est pas appelé, il sera coupé à 100%, donc j'aimerais essayer de me reconnecter dans les 3 secondes. Alors, ajoutez le traitement suivant
def __on_error(self, ws, error):
self.__reconnect() #Reconnectez-vous si une erreur se produit
def __candle_thread_terminate(self):
self.__connected = True #Si le thread d'achèvement de données est dans une boucle occupée en attente d'une connexion, quittez la boucle.
time.sleep(1.5)
def __exit(self):
self.ws.close() #Déconnecter WebSocket
self.__candle_thread_terminate() #Terminer le thread pour l'achèvement des données
def __reconnect(self):
self.__exit() #Déconnecter explicitement
time.sleep(2)
self.__connect() #Connectez-vous avec Coincheck WebSocket
#Méthode exécutée par le thread de complétion de données
def __check_candle(self, args):
_error_count = 0
while True:
if not self.__connected: #La connexion WebSocket n'est pas établie(on_le message n'est jamais appelé)Si attendre
_error_count += 1
self._logger.debug("wait 1 sec")
time.sleep(1)
if not self.__opened and _error_count > 3: #Même si vous attendez 3 secondes_Se reconnecter si open n'est pas appelé
self.ws.on_error = None #Allumé lors de la reconnexion_Empêcher l'erreur d'être appelée et de se connecter deux fois
term_thread = threading.Thread(target=lambda: self.__reconnect()) #Vous ne pouvez pas terminer ce fil à moins de le faire dans un autre fil
term_thread.start()
break
else:
break
Fondamentalement, il traite selon le commentaire dans le code source, mais si vous ne pouvez pas vous connecter même après avoir attendu 3 secondes, si vous appelez self .__ reconnect ()
directement à partir du thread pour la complétion des données, ce thread J'étais accro au fait que je devais l'exécuter dans un thread séparé parce que je ne pouvais pas me terminer. Si vous ne l'exécutez pas dans un autre thread, le nombre de threads d'achèvement de données doublera à chaque fois que vous serez déconnecté, ce qui est une situation terrifiante.
coincheck_websocket
import websocket
import json
import threading
import time
from datetime import datetime
class CoinCheckWebSocket:
URL = "wss://ws-api.coincheck.com/"
MAX_CANDLE_LEN = 24 * 60
def __init__(self):
self.__candle = []
self.__init_flg = False
self.__f = open("coin1m.log", 'w')
self.__lock = threading.Lock()
self.__connect()
def __connect(self):
self.__connected = False
self.__opened = False
self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
on_message = self.__on_message,
on_close = self.__on_close,
on_open = self.__on_open,
on_error = self.__on_error)
self.wst = threading.Thread(target=lambda: self.ws.run_forever())
self.wst.daemon = True
self.wst.start()
# Candle Thread
self._check_candle_thread = threading.Thread(
target = self.__check_candle, args=("check_candle",)
)
self._check_candle_thread.daemon = True
self._check_candle_thread.start()
print("check candle thread start")
def __on_open(self, ws):
print("open")
self.ws.send(json.dumps({
"type": "subscribe",
"channel": "btc_jpy-trades"
}))
self.__opened = True
def __on_error(self, ws, error):
print("error")
print(error)
self.__reconnect()
def __on_close(self, ws):
print("close, why close")
def __on_message(self, ws, message):
#print(message)
if self.__connected == False:
self.__connected = True
trade = self.__make_trade(message)
#print(trade)
if self.__init_flg == False:
self.__init_candle_data(trade)
self.__init_flg = True
else:
self.__update_candle_data(trade)
def __candle_thread_terminate(self):
print("__candle_thread_terminate invoked")
self.__connected = True #Parce qu'il peut être arrêté par l'attente
time.sleep(1.5)
def __exit(self):
print("__exit invoked")
self.ws.close()
self.__candle_thread_terminate()
def __reconnect(self):
print("__reconnect invoked")
self.__exit()
time.sleep(2)
self.__connect()
def __make_trade(self, message):
elements = message.split(',')
trade = {
"id": int(elements[0][1:len(elements[0])-1]),
"price": float(elements[2][1:len(elements[2])-1]),
"volume": float(elements[3][1:len(elements[3])-1]),
"type": elements[4][1:len(elements[4])-2]
}
return trade
def __thread_lock(self):
_count = 0
while self.__lock.acquire(blocking=True, timeout=1) == False:
_count += 1
if _count > 3:
print("lock acquite timeout")
return False
return True
def __thread_unlock(self):
try:
self.__lock.release()
except Exception as e:
print("lock release a {}".format(e))
return False
return True
def __format_candle(self, candle):
dt = datetime.fromtimestamp(candle["timestamp"])
s_str = "{0:%Y-%m-%d %H:%M:%S}".format(dt)
fmt_str = "%s %.1f %.1f %.1f %.1f %.6f %.6f %.6f" % (s_str,
candle["open"],
candle["high"],
candle["low"],
candle["close"],
candle["volume"],
candle["buy"],
candle["sell"],
)
return fmt_str
def _write_log(self, candle):
fmt_str = self.__format_candle(candle)
fmt_str += '\r\n'
self.__f.write(fmt_str)
self.__f.flush()
def _print_log(self, candle):
fmt_str = self.__format_candle(candle)
print(fmt_str)
def __init_candle_data(self, trade):
_dt = datetime.now().replace(second=0, microsecond=0)
_stamp = _dt.timestamp()
self.__candle.append(
{
"timestamp": _stamp,
"open": trade["price"],
"high": trade["price"],
"low": trade["price"],
"close": trade["price"],
"volume": trade["volume"],
"buy": trade["volume"] if trade["type"] == 'buy' else 0,
"sell": trade["volume"] if trade["type"] == 'sell' else 0,
}
)
def __update_candle_data(self, trade):
last_candle = self.__candle[-1]
_dt = datetime.now().replace(second=0, microsecond=0)
mark_ts = _dt.timestamp()
last_ts = last_candle["timestamp"]
if last_ts == mark_ts:
print("append")
last_candle["high"] = max(last_candle["high"], trade["price"])
last_candle["low"] = min(last_candle["low"], trade["price"])
last_candle["close"] = trade["price"]
last_candle["volume"] += trade["volume"]
last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0
self._print_log(last_candle)
else:
print("add new")
self._write_log(last_candle)
self.__candle.append(
{
"timestamp": mark_ts,
"open": trade["price"],
"high": trade["price"],
"low": trade["price"],
"close": trade["price"],
"volume": trade["volume"],
"buy": trade["volume"] if trade["type"] == 'buy' else 0,
"sell": trade["volume"] if trade["type"] == 'sell' else 0,
}
)
def get_candle(self, type=0):
self.__thread_lock()
if type == 0:
candle = self.__candle[:-1]
else:
candle = self.__candle[:]
self.__thread_unlock()
return candle
def __check_candle(self, args):
_error_count = 0
while True:
if not self.__connected:
_error_count += 1
print("wait 1 sec")
time.sleep(1)
if not self.__opened and _error_count > 3:
#print("nonono reconnect!!!")
self.ws.on_error = None #Évitez d'être appelé deux fois
term_thread = threading.Thread(target=lambda: self.__reconnect()) #Vous ne pouvez pas terminer ce fil à moins de le faire dans un autre fil
term_thread.start()
break
else:
break
_timer_count = 0
while self.ws.sock and self.ws.sock.connected:
time.sleep(1)
if _timer_count < 30:
print("wait until 30")
_timer_count += 1
continue
print(">>>>>>check candle")
self.__thread_lock()
_dt = datetime.now().replace(second=0, microsecond=0)
mark_ts = _dt.timestamp()
last_candle = self.__candle[-1]
last_ts = last_candle["timestamp"]
#Pas à moins d'une minute de l'heure actuelle
if last_ts != mark_ts:
print("---->>>>>>> new in check candle")
self._write_log(last_candle)
self.__candle.append(
{
"timestamp": mark_ts,
"open": last_candle["close"],
"high": last_candle["close"],
"low": last_candle["close"],
"close": last_candle["close"],
"volume": 0,
"buy": 0,
"sell": 0,
}
)
if len(self.__candle) > (CoinCheckWebSocket.MAX_CANDLE_LEN * 1.5):
self.__candle = self.__candle[-CoinCheckWebSocket.MAX_CANDLE_LEN:]
self.__thread_unlock()
_timer_count = 0
print("check candle end")
if __name__ == "__main__":
chs = CoinCheckWebSocket()
while True:
time.sleep(60)
Lorsqu'il est exécuté comme suit, un fichier appelé coin1m.log est généré et des données d'une minute y sont écrites.
>python3 coincheck_websocket.py
>cat coin1m.log
2019-12-19 03:30:00 750169.0 750169.0 749685.0 749714.0 1.265700 0.000000 1.265700
2019-12-19 03:31:00 749685.0 750428.0 749685.0 750415.0 0.348400 0.169315 0.179085
2019-12-19 03:32:00 750481.0 750481.0 750152.0 750152.0 0.347950 0.050000 0.297950
J'ai omis l'explication détaillée du traitement ci-dessus, mais si vous avez des erreurs ou des questions, n'hésitez pas à nous contacter. Cependant, ** je suis nouveau sur Python **. Et ** l'investissement est une responsabilité personnelle w **
Je veux un type explicite ヽ (´ ・ ω ・) ノ
Recommended Posts