Ab Letztes Mal werden wir die von au Kabucom Securities für Personen aus Python bereitgestellte Kabu-Station-API verwenden. Dieses Mal wird die Marke von Websocket in Python geliefert. Gleichzeitig werden wir den Code zum Registrieren, Abmelden und Abmelden aller Probleme einführen.
import json
import requests
import yaml
# ---
def get_token():
with open('auth.yaml', 'r') as yml:
auth = yaml.safe_load(yml)
url = 'http://localhost:18080/kabusapi/token'
headers = {'content-type': 'application/json'}
payload = json.dumps(
{'APIPassword': auth['PASS'],}
).encode('utf8')
response = requests.post(url, data=payload, headers=headers)
return json.loads(response.text)['Token']
# ---
token = get_token()
EXCHANGES = {
1: 'TSE',
3: 'Namenszertifikat',
5: 'Vermögen',
6: 'Rechnungszertifikat',
}
payload = json.dumps({
'Symbols': [
{'Symbol': 8306 ,'Exchange': 1}, # MUFG
{'Symbol': 9433 ,'Exchange': 1}, # KDDI
# ...Bis zu 50 können registriert werden
],}).encode('utf8')
url = 'http://localhost:18080/kabusapi/register'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, payload, headers=headers)
regist_list = json.loads(response.text)
print('Lieferregistrierungsmarke')
for regist in regist_list['RegistList']:
print("{} {}".format(
regist['Symbol'],
EXCHANGES[regist['Exchange']]))
Ändern Sie nur die URL. Die Anzeigeeinheit kann umgeleitet werden.
url = 'http://localhost:18080/kabusapi/unregister'
"Nutzlast" wird nicht mehr benötigt.
url = 'http://localhost:18080/kabusapi/unregister/all'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, headers=headers)
Erhalten Sie weiterhin den Vertrieb registrierter Marken. Beenden Sie mit Strg + C. Beachten Sie, dass ".text" nicht zu "response" hinzugefügt wird, nämlich "json.loads".
import asyncio
import json
import websockets
# ---
async def stream():
uri = 'ws://localhost:18080/kabusapi/websocket'
async with websockets.connect(uri, ping_timeout=None) as ws:
while not ws.closed:
response = await ws.recv()
board = json.loads(response)
print("{} {} {}".format(
board['Symbol'],
board['SymbolName'],
board['CurrentPrice'],
))
loop = asyncio.get_event_loop()
loop.create_task(stream())
try:
loop.run_forever()
except KeyboardInterrupt:
exit()
Da die Serverseite keinen Heartbeat implementiert, ist das Argument "ping_timeout = None" in "websockets.connect" erforderlich.
[Anfrage] WebSocket Ping / Pong-Unterstützung Issue#8 https://github.com/kabucom/kabusapi/issues/8
Recommended Posts