Implementieren Sie Redis Mutex in Python

Ursprung

Beim Erstellen eines Dienstes möchte ich eine Sperre durch gegenseitigen Ausschluss erwerben, aber ich möchte Redis verwenden, da die Verwendung von RDS nicht ausreicht. Ich habe in Python in SETNX - Redis nach einer Bibliothek gesucht, die "Entwurfsmuster: Sperren mit SETNX" implementiert, aber ich konnte nicht finden, wonach ich gesucht habe, und habe sie implementiert. Ich versuchte zu. Die japanische Übersetzung von SETNX - Redis lautet Dokumentation zum Zeichenkettentyp --redis 2.0.3. Siehe .html # command-SETNX).

Implementierung

Source Code

mutex.py


from datetime import datetime
import time
from functools import wraps

from .exception import (DuplicateLockError,
                        HasNotLockError,
                        ExpiredLockError,
                        SetnxError,
                        LockError)


class Mutex(object):
    def __init__(self, client, key,
                 expire=10,
                 retry_count=6, # retry_count * retry_sleep_sec =Maximale Wartezeit
                 retry_setnx_count=100,
                 retry_sleep_sec=0.25):
        self._lock = None
        self._r = client
        self._key = key
        self._expire = expire
        self._retry_count = retry_count
        self._retry_setnx_count = retry_setnx_count
        self._retry_sleep_sec = retry_sleep_sec

    def _get_now(self):
        return float(datetime.now().strftime('%s.%f'))

    def lock(self):
        if self._lock:
            raise DuplicateLockError(self._key)
        self._do_lock()

    def _do_lock(self):
        for n in xrange(0, self._retry_count):
            is_set, old_expire = self._setnx()
            if is_set:
                self._lock = self._get_now()
                return

            if self._need_retry(old_expire):
                continue

            if not self._need_retry(self._getset()):
                self._lock = self._get_now()
                return 

        raise LockError(self._key)

    def _setnx(self):
        for n in xrange(0, self._retry_setnx_count):
            is_set = self._r.setnx(self._key, self._get_now() + self._expire)
            if is_set:
                return True, 0

            old_expire = self._r.get(self._key)
            if old_expire is not None:
                return False, float(old_expire)

        raise SetnxError(self._key)

    def _need_retry(self, expire):
        if expire < self._get_now():
            return False

        time.sleep(self._retry_sleep_sec)
        return True

    def _getset(self):
        old_expire = self._r.getset(self._key, self._get_now() + self._expire)
        if old_expire is None:
            return 0

        return float(old_expire)

    def unlock(self):
        if not self._lock:
            raise HasNotLockError(self._key)

        elapsed_time = self._get_now() - self._lock
        if self._expire <= elapsed_time:
            raise ExpiredLockError(self._key, elapsed_time)

        self._r.delete(self._key)
        self._lock = None

    def __enter__(self):
        self.lock()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self._lock:
            self.unlock()
        return True if exc_type is None else False

    def __call__(self, func):
        @wraps(func)
        def inner(*args, **kwargs):
            with self:
                return func(*args, **kwargs)
        return inner

exception.py


class MutexError(Exception):
    pass


class DuplicateLockError(MutexError):

    """
Bereits gesperrt()Sperren Sie ein laufendes Mutex-Objekt()Tritt beim erneuten Ausführen auf.
einmal, unlock()Rennst du,Sie müssen ein anderes Mutex-Objekt erstellen.
    """

    pass


class HasNotLockError(MutexError):

    """
noch, lock()Entsperren Sie Mutex-Objekte, die nicht ausgeführt werden()Tritt auf, wenn Sie laufen.
    lock()Muss später gemacht werden.
    """

    pass


class ExpiredLockError(MutexError):

    """
    lock()Nach der Ausführung,Entsperren Sie mit der durch abgelaufenen Sperre freigegebenen Sperre()Tritt auf, wenn Sie laufen.
    """

    pass


class SetnxError(MutexError):
    pass


class LockError(MutexError):
    pass

Kommentar

Der grobe Sperrfluss ist wie folgt.

  1. Stellen Sie in SETNX "Ablaufdatum X" ein
  2. Wenn es eingestellt werden kann, ist die Sperre erfolgreich
  3. Wenn es nicht eingestellt werden kann, erhalten Sie "Ablaufdatum Y" von GET
  4. Wenn "Ablaufdatum Y" gültig ist, fahren Sie mit 1 fort
  5. Wenn "Ablaufdatum Y" bereits ungültig ist, setzen Sie mit GETSET "Ablaufdatum X".
  6. Wenn das von GETSET erhaltene "Ablaufdatum Z" ungültig ist, ist die Sperre erfolgreich.
  7. Wenn "Ablaufdatum Z" gültig ist, fahren Sie mit 1 fort, da ein anderer Prozess GETSET war.

Die Verwendung ist wie folgt.

usage.py


