Qiita API Python wrapper for batch processing to grab Qiita posts

A Python wrapper for the Qiita API for batches created in Get "almost" all posts with the Qiita API.

I didn't make it so well, so when I thought about brushing it up, I didn't do anything and left it for a month. .. .. Since the API has also been upgraded, we will only support v2 and release it for the time being.


This is a code example that retrieves up to 5 pages of 100 new posts and saves them in a file. Since the paging process is done internally, there is no need to write a double loop.

qiita2.wait_seconds = 0
for item in qiita2.items(100, 5):

Calling ʻitems () returns the iterator of the post's json object, so you can use it as is in a for statement or sort.

Supported API

I made only what I needed, so that's it.

How to use

Specifying an access token

Set with the module variable of qiita2.

qiita2.auth_token = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"


Variable name Default Contents
default_per_page 100 Number of acquisitions per page(Per in each API_Default when page is omitted)
default_max_page 100 Maximum number of pages(Max in each API_Default when page is omitted)
wait_seconds 12 Wait seconds before sending request
retry_wait_min 1 Wait until retry when an error occurs(Minutes)
retry_limit 10 Limit the number of retries when an error occurs


Get a list of new posts

Returns the iterator of the new post list.

#Get with default number of acquisitions and maximum number of pages
items = qiita2.items()
#Get by specifying the number of acquisitions and the maximum number of pages
items = qiita2.items(per_page=20, max_page=5)

Get tag list

Returns the iterator of the tag list.

items = qiita2.tags()
items = qiita2.tags(per_page, max_page)

Get a list of posts with a specific tag

Returns an iterator for a list of specific tag posts.

items = qiita2.tag_items(tag_url)
items = qiita2.tag_items(tag_url, per_page, max_page)

Acquisition of total number

Get from the Total-Count response header.


Save post to file

The file name is data / items / <post ID> .json. ← Is the save destination fixed?



Since it is one file, you can use it as it is by copying and pasting.


import time

import codecs
import json
from logging import getLogger
import requests
from urllib.parse import urlparse, parse_qs

logger = getLogger(__name__)

URL_ITEMS     = "https://qiita.com/api/v2/items"
URL_TAG_ITEMS = "https://qiita.com/api/v2/tags/%s/items"
URL_TAGS      = "https://qiita.com/api/v2/tags"

HEADER_TOTAL = "Total-Count"
LINK_NEXT = "next"
LINK_LAST = "last"

default_per_page = 100
default_max_page = 100
wait_seconds = 12
retry_wait_min = 1
retry_limit = 10

auth_token = None

def items(per_page = default_per_page, max_page = default_max_page):
    req = QiitaRequest(URL_ITEMS, per_page, max_page)
    return QiitaIterator(req)

def tag_items(tag_url, per_page = default_per_page, max_page = default_max_page):
    req = QiitaRequest(URL_TAG_ITEMS % tag_url, per_page, max_page)
    return QiitaIterator(req)

def tags(per_page = default_per_page, max_page = default_max_page):
    req = QiitaRequest(URL_TAGS, per_page, max_page)
    return QiitaIterator(req)

class QiitaIterator:
    def __init__(self, req):
        self.req = req
        self.items = req.request().__iter__()

    def __iter__(self):
        return self

    def __next__(self):
        if self.items == None: raise StopIteration
            val = self.items.__next__()
            return val
        except StopIteration:
            if self.req.has_next():
                self.items = self.req.next().__iter__()
                return self.__next__()
                raise StopIteration

    def __len__(self):
        return self.req.total_count()

class QiitaRequest:

    last_request_time = None

    retry_num = 0

    def __init__(self, url, per_page = default_per_page, max_page = default_max_page, page = 1):
        self.url = url
        self.per_page = per_page
        self.max_page = max_page
        self.page = page
        self.res = None
        self.current_page = None

    def request(self):
        self.links = dict()
        params = {"per_page": self.per_page, "page": self.page}
        return self.__request__(self.url, params)

    def __request__(self, url, params = None):
        logger.info("url:%s" % url)

        headers = {"Authorization": "Bearer " + auth_token} if auth_token != None else None
        self.res = requests.get(url, params = params, headers = headers)
        status = self.res.status_code

        while status != 200 and QiitaRequest.retry_num <= retry_limit:
            logger.warning("status:%d" % status)
            logger.warn(u"%Wait d minutes." % retry_wait_min)
            time.sleep(retry_wait_min * 60)
            QiitaRequest.retry_num = QiitaRequest.retry_num + 1
            self.res = requests.get(url, params = params)
            status = self.res.status_code

        if status != 200:
            logger.warning("status:%d" % status)
            return None

        QiitaRequest.retry_num = 0
        return self.res.json()

    def next(self):
        if not self.has_next(): raise Exception()
        #Per in Link response header in v2_Dealing with missing page
        params = {"per_page": self.per_page}
        return self.__request__(self.res.links[LINK_NEXT]["url"], params)

    def retry(self):
    def has_error(self):
    def has_next(self):
        if not LINK_NEXT in self.res.links: return False
        url = self.res.links[LINK_NEXT]["url"]
        page = self.__get_page__(url)
        return page <= self.max_page

    def last_page(self):
        url = self.res.links[LINK_LAST]["url"]
        return self.__get_page__(url)

    def total_count(self):
        return int(self.res.headers[HEADER_TOTAL])

    def __get_page__(self, url):
        query = urlparse(url).query
        page = parse_qs(query)["page"][0]
        return int(page)

    def __wait__(self):
        if QiitaRequest.last_request_time != None:
            last = QiitaRequest.last_request_time
            now = time.clock()
            wait = wait_seconds - (now - last)
            if 0 < wait:
        QiitaRequest.last_request_time = time.clock()

def save_item(item):
    item_id = item["id"]
    filename = "data/items/%s.json" % item_id
    with codecs.open(filename, "w", "utf-8") as f:
        f.write(json.dumps(item, indent = 4, ensure_ascii=False))

