Lors de la construction d'un service, je souhaite acquérir un verrou par exclusion mutuelle, mais je souhaite utiliser Redis car il ne suffit pas d'utiliser RDS. J'ai cherché une bibliothèque qui implémente "Design pattern: Locking with SETNX" dans SETNX - Redis en Python, mais je n'ai pas trouvé ce que je cherchais, alors je l'ai implémenté. J'ai essayé de. La traduction japonaise de SETNX - Redis est type de chaîne de caractères - documentationredis 2.0.3. Voir .html # commande-SETNX).
Source Code
mutex.py
from datetime import datetime
import time
from functools import wraps
from .exception import (DuplicateLockError,
                        HasNotLockError,
                        ExpiredLockError,
                        SetnxError,
                        LockError)
class Mutex(object):
    def __init__(self, client, key,
                 expire=10,
                 retry_count=6, # retry_count * retry_sleep_sec =Temps d'attente maximum
                 retry_setnx_count=100,
                 retry_sleep_sec=0.25):
        self._lock = None
        self._r = client
        self._key = key
        self._expire = expire
        self._retry_count = retry_count
        self._retry_setnx_count = retry_setnx_count
        self._retry_sleep_sec = retry_sleep_sec
    def _get_now(self):
        return float(datetime.now().strftime('%s.%f'))
    def lock(self):
        if self._lock:
            raise DuplicateLockError(self._key)
        self._do_lock()
    def _do_lock(self):
        for n in xrange(0, self._retry_count):
            is_set, old_expire = self._setnx()
            if is_set:
                self._lock = self._get_now()
                return
            if self._need_retry(old_expire):
                continue
            if not self._need_retry(self._getset()):
                self._lock = self._get_now()
                return 
        raise LockError(self._key)
    def _setnx(self):
        for n in xrange(0, self._retry_setnx_count):
            is_set = self._r.setnx(self._key, self._get_now() + self._expire)
            if is_set:
                return True, 0
            old_expire = self._r.get(self._key)
            if old_expire is not None:
                return False, float(old_expire)
        raise SetnxError(self._key)
    def _need_retry(self, expire):
        if expire < self._get_now():
            return False
        time.sleep(self._retry_sleep_sec)
        return True
    def _getset(self):
        old_expire = self._r.getset(self._key, self._get_now() + self._expire)
        if old_expire is None:
            return 0
        return float(old_expire)
    def unlock(self):
        if not self._lock:
            raise HasNotLockError(self._key)
        elapsed_time = self._get_now() - self._lock
        if self._expire <= elapsed_time:
            raise ExpiredLockError(self._key, elapsed_time)
        self._r.delete(self._key)
        self._lock = None
    def __enter__(self):
        self.lock()
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        if self._lock:
            self.unlock()
        return True if exc_type is None else False
    def __call__(self, func):
        @wraps(func)
        def inner(*args, **kwargs):
            with self:
                return func(*args, **kwargs)
        return inner
exception.py
class MutexError(Exception):
    pass
class DuplicateLockError(MutexError):
    """
Déjà verrouillé()Verrouiller sur un objet Mutex exécuté()Se produit lors de la réexécution.
une fois, unlock()Est-ce que tu cours,Besoin de créer un autre objet Mutex.
    """
    pass
class HasNotLockError(MutexError):
    """
encore, lock()Déverrouiller sur les objets Mutex qui ne sont pas en cours d'exécution()Se produit lorsque vous exécutez.
    lock()Doit être fait plus tard.
    """
    pass
class ExpiredLockError(MutexError):
    """
    lock()Après exécution,déverrouiller avec le verrou libéré par expire()Se produit lorsque vous exécutez.
    """
    pass
class SetnxError(MutexError):
    pass
class LockError(MutexError):
    pass
Le flux de verrouillage approximatif est le suivant.
L'utilisation est la suivante.
usage.py
>>> from mutex import Mutex
>>> with Mutex(':'.join(['EmitAccessToken', user_id]):
>>>     # do something ...
>>>     pass
>>> @Mutex(':'.join(['EmitAccessToken', user_id]):
>>> def emit_access_token():
>>>     # do something ...
>>>     pass
>>> mutex = Mutex(':'.join(['EmitAccessToken', user_id])
>>> mutex.lock()
>>> # do something ...
>>> mutex.unlock()
test.py
import unittest
import redis
import time
from multiprocessing import Process
from .mutex import Mutex
from .exception import (DuplicateLockError,
                        HasNotLockError,
                        ExpiredLockError,
                        LockError)
class TestMutex(unittest.TestCase):
    def setUp(self):
        self.key = 'spam'
        self.r = redis.StrictRedis()
        self.mutex = Mutex(self.r, self.key)
    def tearDown(self):
        mutex = self.mutex
        if mutex._lock:
            mutex.unlock()
        mutex._r.delete('ham')
    def test_lock(self):
        mutex = self.mutex
        mutex.lock()
        self.assertIsNotNone(mutex._r.get(mutex._key))
        with self.assertRaises(DuplicateLockError):
            mutex.lock()
    def test_unlock(self):
        self.test_lock()
        mutex = self.mutex
        self.mutex.unlock()
        self.assertIsNone(mutex._r.get(mutex._key))
        with self.assertRaises(HasNotLockError):
            mutex.unlock()
        self.test_lock()
        time.sleep(10.5)
        with self.assertRaises(ExpiredLockError):
            mutex.unlock()
        mutex._lock = None #Initialisation forcée
    def test_expire(self):
        mutex1 = self.mutex
        mutex2 = Mutex(self.r, self.key, expire=2)
        mutex2.lock() #Gardez verrouillé pendant 2 secondes
        with self.assertRaises(LockError):
            mutex1.lock() #réessayer 6 fois* sleep 0.25 secondes= 1.5 secondes
        time.sleep(0.6) #prime
        mutex1.lock()
        self.assertIsNotNone(mutex1._r.get(mutex1._key))
    def test_with(self):
        mutex1 = self.mutex
        with mutex1:
            self.assertIsNotNone(mutex1._r.get(mutex1._key))
        self.assertIsNone(mutex1._r.get(mutex1._key))
        mutex2 = Mutex(self.r, self.key, expire=2)
        mutex2.lock() #Gardez verrouillé pendant 2 secondes
        with self.assertRaises(LockError):
            with mutex1: #réessayer 6 fois* sleep 0.25 secondes= 1.5 secondes
                pass
        mutex2.unlock()
        with mutex1:
            with self.assertRaises(DuplicateLockError):
                with mutex1:
                    pass
    def test_decorator(self):
        mutex = self.mutex
        @mutex
        def egg():
            self.assertIsNotNone(mutex._r.get(mutex._key))
        egg()
        self.assertIsNone(mutex._r.get(mutex._key))
    def test_multi_process(self):
        procs = 20
        counter = 100
        def incr():
            mutex = Mutex(redis.StrictRedis(), self.key, retry_count=100)
            for n in xrange(0, counter):
                mutex.lock()
                ham = mutex._r.get('ham') or 0
                mutex._r.set('ham', int(ham) + 1)
                mutex.unlock()
        ps = [Process(target=incr) for n in xrange(0, procs)]
        for p in ps:
            p.start()
        for p in ps:
            p.join()
        self.assertEqual(int(self.mutex._r.get('ham')), counter * procs)
Recommended Posts