====== Mpi4py ======

===== Parallel Python with mpi4py =====

mpi4py provides open source python bindings to most of the functionality of the MPI-2 standard of the message passing interface MPI. Version 1.3.1 of mpi4py is installed on GWDG's scientific compute cluster.

Documentation for mpi4py can be found at [[https://mpi4py.readthedocs.io/|mpi4py.readthedocs.io]].

This tutorial provides a more complete description of the point to point and collective communication methods in mpi4py and demonstrates their use in code examples. A knowledge of the python programming language is assumed. The basics of the message passing programming model will be explained; links to the complete documentation of the message passing interface MPI can be found on the web site of the [[https://www.mpi-forum.org/|MPI Forum]].

===== Using mpi4py =====

Login with ssh on gwdu101 or gwdu102 and load the necessary MPI environment with

<code>
module load openmpi
</code>

and the python and mpi4py environment by

<code>
module load python
</code>

Now you are ready to execute a first parallel python program:

<code>
# hello_world_parallel.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

print(size, rank)
</code>

Executing this script (no compilation is necessary for the interpreted language python) with the command

<code>
mpirun -n 3 python hello_world_parallel.py
</code>

will then produce the output

<code>
3 0
3 1
3 2
</code>

===== Language Elements of mpi4py for Parallel Computing =====

Parallel computing is a means to accelerate the solution of a given computational problem by employing multiple hardware resources simultaneously. The parallelism underlying MPI, and therefore also mpi4py, is based on the simultaneous execution of several independent programs (instruction streams), each of them operating on its own data. In [[https://en.wikipedia.org/wiki/Flynn%27s_taxonomy|Flynn's taxonomy]] this kind of parallel computing is classified as MIMD (multiple instruction streams, multiple data streams). The "single program multiple data" (SPMD) style used with MPI executes the same program in every task, which branches depending on the rank of the executing task.

In the message passing programming model the simultaneously executing tasks have separate address spaces and cooperate by explicitly sending and receiving messages.

In MPI the communication context is named **communicator**. The communicator for the tasks generated at start time by the mpirun command is called ''MPI.COMM_WORLD''. The number of tasks in a communicator and the rank of the calling task within it are returned by the methods

<code>
comm.Get_size()   # number of tasks in the communicator comm
comm.Get_rank()   # rank of the calling task
</code>

Nearly all other language elements and library routines, which are provided in the MPI-2 standard, are accessible as methods of the communicator class or of the ''MPI'' module.

===== Point-To-Point Communication =====

Point to point communication in the message passing programming model is a two step process, consisting of sending some data from one task and of receiving these data by another task. The sending task must specify the data to be sent, their destination and an identifier (tag) for the message; the receiving task must specify the location for storing the received data and the source and the identifier of the message to be received.

Generic data items in python are objects, containing collections of typed data structures. The physical transmission from source to destination of these data objects occurs as a stream of bits. The necessary transformation between objects, which contain data scattered in memory, and contiguous memory areas to be streamed may consume large amounts of resources for allocating memory and copying data. mpi4py therefore provides two sets of functions for communication: the lower case methods (e.g. ''send''), which communicate generic python data objects by internally serialising them, and the methods starting with an upper case letter (e.g. ''Send''), which communicate buffer like objects and avoid this overhead.

==== Communication of Generic Python Data Objects ====

The method for sending generic objects has the syntax

<code>
comm.send(buf, dest=0, tag=0)
</code>

Only the argument ''buf'', the python object to be sent, is required; ''dest'' is the rank of the destination task and ''tag'' an integer identifying the message.

The method for receiving is implemented slightly differently. Its syntax is

<code>
comm.recv(obj=None, source=0, tag=0, status=None)
</code>

and the object to be received is the return value of the function ''recv''. The optional argument ''status'' will be explained below.

The receiving task can provide the values ''MPI.ANY_SOURCE'' and ''MPI.ANY_TAG'' for the ''source'' and ''tag'' arguments in order to receive a message from any source and/or with any tag.

A simple program for communicating data from one task to another looks like

**send.py:**

<code>
# send
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

data = None
if rank == 0:
    data = (1, 'a', 3.14)
    comm.send(data, dest=1)
elif rank == 1:
    print('on task', rank, 'before recv: data =', data)
    data = comm.recv(source=0)
    print('on task', rank, 'after recv: data =', data)
</code>

and the command ''mpirun -n 2 python send.py'' produces the output

<code>
on task 1 before recv: data = None
on task 1 after recv: data = (1, 'a', 3.14)
</code>

==== Communication of Buffer Like Objects ====

A buffer like object in python occupies contiguous memory space and therefore can be specified by its address and extent. An example is the NumPy array object. For buffer like objects no transformation into a bit stream is necessary, and mpi4py can pass them directly to the underlying MPI routines, reaching communication speeds close to those of compiled C code.

The syntax of the methods for sending and receiving buffer like objects is

<code>
comm.Send(buf, dest=0, tag=0)
comm.Recv(buf, source=0, tag=0, status=None)
</code>

The buffer arguments in these methods have to be given in the form of a list with 3 or 2 elements

<code>
[data, count, type]
[data, type]
</code>

where in the second case the count is implied by the byte size of the buffer object ''data'' and the byte size of an element of type ''type''. For a NumPy array the buffer argument may also be the array itself, because count and type can be inferred from it.

An example for communicating a NumPy array is

**send_array.py:**

<code>
# send_array
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    data = numpy.arange(5, dtype=numpy.float64)
    comm.Send([data, 5, MPI.DOUBLE], dest=1)
elif rank == 1:
    data = 10.*numpy.arange(5, dtype=numpy.float64)
    print('on task', rank, 'before Recv: data =', data)
    comm.Recv(data, source=0)
    print('on task', rank, 'after Recv: data =', data)
</code>

Starting two tasks with this program will produce the output

<code>
on task 1 before Recv: data = [  0.  10.  20.  30.  40.]
on task 1 after Recv: data = [ 0.  1.  2.  3.  4.]
</code>

==== Using the ''Status'' Object ====

In order to access the status information from the Recv method, an instance of the ''Status'' class must be created and passed to the ''status'' argument of ''Recv'':

<code>
info = MPI.Status()
comm.Recv(buf, source=0, tag=0, status=info)
</code>

The information about the received message is returned by calls to various methods of the Status class instance info:

<code>
info.Get_count()             # number of received bytes
info.Get_elements(datatype)  # number of received elements of type datatype
info.Get_source()            # source of the message
info.Get_tag()               # tag of the message
</code>

An example for retrieving the status information is (**from now on the code lines for importing modules and defining comm, size and rank will not be displayed**)

**status.py:**
<code>
data = rank*numpy.ones(5, dtype=numpy.float64)

if rank == 0:
    comm.Send([data, 3, MPI.DOUBLE], dest=1, tag=1)
if rank == 1:
    info = MPI.Status()
    comm.Recv(data, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=info)
    source = info.Get_source()
    tag = info.Get_tag()
    count = info.Get_elements(MPI.DOUBLE)
    size = info.Get_count()
    print('on', rank, 'source, tag, count, size is', source, tag, count, size)
</code>

which running in two tasks produces the output

<code>
on 1 source, tag, count, size is 0 1 3 24
</code>

A Recv operation will fail, if the size of the buffer passed in the ''buf'' argument is smaller than the size of the incoming message. This can be avoided by inspecting the message with ''Probe'' before receiving it and allocating a buffer of the required size, as shown in the following example:

**probe.py:**
<code>
if rank == 0:
    data = rank*numpy.ones(5, dtype=numpy.float64)
    comm.Send([data, 3, MPI.DOUBLE], dest=1)
if rank == 1:
    info = MPI.Status()
    comm.Probe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=info)
    count = info.Get_elements(MPI.DOUBLE)
    data = numpy.empty(count, dtype=numpy.float64)
    comm.Recv(data, source=0)
    print('on task', rank, 'data =', data)
</code>

==== Blocking and Nonblocking Communications ====

MPI offers two basic ways for communication, blocking and nonblocking:

1. //Blocking Communication//: the send and receive calls return only after the communication has completed, i.e. the buffer passed to a send may be reused immediately after the call returns, and the buffer passed to a receive contains the complete message.

The methods of the mpi4py class ''Comm'' described so far (''send''/''Send'' and ''recv''/''Recv'') perform blocking communication.

2. //Nonblocking Communication//: the communication calls return immediately, and the completion of the communication must be checked separately before the communication buffers may be reused.

In MPI the names for nonblocking communication calls are derived from the respective names for the blocking calls by prepending an "I" (for immediate). They return an instance of the ''Request'' class, whose methods ''Test'' and ''Wait'' serve to check for the completion of the communication.

The following example shows a nonblocking communication of a NumPy array, in which the two methods ''Test'' and ''Wait'' of the ''Request'' class are used:

**isend_array.py:**
<code>
a_size = 10
data = (1+rank)*numpy.arange(a_size, dtype=numpy.float64)
if rank == 0:
    req = comm.Isend([data, a_size, MPI.DOUBLE], dest=1)
elif rank == 1:
    print('on task', rank, 'before Irecv: data =', data)
    req = comm.Irecv(data, source=0)
    re = False
    while re == False:
        re = MPI.Request.Test(req)
        print('test result', re)
    re = MPI.Request.Wait(req)
    print('wait result', re)
    print('on task', rank, 'after Irecv: data =', data)
</code>

==== Modes of Communication ====

The blocking send operation can be implemented by directly streaming the message data from memory into the communication system. In this case the send call will block until a corresponding receive call has been posted and has copied the data stream into the memory of the receiving task. This is called the **synchronous** communication mode.

Alternatively, the message data can be copied into a local buffer of the communication system. The send call can then return as soon as this copy is complete, irrespective of whether a matching receive has been posted. This is called the **buffered** communication mode.

A third possibility is the **ready** communication mode, in which the send operation may be started only if the matching receive has already been posted. It avoids the handshake otherwise needed, but its erroneous use leads to undefined results.

The **standard** send operation in MPI is a combination of buffered mode for short and synchronous mode for long messages. The border between the two modes is implementation dependent, and portable programs using the standard send should not rely on properties of either of the modes.

MPI provides access to the three non-standard modes in send routines, whose names are prefixed with the letters "S", "B" and "R", respectively. The receive operation is the same for all modes.

mpi4py provides in its ''Comm'' class the following send methods for buffer like objects

<code>
Send,  Ssend,  Bsend,  Rsend     # blocking calls
Isend, Issend, Ibsend, Irsend    # nonblocking calls
</code>

and with lower case first letter for sending generic python data objects

<code>
send,  ssend,  bsend     # blocking calls
isend, issend, ibsend    # nonblocking calls
</code>

(there is no ready mode send for generic python data objects).

Furthermore, the mpi4py functions ''MPI.Attach_buffer(buf)'' and ''MPI.Detach_buffer()'' serve to attach and detach the buffer used in the buffered communication mode.

The next example shows a buffered send and the syntax to attach the buffer. As can be seen, the size of the buffer is slightly larger than the size of the data to be sent, because the message data contain, in addition to the data proper, also information about their size, their destination and the identifier of the message.

**bsend_array.py:**
<code>
a_sz = 1000
data = rank*numpy.arange(a_sz, dtype=numpy.float64)
if rank == 0:
    buf = numpy.empty(a_sz+3, dtype=numpy.float64)
    MPI.Attach_buffer(buf)
    comm.Bsend(data, dest=1)
    MPI.Detach_buffer()
elif rank == 1:
    print('on task', rank, 'before Recv: data =', data[0:5])
    comm.Recv(data, source=0)
    print('on task', rank, 'after Recv: data =', data[0:5])
</code>

==== Avoiding Deadlocks by Using ''Sendrecv'' ====

Exchanging data between neighbours in a ring of tasks can lead to a deadlock, as exemplified in the following program:

**exch_deadlock.py:**
<code>
a_size = 10
send_data = rank*numpy.arange(a_size, dtype=numpy.float64)
recv_data = numpy.empty(a_size, dtype=numpy.float64)
ipr = (rank+1)%size
ipl = (rank-1)%size
comm.Ssend(send_data, dest=ipr)
comm.Recv(recv_data, source=ipl)

print('on task', rank, 'recv_data =', recv_data)
</code>

This program will never end, because every task posts a call to the synchronous mode ''Ssend'', which cannot complete before the matching receive has been called, and therefore no task ever reaches its ''Recv'' call. Deadlocks of this kind can be avoided by using the combined method ''Sendrecv'', which executes the send and receive operations simultaneously:

<code>
comm.Sendrecv(send_data, dest=ipr, recvbuf=recv_data, source=ipl)
</code>
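
A complete, runnable version of the ring exchange rewritten with ''Sendrecv'' could look as follows (a minimal sketch; the imports and the definitions of comm, size and rank are written out here for completeness):

<code>
# exch_sendrecv.py: deadlock-free neighbour exchange in a ring of tasks
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

a_size = 10
send_data = rank*numpy.arange(a_size, dtype=numpy.float64)
recv_data = numpy.empty(a_size, dtype=numpy.float64)
ipr = (rank+1)%size   # right neighbour in the ring
ipl = (rank-1)%size   # left neighbour in the ring

# send to the right neighbour and receive from the left neighbour in one call;
# the MPI library orders the transfers such that no deadlock can occur
comm.Sendrecv(send_data, dest=ipr, recvbuf=recv_data, source=ipl)
print('on task', rank, 'recv_data =', recv_data)
</code>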

===== Collective Communications =====

Point to point communication is the basic tool for parallel computing within the message passing programming model. More general communication patterns involving more than two tasks can be realised by combining several of these basic communication operations. For example, synchronising all tasks in a communicator can be achieved by handshakes of one task with all other tasks, followed by informing all tasks about the completion of all these handshakes. This is illustrated with the code for the function ''synchron'':

<code>
def synchron():
    buf = None
    if rank == 0:
        for ip in range(1, size):
            buf = comm.recv(source=ip)
        for ip in range(1, size):
            comm.send(buf, dest=ip)
    else:
        comm.send(buf, dest=0)
        buf = comm.recv(source=0)
</code>

This kind of synchronisation can be thought of as a //barrier//: no task can proceed beyond the call of ''synchron'' before all other tasks have reached it. It can be used, for example, to produce ordered output from all tasks:

<code>
for ip in range(0, size):
    if rank == ip:
        print('this is task', rank)
    synchron()
</code>

Communication involving all tasks of a communicator is called //collective communication//. MPI provides operations for the most frequently used collective communication patterns. These are easier to use than self-made constructions from point to point calls and usually are more efficient, because they can exploit optimisations in the MPI implementation.

These patterns can be classified according to the kind of data manipulation involved:

  * Global synchronisation, no data are involved
  * Global data communication, data are distributed among or collected from tasks
  * Global data reduction, data of different tasks are reduced to new data

==== Global Synchronisation ====

The only global synchronisation operation in MPI is the barrier. The syntax for the barrier synchronisation in the mpi4py ''Comm'' class is

<code>
comm.Barrier()
</code>

A task posting a call to ''Barrier'' blocks until all other tasks of the communicator have posted their ''Barrier'' calls.
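
A typical use is the synchronous timing of a program section. The following minimal sketch (assuming ''comm'' and ''rank'' are defined as before) measures the time all tasks together spend in some task-dependent work:

<code>
import time

comm.Barrier()              # make sure all tasks start the timing together
t0 = MPI.Wtime()            # MPI's wall clock timer
time.sleep(rank)            # placeholder for some task-dependent work
comm.Barrier()              # wait until the slowest task has finished
if rank == 0:
    print('all tasks finished after', MPI.Wtime()-t0, 'seconds')
</code>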

==== Global Data Communication ====

Data communication between all tasks of a communicator can have two directions:
  * distributing data coming from one task to all tasks
  * collecting data coming from all tasks to one or all tasks

For distributing data mpi4py provides methods for
  * broadcasting: ''Bcast'', ''bcast''
  * scattering from one task: ''Scatter'', ''Scatterv'', ''scatter''

For collecting data mpi4py provides methods for
  * gathering to one task: ''Gather'', ''Gatherv'', ''gather''
  * gathering to all tasks: ''Allgather'', ''Allgatherv'', ''allgather''

In addition, there are methods, which combine collection and distribution of data:
  * ''Alltoall'', ''Alltoallv'', ''alltoall''

=== broadcast ===

The syntax of the broadcast methods is

<code>
comm.Bcast(buf, root=0)
buf = comm.bcast(obj=None, root=0)
</code>

An example for broadcasting a NumPy array is

**bcast_array.py:**
<code>
data = numpy.empty(5, dtype=numpy.float64)
if rank == 0:
    data = numpy.arange(5, dtype=numpy.float64)
comm.Bcast([data, 5, MPI.DOUBLE], root=0)
print('on task', rank, 'data =', data)
</code>

An example for broadcasting a generic python data object is

**bcast.py:**
<code>
rootdata = None
if rank == 0:
    rootdata = (1, 'a', 3.14)
data = comm.bcast(rootdata, root=0)
print('on task', rank, 'data =', data)
</code>

A difference between the two broadcast functions is, that in ''bcast'' the broadcasted object is the return value, so that no buffer has to be allocated on the receiving tasks, whereas in ''Bcast'' every task must provide a buffer of the correct size for the incoming data.

=== scatter ===

Whereas a broadcast operation distributes the same object from the root task to all other tasks, a scattering operation sends a different object from root to every task. The syntax of the methods for scattering is

<code>
comm.Scatter(sendbuf, recvbuf, root=0)
comm.Scatterv(sendbuf, recvbuf, root=0)
data = comm.scatter(sendobj=None, recvobj=None, root=0)
</code>

In the upper case function ''Scatter'' the send buffer on the root task is divided into ''size'' consecutive sections of equal size, and section ''i'' is sent to the task with rank ''i''. The buffers are specified in the form

<code>
buf = [data, data_size, data_type]
</code>

where ''data'' must be a buffer like object of size ''data_size'' and type ''data_type''; the size of the send buffer must be ''size'' times the size of the receive buffer.

In the vector variant ''Scatterv'' the sections sent to the different tasks can have different sizes and can be taken from arbitrary positions within the send buffer, which therefore is specified by four parameters

<code>
sendbuf = [data, counts, displacements, data_type]
</code>

where ''counts'' and ''displacements'' are integer tuples with as many elements as there are tasks in the communicator: ''counts[i]'' designates the number of elements to be sent to task ''i'' and ''displacements[i]'' the position in ''data'' at which the section for task ''i'' begins.

The calls of ''Scatter'' and ''Scatterv'' are shown in the following example for running with 3 tasks:

**scatter_array.py:**
<code>
a_size = 4
recvdata = numpy.empty(a_size, dtype=numpy.float64)
senddata = None
if rank == 0:
    senddata = numpy.arange(size*a_size, dtype=numpy.float64)
comm.Scatter(senddata, recvdata, root=0)
print('on task', rank, 'after Scatter:  data =', recvdata)

recvdata = numpy.empty(rank+2, dtype=numpy.float64)
counts = (2, 3, 4)
dspls = (0, 3, 8)
if rank == 0:
    senddata = numpy.arange(size*a_size, dtype=numpy.float64)
comm.Scatterv([senddata, counts, dspls, MPI.DOUBLE], recvdata, root=0)
print('on task', rank, 'after Scatterv: data =', recvdata)
</code>

In the ''scatter'' method for generic python data objects the send object on the root task must be a sequence containing as many elements as there are tasks in the communicator; element ''i'' of the sequence is sent to task ''i'':

**scatter.py:**
<code>
rootdata = None
if rank == 0:
    rootdata = [1, (2, 'a'), 'xyz']
data = comm.scatter(rootdata, root=0)
print('on task', rank, 'data =', data)
</code>

=== gather ===

The gather operations collect data from all tasks and deliver the collection to the root task or to all tasks. The syntax of the gathering methods is

<code>
comm.Gather(sendbuf, recvbuf, root=0)
comm.Gatherv(sendbuf, recvbuf, root=0)
comm.Allgather(sendbuf, recvbuf)
comm.Allgatherv(sendbuf, recvbuf)
recvobj = comm.gather(sendobj=None, recvobj=None, root=0)
recvobj = comm.allgather(sendobj=None, recvobj=None)
</code>

Compared to the scatter methods, the roles of the send buffer and the receive buffer are exchanged: in ''Gather'' the receive buffer on the root task must provide ''size'' sections of equal size for the incoming data, and in ''Gatherv'' the tuples ''counts'' and ''displacements'' describe the sections of the receive buffer, into which the data of the tasks are collected. Both methods are illustrated in the following example for 3 tasks:

**gather_array.py:**
<code>
a_size = 4
recvdata = None
senddata = (rank+1)*numpy.arange(a_size, dtype=numpy.float64)
if rank == 0:
    recvdata = numpy.empty(size*a_size, dtype=numpy.float64)
comm.Gather(senddata, recvdata, root=0)
print('on task', rank, 'after Gather:  data =', recvdata)

counts = (2, 3, 4)
dspls = (0, 3, 8)
if rank == 0:
    recvdata = numpy.zeros(size*a_size, dtype=numpy.float64)
sendbuf = [senddata, counts[rank]]
recvbuf = [recvdata, counts, dspls, MPI.DOUBLE]
comm.Gatherv(sendbuf, recvbuf, root=0)
print('on task', rank, 'after Gatherv: data =', recvdata)
</code>

The lower case method ''gather'' collects the generic data objects sent by the tasks into a list, which is returned on the root task (the other tasks receive ''None''):

**gather.py:**
<code>
senddata = ['rank', rank]
rootdata = comm.gather(senddata, root=0)
print('on task', rank, 'data =', rootdata)
</code>
+ | The " | ||

=== alltoall ===

alltoall operations combine gather and scatter. In a communicator with ''size'' tasks every task holds a send buffer with ''size'' data sections and a receive buffer with ''size'' data sections. The alltoall operation takes the ''i''-th section from the send buffer of task ''j'' and copies it into the ''j''-th section of the receive buffer of task ''i''. The syntax of the methods is

<code>
comm.Alltoall(sendbuf, recvbuf)
comm.Alltoallv(sendbuf, recvbuf)
recvobj = comm.alltoall(sendobj=None, recvobj=None)
</code>

The data objects in sendbuf and recvbuf have types depending on the method: in ''Alltoall'' and ''Alltoallv'' they are buffer like objects, in ''alltoall'' they are sequences of ''size'' generic python data objects.

An example for ''Alltoall'' is

**alltoall_array.py:**
<code>
a_size = 1
senddata = (rank+1)*numpy.arange(size*a_size, dtype=numpy.float64)
recvdata = numpy.empty(size*a_size, dtype=numpy.float64)
comm.Alltoall(senddata, recvdata)
print('on task', rank, 'recvdata =', recvdata)
</code>

Next an example for ''alltoall'' with three tasks, in which every task sends a sequence of three generic data objects:

**alltoall.py:**
<code>
data0 = ['a', rank]
data1 = [1, 'b', rank]
senddata = (data0, data1, data0)
print('on task', rank, 'senddata =', senddata)
recvdata = comm.alltoall(senddata)
print('on task', rank, 'recvdata =', recvdata)
</code>

==== Global Data Reduction ====

The data reduction functions of MPI combine data from all tasks with one of a set of predefined operations and deliver the result of this "reduction" to the root task or to all tasks. mpi4py provides the following predefined reduction operations:

<code>
MPI.MIN      minimum
MPI.MAX      maximum
MPI.SUM      sum
MPI.PROD     product
MPI.LAND     logical and
MPI.BAND     bitwise and
MPI.LOR      logical or
MPI.BOR      bitwise or
MPI.LXOR     logical exclusive or
MPI.BXOR     bitwise exclusive or
MPI.MAXLOC   maximum and its location
MPI.MINLOC   minimum and its location
</code>

The reduction operations need data of the appropriate type: numerical data for the arithmetic operations, logical or integer data for the logical and bitwise operations.

=== reduce ===

The syntax for the reduction methods is

<code>
comm.Reduce(sendbuf, recvbuf, op=MPI.SUM, root=0)
comm.Allreduce(sendbuf, recvbuf, op=MPI.SUM)
recvobj = comm.reduce(sendobj=None, recvobj=None, op=MPI.SUM, root=0)
recvobj = comm.allreduce(sendobj=None, recvobj=None, op=MPI.SUM)
</code>

The following example shows the use of ''Reduce'' and ''Allreduce'' for summing NumPy arrays element by element over all tasks:

**reduce_array.py:**
<code>
a_size = 3
recvdata = numpy.zeros(a_size, dtype=numpy.float64)
senddata = (rank+1)*numpy.arange(a_size, dtype=numpy.float64)
comm.Reduce(senddata, recvdata, op=MPI.SUM, root=0)
print('on task', rank, 'after Reduce:    data =', recvdata)

comm.Allreduce(senddata, recvdata, op=MPI.SUM)
print('on task', rank, 'after Allreduce: data =', recvdata)
</code>
The use of the lower case methods ''reduce'' and ''allreduce'' operating on generic python data objects is analogous, with the result of the reduction being returned by the call.
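
A minimal sketch (with comm, size and rank defined as before) sums a plain python number over all tasks:

<code>
senddata = rank + 1
result = comm.reduce(senddata, op=MPI.SUM, root=0)   # result only on root, None elsewhere
print('on task', rank, 'reduce result =', result)

result = comm.allreduce(senddata, op=MPI.SUM)        # result on every task
print('on task', rank, 'allreduce result =', result)
</code>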

=== reduce_scatter ===

The reduce_scatter functions operate elementwise on ''size'' sections of the send buffer and scatter the ''size'' results of the reductions to the respective tasks. The syntax for these methods is

<code>
comm.Reduce_scatter_block(sendbuf, recvbuf, op=MPI.SUM)
comm.Reduce_scatter(sendbuf, recvbuf, recvcounts=None, op=MPI.SUM)
</code>

In ''Reduce_scatter_block'' all sections have the same size, so the number of elements in the send buffer must be ''size'' times the number of elements in the receive buffer:

**reduce_scatter_block.py:**
<code>
a_size = 3
recvdata = numpy.zeros(a_size, dtype=numpy.float64)
senddata = (rank+1)*numpy.arange(size*a_size, dtype=numpy.float64)
print('on task', rank, 'senddata =', senddata)
comm.Reduce_scatter_block(senddata, recvdata, op=MPI.SUM)
print('on task', rank, 'recvdata =', recvdata)
</code>

In ''Reduce_scatter'' the sections of the send buffer can have different sizes, which are passed in the integer tuple ''recvcounts''; the send buffer on every task must provide ''sum(recvcounts)'' elements, and the receive buffer on task ''i'' must have ''recvcounts[i]'' elements:

**reduce_scatter.py:**
<code>
recv_size = list(range(1, size+1))
recvdata = numpy.zeros(recv_size[rank], dtype=numpy.float64)
send_size = 0
for i in range(0, size):
    send_size = send_size + recv_size[i]
senddata = (rank+1)*numpy.arange(send_size, dtype=numpy.float64)
print('on task', rank, 'senddata =', senddata)
comm.Reduce_scatter(senddata, recvdata, recv_size, op=MPI.SUM)
print('on task', rank, 'recvdata =', recvdata)
</code>

=== Reduction with MINLOC and MAXLOC ===

The reduction operations MINLOC and MAXLOC differ from all others: they return two results, the minimum resp. maximum of the values on the different tasks and the rank of a task holding the extreme value. mpi4py makes these operations available in the lower case reduction methods for generic data, as shown in the following example:

**reduce_minloc.py:**
<code>
inp = numpy.random.rand(size)
senddata = inp[rank]
recvdata = comm.reduce(senddata, op=MPI.MINLOC, root=0)
print('on task', rank, 'reduce:   ', senddata, recvdata)

recvdata = comm.allreduce(senddata, op=MPI.MINLOC)
print('on task', rank, 'allreduce:', senddata, recvdata)
</code>

===== Code Examples =====

The python codes for all examples described in this tutorial are available for download.