>>> from mutex import Mutex
>>> with Mutex(':'.join(['EmitAccessToken', user_id]):
>>>     # do something ...
>>>     pass

>>> @Mutex(':'.join(['EmitAccessToken', user_id]):
>>> def emit_access_token():
>>>     # do something ...
>>>     pass

>>> mutex = Mutex(':'.join(['EmitAccessToken', user_id])
>>> mutex.lock()
>>> # do something ...
>>> mutex.unlock()

Prüfung

test.py


import unittest
import redis
import time
from multiprocessing import Process

from .mutex import Mutex
from .exception import (DuplicateLockError,
                        HasNotLockError,
                        ExpiredLockError,
                        LockError)


class TestMutex(unittest.TestCase):
    def setUp(self):
        self.key = 'spam'
        self.r = redis.StrictRedis()
        self.mutex = Mutex(self.r, self.key)

    def tearDown(self):
        mutex = self.mutex
        if mutex._lock:
            mutex.unlock()
        mutex._r.delete('ham')

    def test_lock(self):
        mutex = self.mutex
        mutex.lock()
        self.assertIsNotNone(mutex._r.get(mutex._key))

        with self.assertRaises(DuplicateLockError):
            mutex.lock()

    def test_unlock(self):
        self.test_lock()

        mutex = self.mutex
        self.mutex.unlock()
        self.assertIsNone(mutex._r.get(mutex._key))

        with self.assertRaises(HasNotLockError):
            mutex.unlock()

        self.test_lock()
        time.sleep(10.5)
        with self.assertRaises(ExpiredLockError):
            mutex.unlock()
        mutex._lock = None #Erzwungene Initialisierung

    def test_expire(self):
        mutex1 = self.mutex

        mutex2 = Mutex(self.r, self.key, expire=2)
        mutex2.lock() #2 Sekunden lang gesperrt halten

        with self.assertRaises(LockError):
            mutex1.lock() #6 mal wiederholen* sleep 0.25 Sekunden= 1.5 Sekunden

        time.sleep(0.6) #Bonus
        mutex1.lock()
        self.assertIsNotNone(mutex1._r.get(mutex1._key))

    def test_with(self):
        mutex1 = self.mutex
        with mutex1:
            self.assertIsNotNone(mutex1._r.get(mutex1._key))
        self.assertIsNone(mutex1._r.get(mutex1._key))

        mutex2 = Mutex(self.r, self.key, expire=2)
        mutex2.lock() #2 Sekunden lang gesperrt halten

        with self.assertRaises(LockError):
            with mutex1: #6 mal wiederholen* sleep 0.25 Sekunden= 1.5 Sekunden
                pass

        mutex2.unlock()

        with mutex1:
            with self.assertRaises(DuplicateLockError):
                with mutex1:
                    pass

    def test_decorator(self):
        mutex = self.mutex
        @mutex
        def egg():
            self.assertIsNotNone(mutex._r.get(mutex._key))
        egg()
        self.assertIsNone(mutex._r.get(mutex._key))

    def test_multi_process(self):
        procs = 20
        counter = 100

        def incr():
            mutex = Mutex(redis.StrictRedis(), self.key, retry_count=100)
            for n in xrange(0, counter):
                mutex.lock()

                ham = mutex._r.get('ham') or 0
                mutex._r.set('ham', int(ham) + 1)

                mutex.unlock()

        ps = [Process(target=incr) for n in xrange(0, procs)]
        for p in ps:
            p.start()

        for p in ps:
            p.join()

        self.assertEqual(int(self.mutex._r.get('ham')), counter * procs)

Recommended Posts

Implementieren Sie Redis Mutex in Python
Implementieren Sie XENO mit Python
Implementieren Sie sum in Python
Implementieren Sie Traceroute in Python 3
Implementieren Sie Naive Bayes in Python 3.3
Implementieren Sie alte Chiffren in Python
Implementieren Sie die Erweiterung in Python
Redis Rohrauskleidung in Python
Implementieren Sie schnelles RPC in Python
Implementieren Sie den Dijkstra-Algorithmus in Python
Implementieren Sie den Slack Chat Bot in Python
Implementieren Sie das Stacking-Lernen in Python [Kaggle]
Implementieren Sie die Funktion power.prop.test von R in Python
Implementieren Sie das Singleton-Muster in Python
Implementieren Sie die REST-API schnell in Python
Python in der Optimierung
CURL in Python
Ich habe versucht, PLSA in Python zu implementieren
Metaprogrammierung mit Python
Python 3.3 mit Anaconda
Geokodierung in Python
Ich habe versucht, Permutation in Python zu implementieren
Metaanalyse in Python
Unittest in Python
Implementieren Sie den FIR-Filter in Python und C.
Implementieren Sie gemeinsam statistische Hypothesentests in Python
Epoche in Python
Zwietracht in Python
Deutsch in Python
DCI in Python
Quicksort in Python
nCr in Python
N-Gramm in Python
Programmieren mit Python
Plink in Python
Ich habe versucht, ADALINE in Python zu implementieren
Konstante in Python
Ich habe versucht, PPO in Python zu implementieren
FizzBuzz in Python
SQLite in Python
Schritt AIC in Python
Führen Sie so etwas wie Redis-Transaktionen in Python aus
LINE-Bot [0] in Python
CSV in Python
Reverse Assembler mit Python
Reflexion in Python
Konstante in Python
nCr in Python.
Format in Python
Scons in Python 3
Puyopuyo in Python
Python in Virtualenv
PPAP in Python
Quad-Tree in Python
Reflexion in Python
Chemie mit Python