Implémenter Redis Mutex en Python

Origine

Lors de la construction d'un service, je souhaite acquérir un verrou par exclusion mutuelle, mais je souhaite utiliser Redis car il ne suffit pas d'utiliser RDS. J'ai cherché une bibliothèque qui implémente "Design pattern: Locking with SETNX" dans SETNX - Redis en Python, mais je n'ai pas trouvé ce que je cherchais, alors je l'ai implémenté. J'ai essayé de. La traduction japonaise de SETNX - Redis est type de chaîne de caractères - documentationredis 2.0.3. Voir .html # commande-SETNX).

la mise en oeuvre

Source Code

mutex.py


from datetime import datetime
import time
from functools import wraps

from .exception import (DuplicateLockError,
                        HasNotLockError,
                        ExpiredLockError,
                        SetnxError,
                        LockError)


class Mutex(object):
    def __init__(self, client, key,
                 expire=10,
                 retry_count=6, # retry_count * retry_sleep_sec =Temps d'attente maximum
                 retry_setnx_count=100,
                 retry_sleep_sec=0.25):
        self._lock = None
        self._r = client
        self._key = key
        self._expire = expire
        self._retry_count = retry_count
        self._retry_setnx_count = retry_setnx_count
        self._retry_sleep_sec = retry_sleep_sec

    def _get_now(self):
        return float(datetime.now().strftime('%s.%f'))

    def lock(self):
        if self._lock:
            raise DuplicateLockError(self._key)
        self._do_lock()

    def _do_lock(self):
        for n in xrange(0, self._retry_count):
            is_set, old_expire = self._setnx()
            if is_set:
                self._lock = self._get_now()
                return

            if self._need_retry(old_expire):
                continue

            if not self._need_retry(self._getset()):
                self._lock = self._get_now()
                return 

        raise LockError(self._key)

    def _setnx(self):
        for n in xrange(0, self._retry_setnx_count):
            is_set = self._r.setnx(self._key, self._get_now() + self._expire)
            if is_set:
                return True, 0

            old_expire = self._r.get(self._key)
            if old_expire is not None:
                return False, float(old_expire)

        raise SetnxError(self._key)

    def _need_retry(self, expire):
        if expire < self._get_now():
            return False

        time.sleep(self._retry_sleep_sec)
        return True

    def _getset(self):
        old_expire = self._r.getset(self._key, self._get_now() + self._expire)
        if old_expire is None:
            return 0

        return float(old_expire)

    def unlock(self):
        if not self._lock:
            raise HasNotLockError(self._key)

        elapsed_time = self._get_now() - self._lock
        if self._expire <= elapsed_time:
            raise ExpiredLockError(self._key, elapsed_time)

        self._r.delete(self._key)
        self._lock = None

    def __enter__(self):
        self.lock()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self._lock:
            self.unlock()
        return True if exc_type is None else False

    def __call__(self, func):
        @wraps(func)
        def inner(*args, **kwargs):
            with self:
                return func(*args, **kwargs)
        return inner

exception.py


class MutexError(Exception):
    pass


class DuplicateLockError(MutexError):

    """
Déjà verrouillé()Verrouiller sur un objet Mutex exécuté()Se produit lors de la réexécution.
une fois, unlock()Est-ce que tu cours,Besoin de créer un autre objet Mutex.
    """

    pass


class HasNotLockError(MutexError):

    """
encore, lock()Déverrouiller sur les objets Mutex qui ne sont pas en cours d'exécution()Se produit lorsque vous exécutez.
    lock()Doit être fait plus tard.
    """

    pass


class ExpiredLockError(MutexError):

    """
    lock()Après exécution,déverrouiller avec le verrou libéré par expire()Se produit lorsque vous exécutez.
    """

    pass


class SetnxError(MutexError):
    pass


class LockError(MutexError):
    pass

Commentaire

Le flux de verrouillage approximatif est le suivant.

  1. Réglez "Date d'expiration X" dans SETNX
  2. S'il peut être défini, le verrouillage réussit
  3. Si elle ne peut pas être définie, obtenez "Date d'expiration Y" par GET
  4. Si "Date d'expiration Y" est valide, passez à 1
  5. Si "Date d'expiration Y" est déjà invalide, définissez "Date d'expiration X" avec GETSET.
  6. Si la "date d'expiration Z" obtenue par GETSET n'est pas valide, le verrouillage réussit.
  7. Si "Date d'expiration Z" est valide, passez à 1 car un autre processus a été GETSET.

L'utilisation est la suivante.

usage.py


