Receiving the WebSocket distribution of the kabu Station® API in Python

Overview

Continuing from the previous article, this post uses the kabu Station API, which kabu.com Securities provides to individual investors, from Python. This time we receive the WebSocket distribution for registered symbols in Python. Along the way, we also cover the code for registering symbols, unregistering them, and unregistering all symbols at once.

Environment

Additional packages

Code

Symbol registration

import json
import requests
import yaml

# ---

def get_token():
    with open('auth.yaml', 'r') as yml:
        auth = yaml.safe_load(yml)

    url = 'http://localhost:18080/kabusapi/token'
    headers = {'content-type': 'application/json'}
    payload = json.dumps(
        {'APIPassword': auth['PASS'],}
        ).encode('utf8')

    response = requests.post(url, data=payload, headers=headers)

    return json.loads(response.text)['Token']

# ---

token = get_token()

EXCHANGES = {
    1: 'TSE',
    3: 'Nagoya Stock Exchange',
    5: 'Fukuoka Stock Exchange',
    6: 'Sapporo Stock Exchange',
}

payload = json.dumps({
    'Symbols': [
        {'Symbol': 8306, 'Exchange': 1},  # MUFG
        {'Symbol': 9433, 'Exchange': 1},  # KDDI
        # ...Up to 50 can be registered
    ],}).encode('utf8')

url = 'http://localhost:18080/kabusapi/register'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, payload, headers=headers)

regist_list = json.loads(response.text)

print('Symbols registered for distribution')
for regist in regist_list['RegistList']:
    print("{} {}".format(
        regist['Symbol'],
        EXCHANGES[regist['Exchange']]))
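
The comment in the payload above notes that up to 50 symbols can be registered. As a minimal sketch, the payload construction could be wrapped in a helper that enforces that limit before the request is sent. The names `build_register_payload` and `MAX_SYMBOLS` are hypothetical and introduced only for illustration; the limit of 50 is taken from the comment above, and the helper checks only the size of a single request, not how many symbols are already registered.

```python
import json

# Limit taken from the comment in the registration code above.
MAX_SYMBOLS = 50


def build_register_payload(symbols):
    """Return the UTF-8 encoded JSON body for a register request.

    `symbols` is a list of (symbol, exchange) pairs, e.g. [(8306, 1)].
    This helper is hypothetical and not part of the kabu Station API.
    """
    if len(symbols) > MAX_SYMBOLS:
        raise ValueError(
            'too many symbols: {} (limit is {})'.format(len(symbols), MAX_SYMBOLS))
    body = {
        'Symbols': [
            {'Symbol': symbol, 'Exchange': exchange}
            for symbol, exchange in symbols
        ],
    }
    return json.dumps(body).encode('utf8')
```

The result can be passed to requests.put in place of the hand-written payload, for example payload = build_register_payload([(8306, 1), (9433, 1)]).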

Symbol unregistration

Only the URL changes. The code that prints the result can be reused as is.

url = 'http://localhost:18080/kabusapi/unregister'

Unregister all symbols

The payload is no longer needed.

url = 'http://localhost:18080/kabusapi/unregister/all'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, headers=headers)
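
The three calls above differ only in the URL and in whether a payload is sent. A minimal sketch of that idea, assuming a hypothetical helper `build_put_kwargs` (not part of the API or of requests), collects the keyword arguments for requests.put in one place. The constant `BASE_URL` is also introduced here for illustration; its value is the common prefix of the URLs used above.

```python
import json

# Common prefix of the URLs used in this article.
BASE_URL = 'http://localhost:18080/kabusapi'


def build_put_kwargs(action, token, symbols=None):
    """Build keyword arguments for requests.put() (hypothetical helper).

    action is 'register', 'unregister' or 'unregister/all'.
    symbols is a list of {'Symbol': ..., 'Exchange': ...} dicts; it is
    not sent for 'unregister/all'.
    """
    if action not in ('register', 'unregister', 'unregister/all'):
        raise ValueError('unknown action: {}'.format(action))
    kwargs = {
        'url': '{}/{}'.format(BASE_URL, action),
        'headers': {'Content-Type': 'application/json', 'X-API-KEY': token},
    }
    # Only register and unregister carry a list of symbols.
    if action != 'unregister/all':
        kwargs['data'] = json.dumps({'Symbols': symbols or []}).encode('utf8')
    return kwargs
```

With the token from get_token(), a call such as requests.put(**build_put_kwargs('unregister', token, [{'Symbol': 8306, 'Exchange': 1}])) would send the unregister request.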

Displaying the distribution

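The following code keeps receiving the distribution for the registered symbols. Press Ctrl + C to exit. Note that the value passed to json.loads is response itself, without .text: ws.recv() returns the message body directly rather than a requests response object.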

import asyncio
import json
import websockets

# ---

async def stream():
    uri = 'ws://localhost:18080/kabusapi/websocket'

    async with websockets.connect(uri, ping_timeout=None) as ws:
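        # Iterate over incoming messages; the loop ends when the connection
        # closes normally. This also works on websockets versions whose
        # connection object has no .closed attribute.
        async for response in ws: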
            board = json.loads(response)
            print("{} {} {}".format(
                board['Symbol'],
                board['SymbolName'],
                board['CurrentPrice'],
            ))

# asyncio.run() creates the event loop and runs stream() until it finishes.
try:
    asyncio.run(stream())
except KeyboardInterrupt:
    # Ctrl + C ends the program.
    pass

Because the server side does not implement a heartbeat (ping / pong), ping_timeout=None must be passed to websockets.connect so that the client does not close the connection when no pong arrives.

[Request] WebSocket ping / pong support, Issue #8: https://github.com/kabucom/kabusapi/issues/8
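
To make the json.loads step in the distribution code concrete without a running kabu Station, here is a minimal sketch that formats one received message the same way the receive loop does. The function name `format_board` and the sample values are made up for illustration; only the keys Symbol, SymbolName and CurrentPrice come from the code above.

```python
import json


def format_board(message):
    """Format one WebSocket message (a JSON string) as 'Symbol SymbolName CurrentPrice'."""
    # message is already a string, so it goes to json.loads without .text
    board = json.loads(message)
    return '{} {} {}'.format(
        board['Symbol'],
        board['SymbolName'],
        board['CurrentPrice'],
    )


# Made-up sample message; the values are illustrative only.
sample = '{"Symbol": "9433", "SymbolName": "KDDI", "CurrentPrice": 3000.0}'
print(format_board(sample))
```

Inside stream(), the print call could then become print(format_board(response)), which keeps the parsing logic testable on its own.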
