A set of functions that extend the language towards a paradigm where each process has its own memory space and can send and receive messages.
Message Passing
To send a message to another process, the process puts the data into a buffer and calls a send function. The receiving process must have acknowledged that it wants to receive it; send and receive function calls are always paired.
MPI is responsible for the internals of actually sending the data.
The programs are initially two completely separate programs; MPI must open a communication channel that can communicate between programs.
from mpi4py import MPI
comm = MPI.COMM_WORLD
size = comm.Get_size() # number of processes connected to the world
rank = comm.Get_rank() # process identifier. Starts at 0
comm.Send(var_to_send, dest=${process_to_send_to})
comm.Recv(var_to_overwrite, source=${process_to_receive_from})
The variable you send/receive must map to a byte buffer (e.g. numpy arrays). The receive buffer can be larger than the message being sent but not smaller: in Python, mpi4py raises an MPI_ERR_TRUNCATE error. In C the same error is returned when the count passed to MPI_Recv is too small; if that count overstates the real size of the structure, the receive can write past its end.
The send and receive functions also have a tag argument: a value to identify the type of message being sent.
There is also a special MPI.ANY_SOURCE to receive messages from any process. To figure out what rank the message came from:
import numpy
message = numpy.zeros(1)
status = MPI.Status()
comm.Recv(message, source=MPI.ANY_SOURCE, status=status)
print(status.source)
You can also use comm.Probe() to fill in a status object (source and tag) before actually receiving the value; this can be useful if the sender's rank determines the index of the array to write to.
To run the program:
mpiexec -n ${num_processes} python file.py
# For Colab:
mpiexec --allow-run-as-root -n ${num_processes} python file.py # --allow-run-as-root flag required
In the Jupyter code block, add the magic command %%writefile filename.py to the top of the file; when the code block is run, it saves the contents to the file.
Non-Blocking Communication
Allows you to do a small amount of computation while you wait for the message to be sent or the response to be received.
req = comm.Isend(data, dest=RANK)
# or
req = comm.Irecv(data, source=RANK)
req.Wait()
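A minimal sketch of overlapping work with a non-blocking exchange. It uses the lowercase isend/irecv methods, which are mpi4py's pickle-based counterparts of Isend/Irecv and work on ordinary Python objects. The function name and the `partner` and `compute` parameters are hypothetical, introduced only for illustration.

```python
def exchange_while_computing(comm, partner, outgoing, compute):
    """Swap one Python object with `partner` while running `compute()`."""
    # Start both transfers; neither call waits for the partner.
    send_req = comm.isend(outgoing, dest=partner)
    recv_req = comm.irecv(source=partner)

    # Work that does not depend on the incoming message overlaps the transfer.
    local_result = compute()

    # Block only now, when the incoming data is actually needed.
    incoming = recv_req.wait()
    send_req.wait()  # after this, `outgoing` may safely be modified
    return incoming, local_result

# Typical use inside an MPI program launched with mpiexec:
#   from mpi4py import MPI
#   comm = MPI.COMM_WORLD
#   incoming, result = exchange_while_computing(comm, partner, data, work)
```

The computation should be independent of the incoming message; anything that needs the received data has to come after the wait.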
Problem Decomposition
TODO rename section/task to partition?
Splitting a large data set into small tasks.
There will often be communication at the boundary region of the set(s) each task is responsible for.
Can partition into blocks, where each task is given a slice of the block in a given dimension.
| | | | | ^
| a | b | c | d | | n
| | | | | v
Each border requires
NB: partitioning can be done in multiple dimensions, although it does not reduce the amount of communication.
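The block partition above can be sketched as a function that hands each task one contiguous range. This is only an illustration: `block_ranges` is a hypothetical helper, and giving the leftover items to the first tasks is an assumption about how to handle sizes that do not divide evenly.

```python
def block_ranges(n_items, n_tasks):
    """Return one half-open (start, stop) range per task, covering 0..n_items."""
    base, extra = divmod(n_items, n_tasks)
    ranges = []
    start = 0
    for task in range(n_tasks):
        # The first `extra` tasks take one additional item each.
        stop = start + base + (1 if task < extra else 0)
        ranges.append((start, stop))
        start = stop
    return ranges

print(block_ranges(10, 4))
# [(0, 3), (3, 6), (6, 8), (8, 10)]
```

Each task only shares a border with the tasks that own the ranges on either side of its own.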
Could also partition cyclically where each task is given multiple slices of the block in a given dimension.
| 1 | 2 |
| | | | | | | | |
|a|b|c|d|a|b|c|d|
| | | | | | | | |
More boundaries mean more communication. However, if you can’t fit the whole problem set in memory, you can split the problem down and solve each section sequentially, splitting each section down further into tasks and running them in parallel. This allows you to checkpoint the results of each section.
This is also useful in GPUs where cache is limited - do a cheap bulk read operation to put as much as you can in GPU cache? Do a sequential read operation rather than a read for every single task.
It is also useful for load balancing: some areas of the problem may take much longer than others so by making tasks smaller, the difficult area is likely to be split between multiple tasks.
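A small sketch of the cyclic assignment and its load-balancing effect. The helper name `cyclic_slices` and the per-slice `costs` are made up for illustration; the costs place an expensive region in slices 4 and 5.

```python
def cyclic_slices(n_slices, n_tasks):
    """Map each task to the slice indices it owns under a cyclic partition."""
    owned = {task: [] for task in range(n_tasks)}
    for index in range(n_slices):
        owned[index % n_tasks].append(index)
    return owned

print(cyclic_slices(8, 4))
# {0: [0, 4], 1: [1, 5], 2: [2, 6], 3: [3, 7]}

# Hypothetical cost of each slice: slices 4 and 5 are the difficult area.
costs = [1, 1, 1, 1, 9, 9, 1, 1]

# Work per task with contiguous blocks of two slices each.
block_load = [sum(costs[2 * t:2 * t + 2]) for t in range(4)]
# Work per task with the cyclic assignment.
cyclic_load = [sum(costs[i] for i in idx) for idx in cyclic_slices(8, 4).values()]

print(block_load)   # [2, 2, 18, 2]
print(cyclic_load)  # [10, 10, 2, 2]
```

With blocks, one task carries the whole difficult area; with the cyclic assignment it is split between two tasks, at the price of more boundaries.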
Domain Decomposition
Master-worker decomposition: one master task is responsible for decomposing the problem and assigning the tasks. If a task takes too long or isn’t accurate enough, decompose that task further and reassign.
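A sequential model of the master's loop, written as a sketch rather than a real MPI program. It assumes a task is an index range and uses range length as a stand-in for "takes too long"; `master_worker`, `max_task_size` and `solve` are hypothetical names.

```python
from collections import deque

def master_worker(total_range, max_task_size, solve):
    """Model of a master that splits oversized tasks before assigning them."""
    queue = deque([total_range])
    results = {}
    while queue:
        start, stop = queue.popleft()
        if stop - start > max_task_size:
            # Task judged too expensive: decompose it and put the halves back.
            mid = (start + stop) // 2
            queue.append((start, mid))
            queue.append((mid, stop))
        else:
            # In a real program this call would run on a worker process.
            results[(start, stop)] = solve(start, stop)
    return results

print(master_worker((0, 8), 2, lambda a, b: sum(range(a, b))))
# {(0, 2): 1, (2, 4): 5, (4, 6): 9, (6, 8): 13}
```

In an MPI version the master would send each range to an idle worker and receive the result, but the splitting logic stays the same.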
Collective Operations
Operations in which every process in the communicator takes part; a way to send and collect a lot of data.
Broadcasting: sending a message to all processes; comm.Bcast
Scatter: sending chunks of an array to each process; comm.Scatter
Gather: taking chunks of arrays from many processes and combining them; comm.Gather
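The data movement of the three operations can be modelled with plain Python lists, where index r of the outer list stands for what rank r holds. This is a model of the semantics only, not mpi4py code; the function names are hypothetical and the equal chunk size mirrors the fact that comm.Scatter gives every process the same number of elements.

```python
def bcast(value, size):
    """After a broadcast, every rank holds the root's value."""
    return [value for _ in range(size)]

def scatter(data, size):
    """After a scatter, rank r holds the r-th equal-sized chunk of the root's array."""
    chunk = len(data) // size
    return [data[r * chunk:(r + 1) * chunk] for r in range(size)]

def gather(chunks):
    """After a gather, the root holds every rank's chunk, in rank order."""
    return [item for chunk in chunks for item in chunk]

size = 4
per_rank = scatter([1, 2, 3, 4, 5, 6, 7, 8], size)
print(per_rank)  # [[1, 2], [3, 4], [5, 6], [7, 8]]

# Each rank works on its own chunk independently.
squared = [[x * x for x in chunk] for chunk in per_rank]
print(gather(squared))  # [1, 4, 9, 16, 25, 36, 49, 64]
```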
Computational Science
- Experiment
- Theory
- Simulate (‘theoretical experiments’)
Simulating requires much more computation than communication.
Parallelism
Often, a large group of operations can be done concurrently without memory conflicts.
Sometimes, each cell update affects the cells that share a face with it. Hence, cells that do not share faces can be updated simultaneously. Communication is required to reduce (combine) the forces from neighboring cells.
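One way to pick cells that can safely be updated at the same time is a checkerboard colouring, where cells of the same colour never share a face. The notes do not name a scheme, so this choice and the helper names are assumptions made for illustration.

```python
def colour(row, col):
    """Checkerboard colour of a cell: 0 or 1."""
    return (row + col) % 2

def cells_of_colour(rows, cols, wanted):
    """All cells of a rows x cols grid that have the given colour."""
    return [(r, c) for r in range(rows) for c in range(cols) if colour(r, c) == wanted]

def share_face(a, b):
    """Two grid cells share a face when they differ by one step along one axis."""
    return abs(a[0] - b[0]) + abs(a[1] - b[1]) == 1

# Update all colour-0 cells concurrently, then all colour-1 cells.
for phase in (0, 1):
    batch = cells_of_colour(4, 4, phase)
    conflicts = [(a, b) for a in batch for b in batch if share_face(a, b)]
    print(phase, len(batch), len(conflicts))
```

Every face neighbor of a cell has the opposite colour, so each phase contains no conflicting pair.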
e.g. brick wall: if each person has their own horizontal slice of the wall, at the boundaries of their section, they must ensure their neighbors have completed their own layer so that there are no gaps.
Multiple-scale applications: sometimes, different sections of the simulation take longer - some cells may need more or fewer updates than neighboring cells, or there may be more ‘interesting’ areas that need to be simulated in higher resolution.
N-body problem
Finding the positions and velocities of a collection of interacting particles.
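A rough sketch of one time step, assuming the interaction is Newtonian gravity in 2D and using a plain explicit Euler update. Both choices, the constant `g` and the function names are assumptions; the notes do not specify the force law or the integrator.

```python
import math

def accelerations(positions, masses, g=1.0):
    """Direct O(n^2) sum of gravitational accelerations in 2D."""
    result = []
    for i, (xi, yi) in enumerate(positions):
        ax = ay = 0.0
        for j, (xj, yj) in enumerate(positions):
            if i == j:
                continue
            dx, dy = xj - xi, yj - yi
            dist = math.hypot(dx, dy)
            # Magnitude g * m_j / dist^2, directed along (dx, dy) / dist.
            ax += g * masses[j] * dx / dist ** 3
            ay += g * masses[j] * dy / dist ** 3
        result.append((ax, ay))
    return result

def step(positions, velocities, masses, dt):
    """Advance every particle by one explicit Euler step."""
    acc = accelerations(positions, masses)
    new_positions = [(x + vx * dt, y + vy * dt)
                     for (x, y), (vx, vy) in zip(positions, velocities)]
    new_velocities = [(vx + ax * dt, vy + ay * dt)
                      for (vx, vy), (ax, ay) in zip(velocities, acc)]
    return new_positions, new_velocities

# Two unit masses one unit apart attract each other equally.
print(accelerations([(0.0, 0.0), (1.0, 0.0)], [1.0, 1.0]))
# [(1.0, 0.0), (-1.0, 0.0)]
```

Every particle needs the position of every other particle, which is why this problem requires all processes to exchange data each step.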
Conway’s Game of Life
- 2 neighbors: no state change
- 3 neighbors: becomes alive
- Anything else: becomes dead
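The three rules above translate directly into a small function. The name `next_state` is hypothetical.

```python
def next_state(alive, live_neighbours):
    """Apply the rules to one cell and return whether it is alive next step."""
    if live_neighbours == 3:
        return True    # becomes (or stays) alive
    if live_neighbours == 2:
        return alive   # no state change
    return False       # anything else: dead

print(next_state(True, 2), next_state(False, 2), next_state(False, 3), next_state(True, 4))
# True False True False
```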
Store data in 1D array - makes sharing data between processes a bit easier.
Have two copies of the world: the world in the current time-step (source_board) and the world in the next time-step (target_board).
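The notes do not define update_row, so the following is a possible sketch of it on a flat 1D board. It assumes cells are stored as 0/1 at index row * COLS + col, that cells outside the board count as dead, and a hypothetical 4 x 4 board size.

```python
ROWS, COLS = 4, 4  # hypothetical board size

def count_live_neighbours(board, row, col):
    """Count the live cells among the up to eight neighbours of (row, col)."""
    total = 0
    for dr in (-1, 0, 1):
        for dc in (-1, 0, 1):
            if dr == 0 and dc == 0:
                continue
            r, c = row + dr, col + dc
            if 0 <= r < ROWS and 0 <= c < COLS:  # off-board cells count as dead
                total += board[r * COLS + c]
    return total

def update_row(source_board, target_board, row):
    """Read one row from source_board and write its next state to target_board."""
    for col in range(COLS):
        index = row * COLS + col
        neighbours = count_live_neighbours(source_board, row, col)
        if neighbours == 3:
            target_board[index] = 1
        elif neighbours == 2:
            target_board[index] = source_board[index]
        else:
            target_board[index] = 0

# A vertical line of three cells turns into a horizontal one.
source = [0, 1, 0, 0,
          0, 1, 0, 0,
          0, 1, 0, 0,
          0, 0, 0, 0]
target = [0] * (ROWS * COLS)
for row in range(ROWS):
    update_row(source, target, row)
print(target)
# [0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]
```

Reading only from the source copy and writing only to the target copy means the order in which rows are updated does not matter, so different processes can take different rows.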
comm.Bcast(source_board, root=0)  # broadcasts the data from the process with rank 0 to every other process. MPI does this in a distributed way.
# update rows the process is responsible for
for j in range(rank * rows_per_process, (rank + 1) * rows_per_process):
    update_row(source_board, target_board, j)
# now, ensure each process has a consistent view of the world
if rank != 0:
    # send only the slice of the flat board that this process updated
    start = rank * rows_per_process * COLS
    comm.Send(target_board[start:start + rows_per_process * COLS], dest=0)
else:
    # in process zero, receive data from all other processes
    status = MPI.Status()
    for j in range(num_processes - 1):  # already has data from itself
        # Use probe to determine rank and hence the memory location to write to
        comm.Probe(source=MPI.ANY_SOURCE, status=status)
        start = status.source * rows_per_process * COLS
        comm.Recv(target_board[start:start + rows_per_process * COLS], source=status.source)
Inefficient: only neighboring processes need to communicate with each other; top row to left/previous process, bottom row to right/next process (and vice versa):
comm.Send(top_row, left_process)
comm.Send(bottom_row, right_process)
comm.Recv(left_process_bottom_row, left_process)
comm.Recv(right_process_top_row, right_process)
This would cause a deadlock.
Deadlock
Many function calls are blocking; if the condition the function is waiting for never happens, then the process will deadlock.
Because every send has to have a matching receive, the Send call may wait until the receiving process has posted a matching Recv call. Since all processes are trying to send and then receive, no process ever reaches its Recv and the program will deadlock.
If too many Isend calls occur quickly, it will fill up the memory buffer (in the network stack) and so it must wait before it can send. If the message the receiver is waiting for cannot fit (with a blocking Recv) in the buffer, it will deadlock.
A quick fix for this is using Irecv, but this is inelegant as it requires polling.
Solution: paired send-and-receive.
if rank % 2:
    comm.Send(..., left_process)
    comm.Recv(..., left_process)
else:
    comm.Recv(..., right_process)
    comm.Send(..., right_process)
MPI also has a built-in combined send-and-receive call (comm.Sendrecv in mpi4py).
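A sketch of the neighbor exchange using mpi4py's lowercase sendrecv, the pickle-based variant that returns the received object. The function name is hypothetical, and the wrap-around neighbors (the first process treats the last one as its previous process) are an assumption.

```python
def exchange_rows(comm, top_row, bottom_row):
    """Swap boundary rows with the previous and next process."""
    rank = comm.Get_rank()
    size = comm.Get_size()
    previous_rank = (rank - 1) % size  # assumption: wrap-around neighbours
    next_rank = (rank + 1) % size

    # Send my top row to the previous process; receive the next process's top row.
    row_from_next = comm.sendrecv(top_row, dest=previous_rank, source=next_rank)
    # Send my bottom row to the next process; receive the previous process's bottom row.
    row_from_previous = comm.sendrecv(bottom_row, dest=next_rank, source=previous_rank)
    return row_from_previous, row_from_next

# Typical use inside an MPI program launched with mpiexec:
#   from mpi4py import MPI
#   comm = MPI.COMM_WORLD
#   above, below = exchange_rows(comm, my_top_row, my_bottom_row)
```

Because each call combines the send and the receive, every process can make the same two calls in the same order without the odd/even split.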