Erhalten Sie einen Websocket der kabu station ® API in Python

Überblick

Ab Letztes Mal werden wir die von au Kabucom Securities für Personen aus Python bereitgestellte Kabu-Station-API verwenden. Dieses Mal wird die Marke von Websocket in Python geliefert. Gleichzeitig werden wir den Code zum Registrieren, Abmelden und Abmelden aller Probleme einführen.

Umgebung

Zusätzliche Pakete

Code

Markenregistrierung

import json
import requests
import yaml

# ---

def get_token():
    with open('auth.yaml', 'r') as yml:
        auth = yaml.safe_load(yml)

    url = 'http://localhost:18080/kabusapi/token'
    headers = {'content-type': 'application/json'}
    payload = json.dumps(
        {'APIPassword': auth['PASS'],}
        ).encode('utf8')

    response = requests.post(url, data=payload, headers=headers)

    return json.loads(response.text)['Token']

# ---

token = get_token()

EXCHANGES = {
    1: 'TSE',
    3: 'Namenszertifikat',
    5: 'Vermögen',
    6: 'Rechnungszertifikat',
}

payload = json.dumps({
    'Symbols': [
        {'Symbol': 8306 ,'Exchange': 1},  # MUFG
        {'Symbol': 9433 ,'Exchange': 1},  # KDDI
        # ...Bis zu 50 können registriert werden
    ],}).encode('utf8')

url = 'http://localhost:18080/kabusapi/register'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, payload, headers=headers)

regist_list = json.loads(response.text)

print('Lieferregistrierungsmarke')
for regist in regist_list['RegistList']:
    print("{} {}".format(
        regist['Symbol'],
        EXCHANGES[regist['Exchange']]))

Stornierung der Markenregistrierung

Ändern Sie nur die URL. Die Anzeigeeinheit kann umgeleitet werden.

url = 'http://localhost:18080/kabusapi/unregister'

Alle registrierten Bestände stornieren

"Nutzlast" wird nicht mehr benötigt.

url = 'http://localhost:18080/kabusapi/unregister/all'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, headers=headers)

Lieferanzeige

Erhalten Sie weiterhin den Vertrieb registrierter Marken. Beenden Sie mit Strg + C. Beachten Sie, dass ".text" nicht zu "response" hinzugefügt wird, nämlich "json.loads".

import asyncio
import json
import websockets

# ---

async def stream():
    uri = 'ws://localhost:18080/kabusapi/websocket'

    async with websockets.connect(uri, ping_timeout=None) as ws:
        while not ws.closed:
            response = await ws.recv()
            board = json.loads(response)
            print("{} {} {}".format(
                board['Symbol'],
                board['SymbolName'],
                board['CurrentPrice'],
            ))

loop = asyncio.get_event_loop()
loop.create_task(stream())
try:
    loop.run_forever()
except KeyboardInterrupt:
    exit()

Da die Serverseite keinen Heartbeat implementiert, ist das Argument "ping_timeout = None" in "websockets.connect" erforderlich.

[Anfrage] WebSocket Ping / Pong-Unterstützung   Issue#8 https://github.com/kabucom/kabusapi/issues/8

Recommended Posts

Erhalten Sie einen Websocket der kabu station ® API in Python
Verwenden Sie die kabu Station® API von Python
Evernote-API in Python
C-API in Python 3
Versuchen Sie es mit der Kabu Station API von au Kabucom Securities
kabu Station® API - Aktualisierter Python-Wrapper für die PUSH-API
Hit Mastodons API in Python
Empfangen Sie Laufzeitargumente in Python 2.7
Objektäquivalenzbeurteilung in Python
Blender Python API in Houdini (Python 3)
Implementierung der schnellen Sortierung in Python
Berühren wir die API der Netatmo Weather Station mit Python. #Python #Netatmo
Bildpixel-Manipulation in Python
Abrufen der arXiv-API in Python
Klicken Sie in Python auf die Sesami-API
Zeitdelta in Python 2.7-Serie teilen
MySQL-automatische Escape-Funktion von Parametern in Python
Umgang mit JSON-Dateien in Python
Implementierung eines Lebensspiels in Python
Audio-Wellenform-Anzeige in Python
Erstellen Sie Google Mail in Python ohne Verwendung der API
Klicken Sie auf die Web-API in Python
Implementieren Sie die REST-API schnell in Python
Das Gesetz der Zahlen in Python
Implementierung der ursprünglichen Sortierung in Python
Reversibles Verwürfeln von Ganzzahlen in Python
Greifen Sie mit Python auf die Twitter-API zu
Mausbedienung mit Windows-API in Python
Versuchen Sie es mit der Wunderlist-API in Python
Überprüfen Sie das Verhalten des Zerstörers in Python
Übung, dies in Python zu verwenden (schlecht)
Allgemeine Relativitätstheorie in Python: Einführung
Versuchen Sie, die Kraken-API mit Python zu verwenden
Ausgabebaumstruktur von Dateien in Python
Zeigen Sie eine Liste der Alphabete in Python 3 an
Vergleich japanischer Konvertierungsmodule in Python3
Überprüfen und empfangen Sie die serielle Schnittstelle in Python (Portprüfung)
Zusammenfassung verschiedener for-Anweisungen in Python
Tweet mit der Twitter-API in Python
Holen Sie sich Google Fit API-Daten in Python
Das Ergebnis der Installation von Python auf Anaconda
Holen Sie sich Youtube-Daten in Python mithilfe der Youtube-Daten-API
Entwicklung und Bereitstellung der REST-API in Python mit Falcon Web Framework
Gang of Four (GoF) -Muster in Python
Probieren Sie schnell die Microsoft Face API in Python aus
Grundlagen zum Ausführen von NoxPlayer in Python
Massenersatz von Zeichenfolgen in Python-Arrays
Erhalten Sie eine Liste der Ergebnisse der Parallelverarbeitung in Python mit Starmap
Projekt Euler # 16 "Summe der Kräfte" in Python
Traffic Safety-Kun: Erkennung von Verkehrszeichen in Python
Zusammenfassung der integrierten Methoden usw. der Python-Liste
Nicht logische Operatorverwendung von oder in Python
Auf der Suche nach dem schnellsten FizzBuzz in Python
Praktisches Beispiel für hexagonale Architektur in Python
Projekt Euler # 17 "Anzahl der Zeichen" in Python
Doppelte Pendelbewegungsgleichung in Python
Entfernen Sie DICOM-Bilder in Python
Status jedes Python-Verarbeitungssystems im Jahr 2020
Stellen Sie von Python aus eine Verbindung zur Websocket-API von coincheck her
Projekt Euler # 1 "Vielfaches von 3 und 5" in Python