Notes on transactions in the Java client library for Datastore

Background

I use the Java client library (Google Cloud Client Library for Java) to work with Datastore. I investigated how its transactions behave under multithreaded processing. The official documentation describes the following behavior, so I checked it by running code.

When two or more transactions try to modify entities in one or more common entity groups at the same time, only the first transaction to commit its changes succeeds; all the others fail on commit.

I also checked what happens when operations through a Transaction object and operations through a Datastore object are mixed.

Documentation: Transaction docs

What I did

I checked what happens when an entity is updated from multiple threads in the following three cases:

Scenario 1

Thread1 gets the entity via Transaction, then sleeps -> Thread2 gets the entity via Transaction and updates it -> Thread1 updates via Transaction

Scenario 2

Thread1 gets the entity via Transaction, then sleeps -> Thread2 gets the entity via Datastore and updates it -> Thread1 updates via Transaction

Scenario 3

Thread1 gets the entity via Datastore, then sleeps -> Thread2 gets the entity via Transaction and updates it -> Thread1 updates via Datastore

Conclusion

With transaction.update, the update is applied only after checking whether the entity has changed since the transaction read it. If the entity has changed, an exception is thrown. It doesn't matter whether the competing update was made through a Transaction object or a Datastore object.
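The check described above can be pictured with a small in-memory model. This is only a sketch, not the Datastore API: `VersionedCell`, `Snapshot`, `commit`, and `blindWrite` are hypothetical names, and the version counter is an assumption about how such a check could work, not a description of Datastore's internals. `commit` stands in for an update through a Transaction, and `blindWrite` for an update made directly through the Datastore object.

```java
import java.util.ConcurrentModificationException;

final class OptimisticDemo {
    /** "Thread1" reads, "Thread2" changes the cell, then "Thread1" writes. */
    static String run(boolean thread1Transactional, boolean thread2Transactional) {
        VersionedCell cell = new VersionedCell();
        VersionedCell.Snapshot first = cell.read();   // Thread1 reads, then "sleeps"
        VersionedCell.Snapshot second = cell.read();  // Thread2 reads
        if (thread2Transactional) cell.commit(second, 2); else cell.blindWrite(2);
        try {
            if (thread1Transactional) cell.commit(first, 1); else cell.blindWrite(1);
            return "ok, value=" + cell.read().value;
        } catch (ConcurrentModificationException e) {
            return "conflict, value=" + cell.read().value;
        }
    }

    public static void main(String[] args) {
        System.out.println("tx vs tx:       " + run(true, true));
        System.out.println("tx vs plain:    " + run(true, false));
        System.out.println("plain after tx: " + run(false, true));
    }
}

/** In-memory stand-in for a single entity (hypothetical, not the Datastore API). */
final class VersionedCell {
    private long value;
    private long version;

    /** Immutable view of the value together with the version it was read at. */
    static final class Snapshot {
        final long value;
        final long version;
        Snapshot(long value, long version) { this.value = value; this.version = version; }
    }

    synchronized Snapshot read() {
        return new Snapshot(value, version);
    }

    /** Transaction-style write: succeeds only if nothing changed since the snapshot. */
    synchronized void commit(Snapshot readAt, long newValue) {
        if (readAt.version != version) {
            throw new ConcurrentModificationException("entity changed since it was read");
        }
        value = newValue;
        version++;
    }

    /** Non-transactional write: no check is made, so the last writer wins. */
    synchronized void blindWrite(long newValue) {
        value = newValue;
        version++;
    }
}
```

In this model the first two combinations report a conflict and leave the value at 2, while the third succeeds and overwrites it with 1. The first combination models two separate transactions, which is the case the quoted documentation describes.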

Verification code

Scenario 1

code


package jp.ne.opt.spinapp.runner;

import com.google.cloud.datastore.*;

public final class DatastoreTest {
    static final String NAME_SPACE = "dataflow";
    static final String LOCK_KIND = "lock";
    static final String LOCK_PROP_VALUE = "value";
    static final Datastore DATASTORE = DatastoreOptions.getDefaultInstance().getService();
    static final KeyFactory KEY_FACTORY = DATASTORE.newKeyFactory().setNamespace(NAME_SPACE).setKind(LOCK_KIND);
    // One Transaction object, shared by every thread that uses DatastoreTest.TRANSACTION.
    static final Transaction TRANSACTION = DATASTORE.newTransaction();

    public static void main(final String[] args) throws InterruptedException {
        MultiThread1 mt1 = new MultiThread1();
        MultiThread2 mt2 = new MultiThread2();
        // Start Thread1 first, then start Thread2 one second later.
        mt1.start();
        Thread.sleep(1000);
        mt2.start();
    }
}

class MultiThread1 extends Thread {
    @Override
    public void run() {
        // Read the entity through the shared transaction.
        Entity lock = DatastoreTest.TRANSACTION.get(DatastoreTest.KEY_FACTORY.newKey("test"));
        System.out.println("got lock from 1: " + lock.getLong(DatastoreTest.LOCK_PROP_VALUE));
        try {
            // Sleep so that Thread2 can update and commit in the meantime.
            Thread.sleep(3000);
            System.out.println("sleep 1 ended");
            Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 1).build();
            DatastoreTest.TRANSACTION.update(entity);
            DatastoreTest.TRANSACTION.commit();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Roll back if the transaction was not committed.
            if (DatastoreTest.TRANSACTION.isActive()) {
                DatastoreTest.TRANSACTION.rollback();
            }
        }
        System.out.println("thread1 done.");
    }
}

class MultiThread2 extends Thread {
    @Override
    public void run() {
        // Read through the same shared transaction, then update and commit right away.
        Entity lock = DatastoreTest.TRANSACTION.get(DatastoreTest.KEY_FACTORY.newKey("test"));
        System.out.println("got lock from 2: " + lock.getLong(DatastoreTest.LOCK_PROP_VALUE));
        try {
            Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 2).build();
            DatastoreTest.TRANSACTION.update(entity);
            DatastoreTest.TRANSACTION.commit();
        } finally {
            // Roll back if the transaction was not committed.
            if (DatastoreTest.TRANSACTION.isActive()) {
                DatastoreTest.TRANSACTION.rollback();
            }
        }
        System.out.println("thread2 done.");
    }
}

output

got lock from 2: 0
got lock from 1: 0
thread2 done.
sleep 1 ended
[WARNING]
com.google.cloud.datastore.DatastoreException: transaction is no longer active

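After Thread2 commits, Thread1 gets a "transaction is no longer active" error when it tries to update. Note that both threads use the same DatastoreTest.TRANSACTION object, so Thread2's commit also ends the transaction that Thread1 is holding; the error comes from reusing a transaction that has already been committed. Datastore ends up with Thread2's value.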
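The effect of sharing one Transaction object between threads, as the code above does, can be imitated without Datastore. The following is a sketch only: `OneShotTx` is a hypothetical class that models nothing more than an active flag, and it is not how the client library is implemented.

```java
final class SharedHandleDemo {
    /** "Thread2" updates and commits first; then "Thread1" tries to update. */
    static String updateAfterCommit(boolean shareHandle) {
        OneShotTx thread1Tx = new OneShotTx();
        OneShotTx thread2Tx = shareHandle ? thread1Tx : new OneShotTx();
        thread2Tx.update();
        thread2Tx.commit();
        try {
            thread1Tx.update();
            return "update accepted";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared handle:    " + updateAfterCommit(true));
        System.out.println("separate handles: " + updateAfterCommit(false));
    }
}

/** Hypothetical single-use transaction handle; tracks only whether it is still active. */
final class OneShotTx {
    private boolean active = true;

    void update() {
        requireActive();
    }

    void commit() {
        requireActive();
        active = false; // a committed handle cannot be used again
    }

    private void requireActive() {
        if (!active) {
            throw new IllegalStateException("transaction is no longer active");
        }
    }
}
```

With a shared handle the second update is rejected by the handle's own state check. With separate handles that local check passes; whether a conflict is then reported on commit is outside what this model covers.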

Scenario 2


class MultiThread1 extends Thread {
    @Override
    public void run() {
        // Read the entity through the transaction.
        Entity lock = DatastoreTest.TRANSACTION.get(DatastoreTest.KEY_FACTORY.newKey("test"));
        System.out.println("got lock from 1: " + lock.getLong(DatastoreTest.LOCK_PROP_VALUE));
        try {
            // Sleep so that Thread2 can update the entity in the meantime.
            Thread.sleep(10000);
            System.out.println("sleep 1 ended");
            Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 1).build();
            DatastoreTest.TRANSACTION.update(entity);
            DatastoreTest.TRANSACTION.commit();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Roll back if the transaction was not committed.
            if (DatastoreTest.TRANSACTION.isActive()) {
                DatastoreTest.TRANSACTION.rollback();
            }
        }
        System.out.println("thread1 done.");
    }
}

class MultiThread2 extends Thread {
    @Override
    public void run() {
        // Read and update directly through the Datastore object, outside the transaction.
        Entity lock = DatastoreTest.DATASTORE.get(DatastoreTest.KEY_FACTORY.newKey("test"));
        System.out.println("got lock from 2: " + lock.getLong(DatastoreTest.LOCK_PROP_VALUE));
        Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 2).build();
        DatastoreTest.DATASTORE.update(entity);
        System.out.println("thread2 done.");
    }
}

output

got lock from 1: 0
got lock from 2: 0
thread2 done.
sleep 1 ended
[WARNING]
com.google.cloud.datastore.DatastoreException: too much contention on these datastore entities. please try again. entity groups: [(app=b~spinapptest-151310!dataflow, lock, "test")]

After Thread2 updates the entity, Thread1's attempt to update it fails with a "too much contention" error. Datastore ends up with Thread2's value.
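The error message asks the caller to try again. One common way to do that is a bounded retry with a growing pause between attempts. The following is a minimal sketch, assuming the failure surfaces as a RuntimeException: `withRetry` and its parameters are hypothetical names, not part of the client library. In real use, the attempt passed in would need to begin a new transaction and read the entity again each time, and it would be better to catch only the contention error.

```java
import java.util.function.Supplier;

final class RetryDemo {
    /** Runs the attempt up to maxAttempts times, doubling the pause after each failure. */
    static <T> T withRetry(Supplier<T> attempt, int maxAttempts, long initialDelayMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delayMs = initialDelayMs;
        for (int i = 1; ; i++) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                // Assumption: every RuntimeException is treated as retryable here.
                if (i >= maxAttempts) {
                    throw e; // out of attempts: surface the last error
                }
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
                delayMs *= 2;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated work that fails twice before succeeding.
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("too much contention");
            }
            return "committed on attempt " + calls[0];
        }, 5, 10);
        System.out.println(result);
    }
}
```

The cap on attempts keeps a persistently contended entity from blocking the caller indefinitely; the values 5 and 10 ms in `main` are arbitrary.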

Scenario 3


class MultiThread1 extends Thread {
    @Override
    public void run() {
        try {
            // Read directly through the Datastore object, outside any transaction.
            Entity lock = DatastoreTest.DATASTORE.get(DatastoreTest.KEY_FACTORY.newKey("test"));
            // Sleep so that Thread2 can update and commit in the meantime.
            Thread.sleep(10000);
            System.out.println("sleep 1 ended");
            System.out.println("got lock from 1: " + lock.getLong(DatastoreTest.LOCK_PROP_VALUE));
            Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 1).build();
            DatastoreTest.DATASTORE.update(entity);
            System.out.println("thread1 done.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class MultiThread2 extends Thread {
    @Override
    public void run() {
        // Read through the transaction, then update and commit right away.
        Entity lock = DatastoreTest.TRANSACTION.get(DatastoreTest.KEY_FACTORY.newKey("test"));
        System.out.println("got lock from 2: " + lock.getLong(DatastoreTest.LOCK_PROP_VALUE));
        try {
            Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 2).build();
            DatastoreTest.TRANSACTION.update(entity);
            DatastoreTest.TRANSACTION.commit();
        } finally {
            // Roll back if the transaction was not committed.
            if (DatastoreTest.TRANSACTION.isActive()) {
                DatastoreTest.TRANSACTION.rollback();
            }
        }
        System.out.println("thread2 done.");
    }
}

output

got lock from 2: 0
thread2 done.
sleep 1 ended
got lock from 1: 0
thread1 done.

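Both threads succeed. Thread1's update through the Datastore object goes through without an error, even though Thread2 committed a change after Thread1 read the entity.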
