[PYTHON] Distributed architecture implemented by Mars

This article introduces the distributed architecture implemented in Alibaba's open source **Mars**.

A [previous article](https://www.alibabacloud.com/blog/mars-matrix-based-universal-distributed-computing-framework_594606?spm=a2c65.11461447.0.0.4be1c339z4ytI2) introduced what Mars is. After testing on an in-house system, Mars has been open-sourced on GitHub. This article describes the distributed execution architecture implemented in Mars.

Architecture introduction

Mars provides a library for distributed execution of tensors. This library is written using the actor model implemented in mars.actors and includes schedulers, workers and web services.

The graph submitted by the client to the Mars web service consists of tensors. The web service receives the graph and submits it to a scheduler. Before submitting jobs to the workers, the Mars scheduler compiles the tensor graph into a graph of chunks and operands, then analyzes and partitions that graph. The scheduler then creates a set of OperandActors, distributed across all schedulers by consistent hashing, each of which controls the execution of a single operand. Operands are scheduled in topological order. When all operands have been executed, the entire graph is marked as complete and the client can retrieve the results from the web service. The entire execution process is shown in the figure below.


Submit job

The client submits the job to the Mars service via the RESTful API. The client writes code using tensors; session.run(tensor) transforms the tensor operations into a graph of tensors and sends it to the Web API. The Web API then passes the job to the SessionActor, and a GraphActor is created in the cluster for graph analysis and management. The client then polls the execution status of the graph until execution finishes.

GraphActor first transforms the tensor graph into a graph of operands and chunks according to the chunk settings. This step allows the graph to be subdivided and run in parallel. It then performs a series of analyses on the graph to determine operand priorities and to assign workers to the initial operands. For this part, see the section "Preparation of execution graph". Then, for each operand, an OperandActor is created to control the execution of that operand. When an operand is in the READY state (as described in the section "Operand state"), the scheduler selects the target worker for that operand and submits a job to that worker for actual execution.

Control of execution

Once the operand has been sent to the worker, the OperandActor waits for a callback from the worker. If the operand succeeds, its successors are scheduled. If the operand fails to execute, the OperandActor retries several times. If it still fails, the execution is marked as failed.
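The retry behaviour described above can be sketched as follows. This is a minimal illustration, not the Mars implementation: the function name `run_with_retries`, the `max_retries` parameter and the returned status strings are assumptions made for the example.

```python
def run_with_retries(execute, max_retries=3):
    """Call execute() until it succeeds or max_retries attempts have failed.

    Returns ("FINISHED", result) on success and ("FATAL", last_error) otherwise.
    """
    last_error = None
    for _ in range(max_retries):
        try:
            return "FINISHED", execute()
        except Exception as exc:  # a real system would retry only recoverable errors
            last_error = exc
    return "FATAL", last_error


# Usage: a hypothetical operand that fails twice and then succeeds.
attempts = {"n": 0}


def flaky_operand():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("worker lost")
    return 42


status, value = run_with_retries(flaky_operand)
```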

Cancel the job

The client can use the RESTful API to cancel a running job. The cancel request is written to the graph's state storage and the cancel interface on the GraphActor is called. If the job is still in the preparation stage, it ends immediately once the stop request is detected. Otherwise, a request is sent to each OperandActor and its state is set to CANCELLING. If the operand is not running at that point, its state is set directly to CANCELLED. If the operand is running, a stop request is sent to the worker, which raises an ExecutionInterrupted error and returns it to the OperandActor. The state of the operand is then marked as CANCELLED.

Preparation of execution graph

When a tensor graph is submitted to the Mars scheduler, the scheduler produces a finer-grained graph of operands and chunks, depending on the chunk parameters contained in the data sources.

Graph compression

After the chunk graph is generated, Mars reduces the size of the graph by fusing adjacent nodes. Fusing also makes it possible to take full advantage of acceleration libraries such as numexpr to speed up computation. Currently, Mars fuses only operands that form a single chain. For example, consider the following code:

import mars.tensor as mt
a = mt.random.rand(100, chunks=100)
b = mt.random.rand(100, chunks=100)
c = (a + b).sum()

Mars fuses the ADD and SUM operands into a single FUSE node. The RAND operands are not fused because they do not form a simple chain with ADD and SUM.
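One way to picture chain fusion is the sketch below. It is not Mars' graph code: the function `fuse_chains` and the node names `RAND_a` and `RAND_b` (for the two random-generation operands) are hypothetical. It assumes that an edge can be fused only when its source has exactly one successor and its target has exactly one predecessor.

```python
from collections import defaultdict


def fuse_chains(edges, nodes):
    """Group the nodes of a DAG into chains that could each become one fused node."""
    succ = defaultdict(list)
    pred = defaultdict(list)
    for src, dst in edges:
        succ[src].append(dst)
        pred[dst].append(src)

    def fusable(src, dst):
        # Assumption: only strictly linear links are fused.
        return len(succ[src]) == 1 and len(pred[dst]) == 1

    groups, seen = [], set()
    for node in nodes:
        if node in seen:
            continue
        # Skip nodes that belong in the middle of a chain; their head picks them up.
        if len(pred[node]) == 1 and fusable(pred[node][0], node):
            continue
        chain = [node]
        seen.add(node)
        while len(succ[chain[-1]]) == 1 and fusable(chain[-1], succ[chain[-1]][0]):
            chain.append(succ[chain[-1]][0])
            seen.add(chain[-1])
        groups.append(chain)
    return groups


# Chunk graph of c = (a + b).sum() with a single chunk per tensor.
nodes = ["RAND_a", "RAND_b", "ADD", "SUM"]
edges = [("RAND_a", "ADD"), ("RAND_b", "ADD"), ("ADD", "SUM")]
groups = fuse_chains(edges, nodes)
```

Here ADD has two predecessors, so neither random-generation operand is merged into it, while ADD and SUM end up in the same group.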


Initial worker assignment

Assigning workers to operands is important for graph execution performance. Assigning the initial operands randomly increases network overhead and can lead to unbalanced job allocation across workers. The assignment of non-initial nodes can easily be determined from the physical distribution of the data generated by their predecessors and the idle state of each worker. Therefore, the execution graph preparation stage only considers the assignment of initial operands.

The assignment of initial workers must follow several principles. First, the operands assigned to each worker should be as balanced as possible. This gives the cluster higher utilization throughout execution, which is especially important during the final phase. Second, the initial assignment should minimize network traffic when subsequent nodes run. In other words, the initial node assignment should follow the principle of locality.

Note that these principles may conflict with each other: the allocation with minimal network traffic can be very unbalanced. We have developed a heuristic algorithm to balance the two goals. The algorithm is as follows:

  1. Select the first unassigned initial node and the first machine in the list.
  2. Start a depth-first search from that node in the undirected graph converted from the operand graph.
  3. If another unassigned initial node is visited during the search, assign it to the machine selected in step 1.
  4. Stop the search once the number of operands visited exceeds the average number of operands per worker.
  5. If there are still workers with no operands assigned, go to step 1. Otherwise, finish.
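The heuristic can be sketched in a few lines of Python. This is an illustrative reading rather than the scheduler's actual code: the function name `assign_initial_workers` is hypothetical, the search is assumed to stop once it has visited more operands than the per-worker average, and leftover initial nodes are handed out round-robin, which the text does not specify.

```python
def assign_initial_workers(edges, initial_nodes, workers):
    """Map each initial operand to a worker using a bounded depth-first search."""
    # Undirected adjacency list built from the directed operand graph.
    adjacency = {node: [] for node in initial_nodes}
    for src, dst in edges:
        adjacency.setdefault(src, []).append(dst)
        adjacency.setdefault(dst, []).append(src)

    average = len(adjacency) / len(workers)  # average operands per worker
    initial = set(initial_nodes)
    assignment = {}

    for worker in workers:
        start = next((n for n in initial_nodes if n not in assignment), None)
        if start is None:
            break
        stack, visited = [start], set()
        # Assumed stop rule: quit once more than `average` operands were visited.
        while stack and len(visited) <= average:
            node = stack.pop()
            if node in visited:
                continue
            visited.add(node)
            if node in initial and node not in assignment:
                assignment[node] = worker
            stack.extend(reversed(adjacency[node]))

    # Assumption: any initial node still unassigned is distributed round-robin.
    leftovers = [n for n in initial_nodes if n not in assignment]
    for i, node in enumerate(leftovers):
        assignment[node] = workers[i % len(workers)]
    return assignment


# Two independent reductions: a1, a2 feed s1 and b1, b2 feed s2.
edges = [("a1", "s1"), ("a2", "s1"), ("b1", "s2"), ("b2", "s2")]
assignment = assign_initial_workers(edges, ["a1", "a2", "b1", "b2"], ["w1", "w2"])
```

In this example, the initial nodes feeding the same reduction land on the same worker, so the reduction needs no network transfer, and both workers receive the same number of operands.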

Scheduling policy

When a graph composed of operands is executed, a proper execution order can reduce the amount of data temporarily stored in the cluster, thus reducing the likelihood that data will be dumped to disk. Choosing the right workers can reduce the total network traffic during execution.

Operand selection policy

A good execution order can significantly reduce the total amount of data temporarily stored in the cluster. The following figure shows an example of tree reduction. Circles indicate operands and squares indicate chunks. Red indicates that the operand is being executed, blue indicates that the operand is executable, green indicates that the chunk generated by the operand is stored, and gray indicates that the operand and its associated data have been released. The figure shows the state after 5 time units under two different policies, assuming two workers and that each operand uses the same amount of resources. The graph on the left is executed level by level, and the graph on the right is executed in depth-first order. The graph on the left needs to temporarily store data for 6 chunks, while the graph on the right needs to store data for only 2 chunks.


Since our goal is to reduce the total amount of data stored in the cluster, we prioritize operands in the READY state according to the following rules:

  1. Deeper operands should be executed first.
  2. Operands that deeper operands depend on should be executed first.
  3. Operands with a smaller output size should be executed first.
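These rules map naturally onto a sort key. The sketch below is illustrative only: the `ReadyOperand` fields are hypothetical, and the second rule is read as "prefer operands whose successors are deeper", which is an interpretation of the text.

```python
from dataclasses import dataclass


@dataclass
class ReadyOperand:
    name: str
    depth: int            # depth of the operand in the graph
    successor_depth: int  # greatest depth among operands that depend on it
    output_size: int      # estimated size of the output chunk


def priority_key(op):
    # Tuples compare element by element, so the rules apply in order:
    # deeper first, then deeper successors first, then smaller output first.
    return (-op.depth, -op.successor_depth, op.output_size)


ready = [
    ReadyOperand("a", depth=1, successor_depth=2, output_size=10),
    ReadyOperand("b", depth=3, successor_depth=4, output_size=50),
    ReadyOperand("c", depth=3, successor_depth=5, output_size=80),
    ReadyOperand("d", depth=3, successor_depth=5, output_size=20),
]
order = [op.name for op in sorted(ready, key=priority_key)]
```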

Worker selection policy

By the time the scheduler is ready to run the graph, the workers for the initial operands have already been determined. Workers for subsequent operands are assigned according to which workers hold the input data. The worker holding the largest amount of input data is selected to execute the operand. When several workers hold the same amount of input data, the resource state of each candidate worker decides.
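A minimal sketch of this policy, assuming the scheduler knows where each input chunk lives, how large it is, and a single "free resources" score per worker. All names and data structures here are hypothetical.

```python
def select_worker(input_chunks, chunk_locations, chunk_sizes, free_resources):
    """Pick the worker holding the most input bytes; break ties by free resources."""
    held = {}
    for chunk in input_chunks:
        for worker in chunk_locations[chunk]:
            held[worker] = held.get(worker, 0) + chunk_sizes[chunk]
    return max(held, key=lambda w: (held[w], free_resources.get(w, 0)))


chunk_locations = {"c1": ["w1"], "c2": ["w2"], "c3": ["w2"]}
chunk_sizes = {"c1": 100, "c2": 40, "c3": 70}
free_resources = {"w1": 8, "w2": 2}

# w2 holds 110 bytes of input against 100 on w1, so it wins despite being busier.
chosen = select_worker(["c1", "c2", "c3"], chunk_locations, chunk_sizes, free_resources)

# With equal input sizes, the worker with more free resources is preferred.
tie_break = select_worker(["c1", "c2"], chunk_locations, {"c1": 50, "c2": 50}, free_resources)
```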

Operand state

Each Mars operand is individually scheduled by an OperandActor. Execution is a process of state transitions, and the OperandActor defines a transition function that runs on entering each state. At initialization, initial operands are in the READY state and non-initial operands are in the UNSCHEDULED state. When the specified conditions are met, the operand transitions to another state and the corresponding operation is performed. The following figure shows the state transition process.


The following describes the meaning of each state and the operations Mars performs in these states.

- UNSCHEDULED: The operand is in this state when its upstream data is not ready.
- READY: The operand is in this state when all of its upstream data is ready. On entering this state, the OperandActor submits the operand to the candidate workers. When one of them is ready to run it, the scheduler sends a stop message to the other workers and sends a message to that worker to start running the job.
- RUNNING: The operand is in this state once its execution has started. On entering this state, the OperandActor checks whether the job has been submitted. The OperandActor then registers a callback with the worker so that it receives a message when the job is complete.
- FINISHED: The operand is in this state when its execution has completed. If the operand has no successors, a message is sent to the GraphActor to determine whether the entire graph has finished executing. At the same time, the OperandActor sends a message to its predecessors and successors indicating that execution is complete. A predecessor that receives the message checks whether all of its successors have finished executing. If so, the data of that predecessor can be freed. A successor that receives the message checks whether all of its predecessors have completed. If so, the successor can transition to READY.
- FREED: The operand is in this state when all of its data has been released.
- FATAL: The operand is in this state if all rerun attempts fail. An operand in this state passes the same state on to its successors.
- CANCELLING: The operand is in this state while it is being cancelled. If the operand is running, a request is sent to the worker to cancel the run.
- CANCELLED: The operand is in this state once it has been cancelled and its execution has stopped. On entering this state, the OperandActor tries to move all successors to CANCELLING.
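The states can be modelled as a small state machine. The transition table below is inferred from the prose, not taken from the state diagram or from Mars' source, so treat it as an assumption. The class names are hypothetical.

```python
from enum import Enum, auto


class OperandState(Enum):
    UNSCHEDULED = auto()
    READY = auto()
    RUNNING = auto()
    FINISHED = auto()
    FREED = auto()
    FATAL = auto()
    CANCELLING = auto()
    CANCELLED = auto()


S = OperandState

# Assumed transitions, reconstructed from the state descriptions.
ALLOWED = {
    S.UNSCHEDULED: {S.READY, S.FATAL, S.CANCELLING},
    S.READY: {S.RUNNING, S.CANCELLING},
    S.RUNNING: {S.FINISHED, S.FATAL, S.CANCELLING},
    S.FINISHED: {S.FREED},
    S.CANCELLING: {S.CANCELLED},
    S.FREED: set(),
    S.FATAL: set(),
    S.CANCELLED: set(),
}


class OperandStateMachine:
    def __init__(self, is_initial):
        # Initial operands start READY, all others start UNSCHEDULED.
        self.state = S.READY if is_initial else S.UNSCHEDULED

    def transition(self, new_state):
        if new_state not in ALLOWED[self.state]:
            raise ValueError(f"illegal transition {self.state.name} -> {new_state.name}")
        self.state = new_state


op = OperandStateMachine(is_initial=False)
for step in (S.READY, S.RUNNING, S.FINISHED, S.FREED):
    op.transition(step)
```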

Worker content

A Mars worker consists of multiple processes to mitigate the impact of the GIL at runtime. The actual execution is done in separate processes. To reduce unnecessary memory copies and inter-process communication, Mars workers use shared memory to store execution results.

When a job is submitted to a worker, it is first queued to wait for memory allocation. Once memory is allocated, data held by other workers and data dumped to disk on the current worker is loaded into memory. At this point, all the data needed for the calculation is in memory and the actual calculation can start. When the calculation is complete, the worker puts the result in shared storage. The transitions between these four execution states are shown in the figure below.


Control of execution

The Mars worker controls the execution of all operands within the worker through the ExecutionActor. The actor itself is not involved in the actual computation or data transfer; it only submits tasks to other actors.

The scheduler's OperandActor submits the job to the worker through the ExecutionActor's `enqueue_graph` call. The worker accepts the submitted operand and caches it in a queue. When the scheduler decides to execute the operand on the current worker, it calls the `start_execution` method and registers a callback through `add_finish_callback`. This design allows execution results to be received in multiple locations, which is useful for disaster recovery.

ExecutionActor uses the mars.promise module to process execution requests from multiple operands at the same time. Specific execution steps are linked via the "then" method of the Promise class. When the final execution result is stored, the previously registered callback will be triggered. If an error occurs in any of the previous execution steps, the error is passed to the handler function registered in the catch method for processing.
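The chaining style described above can be illustrated with a tiny synchronous stand-in. This is not the `mars.promise` implementation or its API, only a sketch of the `then`/`catch` idea. The step functions `allocate_memory`, `load_inputs` and `compute` are hypothetical placeholders.

```python
class Promise:
    """A minimal, already-settled promise used purely for illustration."""

    def __init__(self, value=None, error=None):
        self.value = value
        self.error = error

    def then(self, step):
        if self.error is not None:
            return self  # skip this step and propagate the earlier error
        try:
            return Promise(value=step(self.value))
        except Exception as exc:
            return Promise(error=exc)

    def catch(self, handler):
        if self.error is None:
            return self
        return Promise(value=handler(self.error))


def allocate_memory(chunk):
    return chunk


def load_inputs(chunk):
    if chunk == "missing":
        raise KeyError(chunk)
    return chunk


def compute(chunk):
    return chunk + ":done"


def run(chunk):
    return (
        Promise(value=chunk)
        .then(allocate_memory)
        .then(load_inputs)
        .then(compute)
        .catch(lambda exc: "failed:" + type(exc).__name__)
        .value
    )


ok_result = run("chunk-1")
failed_result = run("missing")
```

When `load_inputs` raises, the `compute` step is skipped and the error goes straight to the handler, which mirrors the behaviour described for the execution steps.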

Operand sort

The scheduler usually sends a large number of operands to the selected workers. For most of the execution time, therefore, the number of operands submitted to a worker is greater than the number of operands the worker can handle at once. The worker must sort the operands and select some of them to execute. This sorting is performed by the TaskQueueActor, which maintains a priority queue that stores information about operands. The TaskQueueActor also runs an allocation task periodically, assigning execution resources to the topmost operands in the priority queue until there are no more resources available. This allocation is also triggered when a new operand is submitted or when an operand finishes executing.
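A priority queue with allocation triggered on submit and on completion might look like the sketch below. It is not the TaskQueueActor itself: the class `TaskQueue`, the slot-based resource model and the numeric priorities are assumptions for the example.

```python
import heapq


class TaskQueue:
    def __init__(self, total_slots):
        self.free_slots = total_slots
        self._heap = []
        self._counter = 0  # keeps submission order among equal priorities

    def submit(self, op_name, priority):
        # heapq is a min-heap, so the priority is negated to pop the highest first.
        heapq.heappush(self._heap, (-priority, self._counter, op_name))
        self._counter += 1
        return self.allocate()

    def finish(self):
        # An operand finished: release its slot and try to start queued operands.
        self.free_slots += 1
        return self.allocate()

    def allocate(self):
        started = []
        while self._heap and self.free_slots > 0:
            _, _, op_name = heapq.heappop(self._heap)
            self.free_slots -= 1
            started.append(op_name)
        return started


queue = TaskQueue(total_slots=1)
first = queue.submit("low", priority=1)    # the only slot is free, so it starts
second = queue.submit("mid", priority=5)   # queued
third = queue.submit("high", priority=9)   # queued ahead of "mid"
after_finish = queue.finish()
```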

Memory management

Mars workers manage two kinds of memory. The first is the private memory owned by each worker process. The other is memory shared by all processes, held by plasma_store from Apache Arrow.

To avoid process memory overflow, we have introduced a worker-level QuotaActor that allocates process memory. Before an operand starts executing, it sends QuotaActor a batch of memory requests for its input and output chunks. If the remaining memory can satisfy the request, the QuotaActor accepts it. Otherwise, the request is queued to wait for free resources. When the associated memory is freed, the corresponding quota is released as well, and QuotaActor can allocate it to other operands.
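The accept-or-queue behaviour can be sketched as follows. This is a simplified model, not the QuotaActor: the class `Quota`, its method names and the first-in-first-out wake-up order are assumptions.

```python
from collections import deque


class Quota:
    def __init__(self, total):
        self.total = total
        self.used = 0
        self.granted = {}       # operand name -> bytes granted
        self.waiting = deque()  # queued (operand name, bytes) requests

    def request(self, op_name, nbytes):
        """Grant the request if it fits, otherwise queue it."""
        if self.used + nbytes <= self.total:
            self.used += nbytes
            self.granted[op_name] = nbytes
            return True
        self.waiting.append((op_name, nbytes))
        return False

    def release(self, op_name):
        """Free an operand's quota and admit queued requests that now fit."""
        self.used -= self.granted.pop(op_name)
        admitted = []
        while self.waiting and self.used + self.waiting[0][1] <= self.total:
            name, nbytes = self.waiting.popleft()
            self.used += nbytes
            self.granted[name] = nbytes
            admitted.append(name)
        return admitted


quota = Quota(total=100)
got_a = quota.request("a", 60)   # fits
got_b = quota.request("b", 50)   # does not fit, queued
got_c = quota.request("c", 30)   # fits alongside "a"
admitted = quota.release("a")    # frees 60 bytes, so "b" can now run
```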

Shared memory is managed by plasma_store and typically occupies 50% of total memory. Since there is no possibility of overflow, this part of memory is allocated directly via the associated plasma_store methods, not via QuotaActor. When shared memory is used up, Mars workers try to spill unused chunks to disk to free up space for new chunks.
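The spilling behaviour can be pictured with the toy store below. It does not use plasma_store or any Apache Arrow API. The class `SharedStore`, the oldest-first eviction order and the `pinned` set for chunks currently in use are all assumptions made for illustration.

```python
from collections import OrderedDict


class SharedStore:
    def __init__(self, capacity):
        self.capacity = capacity
        self.in_memory = OrderedDict()  # chunk key -> size, oldest first
        self.on_disk = {}               # chunk key -> size of spilled chunks
        self.pinned = set()             # chunks in use that must not be spilled

    def put(self, key, size):
        # Spill the oldest unpinned chunks until the new chunk fits.
        while sum(self.in_memory.values()) + size > self.capacity:
            victim = next((k for k in self.in_memory if k not in self.pinned), None)
            if victim is None:
                raise MemoryError("no unused chunk left to spill")
            self.on_disk[victim] = self.in_memory.pop(victim)
        self.in_memory[key] = size


store = SharedStore(capacity=100)
store.put("a", 60)
store.put("b", 30)
store.put("c", 40)  # does not fit next to "a" and "b", so "a" is spilled
```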

Chunks dumped from shared memory to disk can be reused by subsequent operands, but reloading data from disk into shared memory can be expensive, especially when shared memory is exhausted: if other chunks have to be dumped to disk to make room for the chunk being loaded, the reload can require a lot of IO resources. So when data sharing is not needed (for example, when a chunk is used by only one operand), the chunk is loaded directly into the process's private memory instead of shared memory. This can significantly reduce the total job execution time.

Future work

Mars is currently iterating rapidly. We are considering implementing worker-level failover and shuffle support in the near future, and are planning scheduler-level failover.