>>> from mutex import Mutex
>>> with Mutex(':'.join(['EmitAccessToken', user_id]):
>>>     # do something ...
>>>     pass

>>> @Mutex(':'.join(['EmitAccessToken', user_id]):
>>> def emit_access_token():
>>>     # do something ...
>>>     pass

>>> mutex = Mutex(':'.join(['EmitAccessToken', user_id])
>>> mutex.lock()
>>> # do something ...
>>> mutex.unlock()

tester

test.py


import unittest
import redis
import time
from multiprocessing import Process

from .mutex import Mutex
from .exception import (DuplicateLockError,
                        HasNotLockError,
                        ExpiredLockError,
                        LockError)


class TestMutex(unittest.TestCase):
    def setUp(self):
        self.key = 'spam'
        self.r = redis.StrictRedis()
        self.mutex = Mutex(self.r, self.key)

    def tearDown(self):
        mutex = self.mutex
        if mutex._lock:
            mutex.unlock()
        mutex._r.delete('ham')

    def test_lock(self):
        mutex = self.mutex
        mutex.lock()
        self.assertIsNotNone(mutex._r.get(mutex._key))

        with self.assertRaises(DuplicateLockError):
            mutex.lock()

    def test_unlock(self):
        self.test_lock()

        mutex = self.mutex
        self.mutex.unlock()
        self.assertIsNone(mutex._r.get(mutex._key))

        with self.assertRaises(HasNotLockError):
            mutex.unlock()

        self.test_lock()
        time.sleep(10.5)
        with self.assertRaises(ExpiredLockError):
            mutex.unlock()
        mutex._lock = None #Initialisation forcée

    def test_expire(self):
        mutex1 = self.mutex

        mutex2 = Mutex(self.r, self.key, expire=2)
        mutex2.lock() #Gardez verrouillé pendant 2 secondes

        with self.assertRaises(LockError):
            mutex1.lock() #réessayer 6 fois* sleep 0.25 secondes= 1.5 secondes

        time.sleep(0.6) #prime
        mutex1.lock()
        self.assertIsNotNone(mutex1._r.get(mutex1._key))

    def test_with(self):
        mutex1 = self.mutex
        with mutex1:
            self.assertIsNotNone(mutex1._r.get(mutex1._key))
        self.assertIsNone(mutex1._r.get(mutex1._key))

        mutex2 = Mutex(self.r, self.key, expire=2)
        mutex2.lock() #Gardez verrouillé pendant 2 secondes

        with self.assertRaises(LockError):
            with mutex1: #réessayer 6 fois* sleep 0.25 secondes= 1.5 secondes
                pass

        mutex2.unlock()

        with mutex1:
            with self.assertRaises(DuplicateLockError):
                with mutex1:
                    pass

    def test_decorator(self):
        mutex = self.mutex
        @mutex
        def egg():
            self.assertIsNotNone(mutex._r.get(mutex._key))
        egg()
        self.assertIsNone(mutex._r.get(mutex._key))

    def test_multi_process(self):
        procs = 20
        counter = 100

        def incr():
            mutex = Mutex(redis.StrictRedis(), self.key, retry_count=100)
            for n in xrange(0, counter):
                mutex.lock()

                ham = mutex._r.get('ham') or 0
                mutex._r.set('ham', int(ham) + 1)

                mutex.unlock()

        ps = [Process(target=incr) for n in xrange(0, procs)]
        for p in ps:
            p.start()

        for p in ps:
            p.join()

        self.assertEqual(int(self.mutex._r.get('ham')), counter * procs)

Recommended Posts

Implémenter Redis Mutex en Python
Mettre en œuvre des recommandations en Python
Implémenter XENO avec python
Implémenter sum en Python
Implémenter Traceroute dans Python 3
Implémenter Naive Bayes dans Python 3.3
Implémenter d'anciens chiffrements en python
Implémenter l'extension en Python
Doublure de pipe Redis en Python
Mettre en œuvre un RPC rapide en Python
Implémenter l'algorithme de Dijkstra en python
Implémenter le bot de discussion Slack en Python
Mettre en œuvre l'apprentissage de l'empilement en Python [Kaggle]
Implémenter la fonction power.prop.test de R en python
Implémenter le modèle Singleton en Python
Implémentez rapidement l'API REST en Python
Python en optimisation
CURL en Python
J'ai essayé d'implémenter PLSA en Python
Métaprogrammation avec Python
Python 3.3 avec Anaconda
Géocodage en python
J'ai essayé d'implémenter la permutation en Python
Méta-analyse en Python
Unittest en Python
Implémenter le filtre FIR en langage Python et C
Mettre en œuvre collectivement des tests d'hypothèses statistiques en Python
Époque en Python
Discord en Python
Allemand en Python
DCI en Python
tri rapide en python
nCr en python
N-Gram en Python
Programmation avec Python
Plink en Python
J'ai essayé d'implémenter ADALINE en Python
Constante en Python
J'ai essayé d'implémenter PPO en Python
FizzBuzz en Python
Sqlite en Python
Étape AIC en Python
Faites quelque chose comme les transactions Redis en Python
LINE-Bot [0] en Python
CSV en Python
Assemblage inversé avec Python
Réflexion en Python
Constante en Python
nCr en Python.
format en python
Scons en Python 3
Puyopuyo en python
python dans virtualenv
PPAP en Python
Quad-tree en Python
Réflexion en Python
Chimie avec Python