Try using Reladomo's MT Loader (Multi-Threaded Matcher Loader)

Introduction

Continuing from an article I wrote earlier, this time I tried using Reladomo's MT Loader.

What is MT Loader (Multi-Threaded Matcher Loader)?

The Multi-Threaded Matcher Loader (MT Loader) is a feature for merging changes from another source (file, feed, another DB, etc.) into a destination DB. According to the documentation, MT Loader is often the recommended approach when Reladomo has to handle a large amount of data.

How MT Loader works

- Detects records that exist in both the Input and the Output (Database) datasets.
- Every record that exists in both is UPDATEd (provided that the data has changed).
- Records that are in Input but not in Output (Database) are INSERTed.
- Everything that is in Output (Database) but not in Input is closed out (DELETE or expire, depending on the implementation).
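The rules above can be sketched in plain Java. This is only an illustration of the matching idea, not Reladomo's implementation: the class MatchSketch, the record MatchResult, and the integer key with a single string value are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MatchSketch {
    /** Keys classified by the action the loader would take (hypothetical type). */
    public record MatchResult(List<Integer> inserts, List<Integer> updates, List<Integer> terminates) {}

    /**
     * Compares the input set with the output (database) set.
     * Both maps use a hypothetical integer key; the value stands in for the non-key columns.
     */
    public static MatchResult match(Map<Integer, String> input, Map<Integer, String> output) {
        List<Integer> inserts = new ArrayList<>();
        List<Integer> updates = new ArrayList<>();
        List<Integer> terminates = new ArrayList<>();

        // TreeMap is used only to get a stable, key-ordered result.
        for (Map.Entry<Integer, String> in : new TreeMap<>(input).entrySet()) {
            if (!output.containsKey(in.getKey())) {
                inserts.add(in.getKey());           // in Input only: INSERT
            } else if (!in.getValue().equals(output.get(in.getKey()))) {
                updates.add(in.getKey());           // in both and changed: UPDATE
            }
            // in both and unchanged: nothing to write
        }
        for (Integer key : new TreeMap<>(output).keySet()) {
            if (!input.containsKey(key)) {
                terminates.add(key);                // in Output only: close out
            }
        }
        return new MatchResult(inserts, updates, terminates);
    }

    public static void main(String[] args) {
        Map<Integer, String> output = Map.of(1, "USA", 2, "USA", 3, "USA");
        Map<Integer, String> input = Map.of(2, "USA", 3, "JPN", 4, "USA");
        System.out.println(match(input, output));
    }
}
```

With this sample data, key 4 is classified as an insert, key 3 as an update, and key 1 as a close-out, while key 2 is left alone because nothing changed.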

MT Loader architecture

Reading, comparing, and writing can each run on multiple threads, so the I/O can be distributed.
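To make the read / compare / write split concrete, here is a minimal sketch using only java.util.concurrent. It is not how Reladomo implements it (as far as I understand, the real matcher consumes records as they arrive, whereas this sketch waits for both sides to be fully read). PipelineSketch, load, and the in-memory maps standing in for the input and the database are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PipelineSketch {
    /** Runs read, compare, write and returns the resulting "database" contents. */
    public static Map<Integer, String> load(Map<Integer, String> input,
                                            Map<Integer, String> database,
                                            int writerThreads,
                                            int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Map<Integer, String> target = new ConcurrentHashMap<>(database);
        ExecutorService readers = Executors.newFixedThreadPool(2);
        ExecutorService writers = Executors.newFixedThreadPool(writerThreads);
        try {
            // Read: both sides are loaded concurrently (stand-ins for a file reader and a DB query).
            Callable<Map<Integer, String>> readInput = () -> Map.copyOf(input);
            Callable<Map<Integer, String>> readDb = () -> Map.copyOf(database);
            Future<Map<Integer, String>> inputSide = readers.submit(readInput);
            Future<Map<Integer, String>> dbSide = readers.submit(readDb);
            Map<Integer, String> inputRecords = inputSide.get();
            Map<Integer, String> dbRecords = dbSide.get();

            // Compare: turn the differences into write operations.
            // Each key appears in at most one operation, so write order does not matter.
            List<Runnable> writes = new ArrayList<>();
            inputRecords.forEach((key, value) -> {
                if (!value.equals(dbRecords.get(key))) {
                    writes.add(() -> target.put(key, value)); // insert or update
                }
            });
            for (Integer key : dbRecords.keySet()) {
                if (!inputRecords.containsKey(key)) {
                    writes.add(() -> target.remove(key));     // close out
                }
            }

            // Write: hand fixed-size batches to the writer pool.
            for (int start = 0; start < writes.size(); start += batchSize) {
                List<Runnable> batch = writes.subList(start, Math.min(start + batchSize, writes.size()));
                writers.submit(() -> batch.forEach(Runnable::run));
            }
            writers.shutdown();
            if (!writers.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
            return target;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("load failed", e);
        } finally {
            readers.shutdownNow();
            writers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> database = Map.of(1, "USA", 6, "USA", 7, "USA");
        Map<Integer, String> input = Map.of(6, "JPN", 8, "USA");
        System.out.println(load(input, database, 3, 5));
    }
}
```

Because each batch is written independently, nothing here ties the batches together into one transaction, which is the same trade-off the article describes for MT Loader.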

Features of MT Loader

- It does not have to be File-to-Database. You can easily change it to Database-to-Database or Memory-to-Database.
- By subclassing MatcherThread or SingleQueueExecutor, you can tune it to various requirements.
- It writes to only one table. Writing to multiple tables is possible, but there is no transaction guarantee. Its strength is in use cases that handle large amounts of data, so it should not be used where transaction management is important. (Be careful when using it.)
- Idempotence can be guaranteed by design.

Try using MT Loader

I often build APIs with Reladomo in combination with Spring Boot, so I use that combination here as well. The actual code can be found at https://github.com/amtkxa/spring-boot-reladomo-mt-loader.

Write test code

Advance preparation

Before running the test code, the test data needs to be loaded into the test DB. Since MithraTestResource.addTestDataToDatabase will be used to load it, prepare the following file.

customer_data.txt


class com.amtkxa.domain.entity.Customer
customerId, name, country, businessDateFrom, businessDateTo, processingDateFrom, processingDateTo
1,"Liam","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
2,"Emma","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
3,"Noah","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
4,"Olivia","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
5,"William","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
6,"Ava","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
7,"James","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"

I also prepared an abstract class that pre-populates the H2 database with the test data.

public abstract class AbstractReladomoTest {
    private static Logger log = LoggerFactory.getLogger(AbstractReladomoTest.class);
    private MithraTestResource mithraTestResource;

    protected abstract String[] getTestDataFilenames();

    protected String getMithraConfigXmlFilename() {
        return "reladomo/config/TestReladomoRuntimeConfiguration.xml";
    }

    @BeforeEach
    public void setUp() {
        log.info("Setting up reladomo on h2");
        this.mithraTestResource = new MithraTestResource(this.getMithraConfigXmlFilename());
        ConnectionManagerForTests connectionManager = ConnectionManagerForTests.getInstanceForDbName("testdb");
        this.mithraTestResource.createSingleDatabase(connectionManager);
        for (String filename : this.getTestDataFilenames()) {
            this.mithraTestResource.addTestDataToDatabase(filename, connectionManager);
        }
        this.mithraTestResource.setUp();
    }

    @AfterEach
    public void tearDown() {
        this.mithraTestResource.tearDown();
    }
}

Run MT Loader with test code

Give the following input data to MT Loader and update the DB.

- The country is updated for the user with customerId 6.
- One new user is added.
- The other users do not exist in the input data (the expectation is that these users are closed out by updating their validity end date).

The test code I actually wrote is as follows.

public class SingleQueueExecutorParallelLoadTest extends AbstractReladomoTest {
    private static final int NUMBER_OF_THREADS = 2;
    private static final int BATCH_SIZE = 5;
    private static final int INSERT_THREADS = 3;

    @Override
    public String[] getTestDataFilenames() {
        return new String[] { "testdata/customer_data.txt" };
    }

    private List<Customer> getInputData() {
        Timestamp businessDate = DateUtil.parse("2019-12-05 00:00:00");
        CustomerList customerList = new CustomerList();
        customerList.add(new Customer(6, "Ava", "JPN", businessDate));
        customerList.add(new Customer(8, "Arthur", "USA", businessDate));
        return customerList;
    }

    private CustomerList getDbRecords() {
        return CustomerFinder.findMany(
                CustomerFinder.all()
                              .and(CustomerFinder.businessDate().equalsEdgePoint())
                              .and(CustomerFinder.processingDate().equalsInfinity())
        );
    }

    @Test
    public void testLoadDataParallel() {
        try {
            QueueExecutor queueExecutor = new SingleQueueExecutor(
                    NUMBER_OF_THREADS,
                    CustomerFinder.customerId().ascendingOrderBy(),
                    BATCH_SIZE,
                    CustomerFinder.getFinderInstance(),
                    INSERT_THREADS
            );

            MatcherThread<Customer> matcherThread = new MatcherThread<>(
                    queueExecutor,
                    new Extractor[] { CustomerFinder.customerId() }
            );
            matcherThread.start();

            // Database data load: Parallel
            DbLoadThread dbLoadThread = new DbLoadThread(getDbRecords(), null, matcherThread);
            dbLoadThread.start();

            // Input data load: Parallel
            PlainInputThread inputThread = new PlainInputThread(new InputDataLoader(), matcherThread);
            inputThread.run();
            matcherThread.waitTillDone();

            // Assert
            checkResult(queueExecutor);
        } catch (Exception e) {
            // Pass the caught exception itself as the cause so its stack trace is not lost.
            throw new ReladomoMTLoaderException("Failed to load data. " + e.getMessage(), e);
        }
    }

    private void checkResult(QueueExecutor queueExecutor) {
        // Whatever is in Output Set but not in Input Set will be closed out (terminated).
        CustomerList customerList = getDbRecords();
        assertEquals(2, customerList.count());

        // Whatever is in the intersection, will be updated (but only if something changed)
        Customer customer = CustomerFinder.findOne(
                CustomerFinder.customerId().eq(6)
                              .and(CustomerFinder.businessDate().equalsEdgePoint())
                              .and(CustomerFinder.processingDate().equalsInfinity())
        );
        assertAll("Check updated customer data",
                  () -> assertEquals("Ava", customer.getName()),
                  () -> assertEquals("JPN", customer.getCountry()) // Updated from USA to JPN
        );

        // Whatever is in Input Set but not in Output Set will be inserted
        Customer customer8 = CustomerFinder.findOne(
                CustomerFinder.customerId().eq(8)
                              .and(CustomerFinder.businessDate().equalsEdgePoint())
                              .and(CustomerFinder.processingDate().equalsInfinity())
        );
        assertAll("Check inserted customer data",
                  () -> assertEquals("Arthur", customer8.getName()),
                  () -> assertEquals("USA", customer8.getCountry())
        );

        assertAll("Check the count of inserts, updates, terminates",
                  () -> assertEquals(1, queueExecutor.getTotalInserts()),
                  () -> assertEquals(1, queueExecutor.getTotalUpdates()),
                  () -> assertEquals(6, queueExecutor.getTotalTerminates())
        );
    }

    private class InputDataLoader implements InputLoader {
        private boolean firstTime = true;

        @Override
        public List<? extends MithraTransactionalObject> getNextParsedObjectList() {
            return getInputData();
        }

        @Override
        public boolean isFileParsingComplete() {
            if (firstTime) {
                firstTime = false;
                return false;
            } else {
                return true;
            }
        }
    }
}
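The firstTime flag in InputDataLoader suggests that the loader is polled in a loop: isFileParsingComplete() is checked first, and getNextParsedObjectList() is called as long as it returns false. Under that assumption, larger inputs could be served in chunks instead of all at once. The sketch below uses a hypothetical stand-in interface, BatchSource, that only mirrors the two method names from the test code; it does not depend on Reladomo, and the drain loop is my assumption about the polling order, not the library's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkedLoaderSketch {
    /** Hypothetical stand-in mirroring the two InputLoader methods used in the test code. */
    public interface BatchSource<T> {
        List<T> getNextParsedObjectList();
        boolean isFileParsingComplete();
    }

    /** Serves an in-memory list in fixed-size chunks. */
    public static class ChunkedSource<T> implements BatchSource<T> {
        private final List<T> records;
        private final int chunkSize;
        private int position = 0;

        public ChunkedSource(List<T> records, int chunkSize) {
            if (chunkSize <= 0) {
                throw new IllegalArgumentException("chunkSize must be positive");
            }
            this.records = List.copyOf(records);
            this.chunkSize = chunkSize;
        }

        @Override
        public List<T> getNextParsedObjectList() {
            // Return the next slice and advance the read position.
            int end = Math.min(position + chunkSize, records.size());
            List<T> chunk = records.subList(position, end);
            position = end;
            return chunk;
        }

        @Override
        public boolean isFileParsingComplete() {
            // Complete once every record has been handed out.
            return position >= records.size();
        }
    }

    /** Assumed polling order: check for completion, then fetch the next chunk. */
    public static <T> List<List<T>> drain(BatchSource<T> source) {
        List<List<T>> chunks = new ArrayList<>();
        while (!source.isFileParsingComplete()) {
            chunks.add(source.getNextParsedObjectList());
        }
        return chunks;
    }

    public static void main(String[] args) {
        // prints [[1, 2], [3, 4], [5]]
        System.out.println(drain(new ChunkedSource<>(List.of(1, 2, 3, 4, 5), 2)));
    }
}
```

Tracking a read position instead of a boolean flag keeps the loader correct for any number of chunks, including an empty input, where the loop simply never fetches anything.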

Before MT Loader runs, the DB is in the following state.

-- select * from customer where business_date_to = '9999-12-01 23:59:00.000' and processing_date_to = '9999-12-01 23:59:00.000';
+-------------+---------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
| customer_id | name    | country | business_date_from      | business_date_to        | processing_date_from    | processing_date_to      |
+-------------+---------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
|           1 | Liam    | USA     | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
|           2 | Emma    | USA     | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
|           3 | Noah    | USA     | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
|           4 | Olivia  | USA     | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
|           5 | William | USA     | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
|           6 | Ava     | USA     | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
|           7 | James   | USA     | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
+-------------+---------+---------+-------------------------+-------------------------+-------------------------+-------------------------+

After MT Loader ran, the DB was in the following state, which is the expected result.

-- select * from customer where business_date_to = '9999-12-01 23:59:00.000' and processing_date_to = '9999-12-01 23:59:00.000';
+-------------+--------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
| customer_id | name   | country | business_date_from      | business_date_to        | processing_date_from    | processing_date_to      |
+-------------+--------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
|           6 | Ava    | JPN     | 2020-10-10 14:24:52.387 | 9999-12-01 23:59:00.000 | 2020-10-10 14:24:53.250 | 9999-12-01 23:59:00.000 |
|           8 | Arthur | USA     | 2020-10-10 14:24:52.387 | 9999-12-01 23:59:00.000 | 2020-10-10 14:24:53.250 | 9999-12-01 23:59:00.000 |
+-------------+--------+---------+-------------------------+-------------------------+-------------------------+-------------------------+

Conclusion

Reladomo is one of my favorite technologies and I use it a lot at work, but my impression is that the hurdle to understanding it well enough to use it is a little high.

For example, the kata has no sample that integrates with Spring Boot, even though Spring Boot is used relatively often, so when you try to use Reladomo with it you have to work out for yourself how to wire in a DBConnectionManager. I'm ashamed to say that I had a lot of trouble just getting something to work in the first place.

Reladomo itself is a very good technology, but I couldn't find many articles about it. That felt like a bit of a waste, so this time I looked into it myself and decided to write it all up and share it.
