Django's model doesn't have a mechanism for optimistic concurrency control. (No ... right?) This section describes an implementation sample for optimistic concurrency control in PostgreSQL.
xmin
for the row version (row_version) management fieldnote)
Create a subclass that extends the query expression to get the PostgreSQL system columns.
from django.db.models import Expression, PositiveIntegerField
class XMin(Expression):
output_field = PositiveIntegerField()
def as_postgresql(self, compiler, connection):
return f'"{compiler.query.base_table}"."xmin"', ()
Override get_queryset
and add a row version (row_version) column with ʻannotate`.
from django.db.models import Manager
class ConcurrentManager(Manager):
def get_queryset(self):
super_query = super().get_queryset().annotate(row_version=XMin())
return super_query
If an error occurs during concurrency control, the following custom Exception is issued.
class DbUpdateConcurrencyException(Exception):
pass
Modify the concurrency control manager class to override the save
method and implement concurrency control.
If the row to be updated is not found, issue a DbUpdateConcurrencyException
.
from django.db.models import Model
class ConcurrentModel(Model):
objects = ConcurrentManager()
class Meta:
abstract = True
base_manager_name = 'objects'
def save(self, **kwargs):
cls = self.__class__
if self.pk and not kwargs.get('force_insert', None):
rows = cls.objects.filter(
pk=self.pk, row_version=self.row_version)
if not rows:
raise DbUpdateConcurrencyException(cls.__name__, self.pk)
super().save(**kwargs)
Change the inheritance source from Model
to ConcurrentModel
.
class Customer(ConcurrentModel):
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
code = models.CharField(verbose_name='code', help_text='code', max_length=10)
name = models.CharField(verbose_name='name', help_text='name', max_length=50)
Add a row version (row_version).
class CustomerSerializer(DynamicFieldsModelSerializer):
row_version = serializers.IntegerField()
class Meta:
model = Customer
fields = (
'id',
'code',
'name',
'row_version',
)
You can confirm that the row version has been obtained.
curl -s -X GET "http://localhost:18000/api/customers/" -H "accept: application/json" | jq .
[
{
"id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1",
"code": "001",
"name": "test1",
"row_version": 588
},
{
"id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx2",
"code": "002",
"name": "test2",
"row_version": 592
}
]
curl -s -X GET "http://localhost:18000/api/customers/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1/" -H "accept: application/json" | jq .
{
"id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1",
"code": "001",
"name": "test",
"row_version": 588
}
An error will occur due to concurrency control, so 500 will be returned.
curl -X PUT "http://localhost:18000/api/customers/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1/" -H "accept: application/json" -H "Content-Type: application/json" -d "{ \"code\": \"001\", \"name\": \"test2\", \"row_version\": 0}"
<!doctype html>
<html lang="en">
<head>
<title>Server Error (500)</title>
</head>
<body>
<h1>Server Error (500)</h1><p></p>
</body>
</html>
I was able to register successfully.
curl -X PUT "http://localhost:18000/api/customers/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1/" -H "accept: application/json" -H "Content-Type: application/json" -d "{ \"code\": \"001\", \"name\": \"test2\", \"row_version\": 588}" | jq .
{
"id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1",
"code": "001",
"name": "test2",
"row_version": 588
}
If nothing is done, it will return with a 500 error, so control it so that it returns with 400.
django-rest-framework
has the ability to customize exception handling.
Use this to control the response of concurrency control errors that occur in the API view.
api/handlers.custom_exception_handler
from rest_framework import status
from rest_framework.validators import ValidationError
from rest_framework.response import Response
from rest_framework.views import exception_handler
from xxxxx import DbUpdateConcurrencyException
def custom_exception_handler(exc, context):
response = exception_handler(exc, context)
if isinstance(exc, DbUpdateConcurrencyException):
return Response(ValidationError({'db_update_concurrency': ['It has been modified by another user.']}).detail, status=status.HTTP_400_BAD_REQUEST)
return response
app/settings.py
REST_FRAMEWORK = {
'EXCEPTION_HANDLER': 'api.handlers.custom_exception_handler',
}
It will be returned at 400 in the following form.
curl -X PUT "http://localhost:18000/api/customers/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1/" -H "accept: application/json" -H "Content-Type: application/json" -d "{ \"code\": \"001\", \"name\": \"test2\", \"row_version\": 0}"
{
"db_update_concurrency": [
"It has been modified by another user."
]
}