 +====== Mpi4py ======
 +
 +=====  Parallel Python with mpi4py  =====
 +mpi4py provides open source python bindings to most of the functionality of the MPI-2 standard of the message passing interface MPI. Version 1.3.1 of mpi4py is installed on GWDG's scientific computing cluster.
 +
 +Documentation for mpi4py can be found at [[https://mpi4py.readthedocs.io/en/stable/index.html]], where short descriptions of the most basic functions of mpi4py are given.
 +
 +This tutorial provides a more complete description of the point-to-point and collective communication methods in mpi4py and demonstrates their use in code examples.
 +Knowledge of the python programming language is assumed. The basics of the message passing programming model will be explained; links to the complete documentation of the message passing interface MPI can be found on the web site of the [[http://www.mpi-forum.org/|MPI forum]], e.g. the complete description of the [[http://www.mpi-forum.org/docs/mpi-2.2/mpi22-report/mpi22-report.htm|MPI-2.2 standard]].
 +
 +=====  Using mpi4py  =====
 +Log in with ssh to gwdu101 or gwdu102 and load the necessary MPI environment with
 +
 +<code>
 +module load openmpi</code>
 +
 +and the python and mpi4py environment by
 +
 +<code>
 +module load python</code>
 +
 +Now you are ready to execute a first parallel python program:  
 +
 +<code>
 +# hello_world_parallel.py
 +from mpi4py import MPI
 +
 +comm = MPI.COMM_WORLD
 +size = comm.Get_size()
 +rank = comm.Get_rank()
 +
 +print (size, rank)</code>
 +
 +Executing this script (no compilation is necessary for the interpreted language python) with the command
 +
 +<code>
 +mpirun -n 3 python hello_world_parallel.py </code>
 +
 +will then produce output like (the ordering of the lines may differ between runs)
 +
 +<code>
 +3 0 
 +3 1 
 +3 2</code>
 +
 +=====  Language Elements of mpi4py for Parallel Computing  =====
 +Parallel computing is a means to accelerate the solution of a given computational problem by employing multiple hardware resources simultaneously.
 +The parallelism underlying MPI and therefore also mpi4py is based on the simultaneous execution of several independent programs (instruction streams), each of them operating on its own data. In [[http://en.wikipedia.org/wiki/Flynn's_taxonomy|Flynn's taxonomy]] of parallel computer architectures this is **MIMD** (Multiple Instruction streams-Multiple Data streams).
 +The "hello-world" example above is a special case of an MIMD execution: the //same// program is executed simultaneously in multiple independent processes. This kind of parallel execution is called **SPMD** (Single Program-Multiple Data streams). SPMD allows parallel computations as general as those of MIMD, because the actual execution path in each instance of the program can be chosen differently.
 +
 +In the [[http://en.wikipedia.org/wiki/Parallel_programming_model|message passing programming model]] underlying MPI and mpi4py, each of the simultaneously running programs executes its instructions in its own address space, which is not shared with the other processes. These processes are called **tasks** in the MPI terminology. The solution of the common computational problem requires coordination and the exchange of data between the simultaneously running tasks. This is made possible by providing a description of the **communication context**, which contains the number of tasks that have been started and a unique identifier for each of these tasks. This identifier is realised in MPI as an integer number between 0 and n-1, if n tasks have been started. Every task can access this communication context and enquire the number of participating tasks and its own unique identifier.
 +
 +In MPI the communication context is named **communicator**. The communicator for the tasks generated at start time by the mpirun command is called ''COMM_WORLD''. mpi4py initializes COMM_WORLD automatically as an instance of its base class ''Comm''. The "hello-world" example uses the abbreviation comm=MPI.COMM_WORLD for this class instance and shows the syntax of the methods in COMM_WORLD that allow access to the context information:
 +
 +<code>
 +comm.Get_size()      returns the number of tasks in comm
 +comm.Get_rank()      returns the rank of the calling task</code>
 +
 +Nearly all other language elements and library routines provided in the MPI-2 standard are accessible from the ''Comm'' and other classes of mpi4py. The most important ones are described below.
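 +
 +For example, the name of the node a task is running on and the version of the MPI standard supported by the library can be queried directly. This is a minimal sketch, assuming a standard mpi4py installation:
 +
 +<code>
 +# mpi_info.py
 +from mpi4py import MPI
 +
 +comm = MPI.COMM_WORLD
 +rank = comm.Get_rank()
 +
 +node = MPI.Get_processor_name()   # wraps MPI_Get_processor_name
 +version = MPI.Get_version()       # (major, minor) of the supported MPI standard
 +print('task', rank, 'runs on', node, 'using MPI version', version)</code>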
 +
 +=====  Point-To-Point Communication  =====
 +Point-to-point communication in the message passing programming model is a two step process, consisting of sending some data from one task and of receiving these data by another task. The sending task must specify the data to be sent, their destination and an identifier for the message. The receiving task has to specify the location for storing the received data, their source and the identifier of the message to be received.
 +
 +Generic data items in python are objects, containing collections of typed data structures. The physical transmission from source to destination of these data objects occurs as a stream of bits. The necessary transformation between objects, which contain data scattered in memory, and contiguous memory areas to be streamed may consume large amounts of resources for allocating memory and copying data. mpi4py therefore provides two sets of functions for communication: the **all-lower-case** functions for communicating **generic python data objects**, which include the overhead for switching between the different data layouts, and functions with an **upper-case initial letter** for communicating **buffer like objects** with a contiguous memory layout, which can be transmitted directly without additional copying.
 + 
 +====  Communication of Generic Python Data Objects  ====
 +The method for sending generic objects has the syntax
 +
 +<code>
 +comm.send(buf, dest=0, tag=0)</code>
 +
 +Only the argument ''buf'' is obligatory; here the python object to be sent has to be supplied. If the optional named parameters ''dest'' or ''tag'' are not supplied, they take the predefined value 0.
 +
 +The method for receiving is implemented slightly differently. Its syntax is
 +
 +<code>
 +comm.recv(obj=None, source=0, tag=0, status=None)</code>
 +
 +and the object to be received is the return value of the function ''recv''. The optional buffer object ''obj'' allows the memory needed to reconstruct the communicated object from its serialized form (used for the actual physical transmission of the data) to be allocated explicitly. ''obj'' must be a writable python buffer, e.g. generated by the python function ''bytearray()''; the size of this buffer must be at least equal to the size of the transmitted serialized data object. The predefined value for ''obj'' is ''None'', which leads to an automatic dynamic allocation of memory for the needed buffer space.
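 +
 +As a minimal sketch following the signature above (valid for mpi4py 1.3.x; newer releases name this parameter ''buf'' and may treat it differently), the receiving task of the example below could supply its own buffer for the serialized data. The buffer size of 64 KiB is an arbitrary choice made here for illustration:
 +
 +<code>
 +# receiving into a preallocated buffer for the serialized data (sketch)
 +buf = bytearray(1<<16)    # must be at least as large as the pickled message
 +data = comm.recv(buf, source=0, tag=11)</code>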
 +
 +
 +The receiving task can provide the values ''source=MPI.ANY_SOURCE'' and ''tag=MPI.ANY_TAG'' in order to copy a message from its internal receive buffer irrespective of its source or tag. The information about source, tag, and size of the received message is available from an object of type ''Status'', which has to be supplied as argument to the parameter ''status''. This is explained in more detail in the section about the status argument below.
 +
 +A simple program for communicating data from one task to another looks like
 +
 +**send.py:**
 +
 +<code>
 +# send
 +from mpi4py import MPI
 +
 +comm = MPI.COMM_WORLD
 +size = comm.Get_size()
 +rank = comm.Get_rank() 
 +
 +data = None
 +if rank == 0:
 +    data = (1,'a','z',3.14)
 +    comm.send(data, dest=1, tag=11)
 +elif rank == 1:
 +    print ('on task',rank,'before recv:   data = ',data)
 +    data = comm.recv(source=0, tag=11)
 +    print ('on task',rank,'after recv:    data = ',data)</code>
 +
 +and the command ''mpirun -n 2 python send.py'' will produce an output like
 +
 +<code>
 +on task 1 before recv:   data =  None
 +on task 1 after recv:    data =  (1, 'a', 'z', 3.1400000000000001)</code>
 +
 +====  Communication of Buffer Like Objects  ====
 +
 +A buffer like object in python occupies contiguous memory space and therefore can be specified by its address and extent. An example is the NumPy array-object, which is available by importing the NumPy extension of python. 
 +
 +The syntax for the methods for sending and receiving buffer-type objects is
 +
 +<code>
 +comm.Send(buf, dest=0, tag=0)
 +comm.Recv(buf, source=0, tag=0, status=None)</code>
 +
 +The buffer arguments in the methods have to be given in the form of a list with 3 or 2 elements
 + 
 +<code>
 +[data,count,MPI.type] or [data,MPI.type]</code>
 +
 +where in the second case the count is implied by the byte size of the buffer object ''data'' and the byte size of ''MPI.type''. For array objects from NumPy the argument can be the array itself, because its type and size can be obtained from the internal NumPy array description.
 +
 +An example for communicating a NumPy array is
 +
 +**send_array.py:**
 +
 +<code>
 +# send_array
 +from mpi4py import MPI
 +import numpy 
 +
 +comm = MPI.COMM_WORLD
 +size = comm.Get_size()
 +rank = comm.Get_rank()
 +
 +if rank == 0:
 +    data = numpy.arange(5,dtype=numpy.float64)
 +    comm.Send([data,3,MPI.DOUBLE],1,11)
 +elif rank == 1:
 +    data = 10.*numpy.arange(5,dtype=numpy.float64)
 +    print('on task',rank,'before Recv:   data = ',data)
 +    comm.Recv(data,source=0,tag=11)
 +    print('on task',rank,'after Recv:    data = ',data)</code>
 +
 +Starting two tasks with this program will produce the output
 +
 +<code>
 +on task 1 before Recv:   data =  [  0.  10.  20.  30.  40.]
 +on task 1 after Recv:    data =  [  0.   1.   2.  30.  40.]</code>
 +
 +====  Using the ''status'' parameter in ''Recv''  ====
 +
 +In order to access the status information from the Recv method, an instance of ''Status''-class has to be created and passed to the status parameter of Recv:
 +<code>
 +info = MPI.Status()
 +comm.Recv(buf,source=src,tag=tg,status=info)</code>
 +
 +The information about the received message is returned by calls to various methods of the ''Status'' class instance ''info'':
 +
 +<code>
 +info.Get_count()             returns message size in Bytes
 +info.Get_elements(datatype)  returns number of elements 
 +                             of type datatype 
 +info.Get_source()            returns message source
 +info.Get_tag()               returns message tag</code>
 +
 +An example for retrieving the status information is the following (**from now on the code lines for importing modules and defining comm, size and rank will not be displayed**)
 +
 +**status.py:**
 +<code>
 +data = rank*numpy.ones(5,dtype = numpy.float64)
 +
 +if rank == 0:
 +   comm.Send([data,3,MPI.DOUBLE],dest=1,tag=1)
 +if rank == 1:
 +   info = MPI.Status()
 +   comm.Recv(data,MPI.ANY_SOURCE,MPI.ANY_TAG,info)
 +   source = info.Get_source()
 +   tag = info.Get_tag()
 +   count = info.Get_elements(MPI.DOUBLE)
 +   size = info.Get_count()
 +   print('on',rank,'source, tag, count, size is',source,tag,count,size)</code>
 +
 +which running in two tasks produces the output
 +
 +<code>
 +on 1 source, tag, count, size is 0 1 3 24</code>
 +
 +A Recv operation will fail if the size of the buffer passed in the ''Recv'' call is smaller than the size of the message to be received. A possible failure of this kind can be avoided by inquiring the message status with the ''Probe'' method before executing the actual Recv operation with a buffer of appropriate size. This is demonstrated in the following example:
 +
 +**probe.py:**
 +<code>
 +if rank == 0:
 +   data = rank*numpy.ones(5,dtype = numpy.float64)
 +   comm.Send([data,3,MPI.DOUBLE],dest=1,tag=1)
 +if rank == 1:
 +   info = MPI.Status()
 +   comm.Probe(MPI.ANY_SOURCE,MPI.ANY_TAG,info)
 +   count = info.Get_elements(MPI.DOUBLE)
 +   data = numpy.empty(count,dtype = numpy.float64)
 +   comm.Recv(data,MPI.ANY_SOURCE,MPI.ANY_TAG,info)
 +   print('on',rank,'data: ',data)</code>
 +
 +====  Blocking and Nonblocking Communications  ====
 +
 +MPI offers two basic ways for communication, both of which can be used in mpi4py.
 +
 +1. //Blocking Communication//, in which a call to a communication request returns when the memory location containing the data to be communicated can safely be used for subsequent instructions. For the Send operation this is the case when all data have been copied from their local memory location to the receive buffer of the receiving task. The Send operation therefore in general is nonlocal, depending on the activities of the receiving task. The Recv call will return when a message with the given attributes has been copied to its memory location. Recv is also nonlocal, because its completion depends on the previous posting of a send operation in a different task. Blocking communication may lead to a [[http://en.wikipedia.org/wiki/Deadlock|deadlock]], if a communication call of one task is not matched with a corresponding call of another task.
 +
 +The mpi4py class ''Comm'' supplies the functions  ''send'' and ''recv'' for generic python data objects and ''Send'' and ''Recv'' for buffer like data objects, as described above.
 +
 +2. //Nonblocking Communication//, in which the communication call returns immediately, allowing other computations that do not depend on the data of the actual communication to be performed. This overlapping of communication and computation can be profitable, if the underlying communication system has its own hardware support. Nonblocking communication on both ends has two stages: posting the communication call and subsequently enquiring the status of the communication request in order to establish the completion of the communication.
 +
 +In MPI the names for nonblocking communication calls are derived from the respective names for the blocking calls by prepending an "i" (for immediate). mpi4py provides the functions ''isend'' and ''irecv'' for generic python objects and ''Isend'' and ''Irecv'' for buffer like objects as the nonblocking communication calls. These function calls return an instance of the mpi4py class ''Request''. The methods ''Wait'' and ''Test'' and variants thereof of the Request instance can be used for enquiring the status of the communication.
 +
 +The following example shows a nonblocking communication of a NumPy array, in which the two methods ''Wait'' and ''Test'' are used to control the completion of the ''Irecv'' call.
 +
 +**isend_array.py:**
 +<code>
 +a_size = 10
 +data = (1+rank)*numpy.arange(a_size,dtype=numpy.float64)
 +if rank == 0:
 +    req=comm.Isend([data,a_size,MPI.DOUBLE],1,11)
 +elif rank == 1:
 +    print('on task',rank,'before recv:   data = ',data)
 +    req=comm.Irecv(data,source=0,tag=11)
 +    re = False
 +    while re == False:
 +       re=MPI.Request.Test(req)
 +    print('test result',re)
 +    re=MPI.Request.Wait(req)
 +    print('wait result',re)
 +    print('on task',rank,'after recv:    data = ',data)</code>
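 +
 +The lower case functions ''isend'' and ''irecv'' work analogously for generic python objects; here the received object is returned by the lower case ''wait'' method of the request. The following is a minimal sketch, suitable for small objects only, since the internal buffer for the serialized data is limited; the details depend on the mpi4py version:
 +
 +<code>
 +# isend/irecv for a generic python object (sketch)
 +if rank == 0:
 +    req = comm.isend((1,'a',3.14),dest=1,tag=12)
 +    req.wait()                # completes the send
 +elif rank == 1:
 +    req = comm.irecv(source=0,tag=12)
 +    data = req.wait()         # returns the received object
 +    print('on task',rank,'after irecv:    data = ',data)</code>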
 +
 +====  Modes of Communications  ====
 +
 +The blocking send operation can be implemented by directly streaming the message data from memory into the communication system. In this case the send call will block until a corresponding receive call has been posted and has copied the data stream into the memory of the receiving task. This is called the **synchronous** communication mode.
 +
 +Alternatively, the implementation can copy the message data into a local buffer, before streaming it into the communication system. This **buffered** communication mode allows the send call to return immediately after completion of the local copy operation, irrespective of the posting of a matching receive call.
 +
 +A third possibility is an implementation in which the send call may be started only if a matching receive call has already been posted, so that the overhead for establishing the availability of the receiver may be avoided. In this **ready** communication mode the send operation will return with an error, if a matching receive call has not been posted previously. A short sketch of this mode is given below.
 +
 +The **standard** send operation in MPI is a combination of buffered mode for short and synchronous mode for long messages. The border between the two modes is implementation dependent; portable programs using the standard send should not depend on properties of either of the modes.
 +
 +MPI provides access to the other three modes in send routines, whose names are prefixed with the letters "s", "b", and "r", respectively. The buffer used in the buffered mode bsend call has to be explicitly attached to the MPI environment. The receive call is the same for all modes. The four modes are also available for the nonblocking communication calls.
 +
 +mpi4py provides in its ''Comm''-class functions with upper case first letter for sending buffer like objects 
 +<code>
 +Send, Ssend, Bsend, Rsend        blocking calls
 +Isend, Issend, Ibsend, Irsend    nonblocking calls</code>
 +
 +and with lower case first letter for sending generic python data objects
 +
 +<code>
 +send, ssend, bsend               blocking calls
 +isend, issend, ibsend            nonblocking calls
 +no ready mode send for generic python data objects</code>
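 +
 +Since the ready mode is available only for buffer like objects, a minimal sketch of ''Rsend'' follows (two tasks are assumed, the tag and message size are chosen arbitrarily): the receiver posts a nonblocking receive first, and a barrier guarantees that it is in place before the send starts.
 +
 +**rsend_array.py:**
 +<code>
 +a_size = 10
 +if rank == 1:
 +    data = numpy.empty(a_size,dtype=numpy.float64)
 +    req = comm.Irecv(data,source=0,tag=13)   # receive is posted first
 +comm.Barrier()          # the sender continues only after the receive has been posted
 +if rank == 0:
 +    data = numpy.arange(a_size,dtype=numpy.float64)
 +    comm.Rsend(data,dest=1,tag=13)
 +elif rank == 1:
 +    req.Wait()
 +    print('on task',rank,'after Rsend/Irecv:    data = ',data)</code>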
 +
 +Furthermore, the mpi4py function ''Attach_buffer(memory)'' attaches a buffer in the application's memory space for use in buffered mode send operations. The size of the buffer must be large enough to hold the message data plus additional information concerning the message (size, destination, identification).
 +
 +The next example shows a buffered send and the syntax to attach the buffer. As can be seen, the size of the buffer is slightly larger than the size of the data to be sent, because in addition to the data proper the buffered message also contains information about its size, destination and identifier; the amount of extra space required is implementation dependent.
 +
 +**bsend_array.py:**
 +<code>
 +a_sz = 1000
 +data = rank*numpy.arange(a_sz,dtype=numpy.int64)
 +if rank == 0:
 +    buf = numpy.empty(a_sz+3,dtype = numpy.int64)
 +    MPI.Attach_buffer(buf)
 +    comm.Bsend(data,dest=1,tag=11)
 +elif rank == 1:
 +    print('on task',rank,'before recv:   data = ',data[a_sz-1])
 +    comm.Recv(data,source=0,tag=11)
 +    print('on task',rank,'after recv:    data = ',data[a_sz-1])</code>
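 +
 +The extra space needed for the message header is available in mpi4py as the constant ''MPI.BSEND_OVERHEAD'', and an attached buffer can be released again with ''Detach_buffer''. The sending branch of the example above could therefore be written as in the following sketch (the matching ''Recv'' on task 1 stays unchanged):
 +
 +<code>
 +# sizing the attach buffer with MPI.BSEND_OVERHEAD (sketch of the sending task)
 +if rank == 0:
 +    buf = bytearray(data.nbytes + MPI.BSEND_OVERHEAD)
 +    MPI.Attach_buffer(buf)
 +    comm.Bsend(data,dest=1,tag=11)
 +    MPI.Detach_buffer()   # blocks until the buffered message has been transmitted</code>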
 +
 +====  Avoiding deadlocks by using ''Sendrecv''  ====
 +
 +Exchanging data between neighbours in a ring of tasks can lead to a deadlock, as exemplified in the following program:
 +
 +**exch_deadlock.py:** 
 +<code>
 +a_size = 10
 +send_data=rank*numpy.arange(a_size,dtype=numpy.float64)
 +recv_data=numpy.empty(a_size,dtype=numpy.float64)
 +ipr = (rank +1)%size
 +ipl = (rank -1)%size
 +comm.Ssend(send_data,dest=ipl)
 +comm.Recv(recv_data,source=ipr)
 +
 +print('on task',rank,'data:',send_data[1],recv_data[1])</code>
 +
 +This program will never end, because every task posts a call to the synchronous mode ''Ssend'', which will not return before the matching receive call is posted, and therefore never proceeds to post the receive call. Using the standard mode ''Send'' instead is not a good solution, because the program will complete or hang depending on the amount of exchanged data. MPI provides a special function combining the send and recv operations, which guarantees an execution without deadlock. In the given example the calls to ''Ssend'' and ''Recv'' should be replaced by the single call
 +
 +<code>
 +comm.Sendrecv(send_data,dest=ipl, \
 +              recvbuf=recv_data,source=ipr)</code>
 +
 +=====  Collective Communications  =====
 +
 +Point-to-point communication is the basic tool for parallel computing within the message passing programming model. More general communication patterns involving more than two tasks can be realised by combining several of these basic communication operations. For example, synchronising all tasks in a communicator can be achieved by handshakes of one task with all other tasks, followed by informing all tasks about the completion of all these handshakes. This is illustrated with the code for the function ''synchron''
 +
 +<code>
 +def synchron():
 +   buf = None
 +   if rank == 0:
 +      for ip in range(1,size):
 +         comm.send(buf,dest=ip)
 +         buf=comm.recv(source=ip)
 +      for ip in range(1,size):
 +         comm.send(buf,dest=ip)
 +   else:
 +     buf=comm.recv(source=0)
 +     comm.send(buf,dest=0)
 +     buf = comm.recv(source=0)</code>
 +
 +This kind of synchronisation can be thought of as a //barrier//, which stops every task until all tasks have arrived. A barrier may be used, for example, to force the output of several tasks, which without intervention would appear in a runtime dependent order, into a prescribed sequence:
 +
 +<code>
 +for ip in  range(0,size):
 +   if rank == ip:
 +      print('rank',rank,'ready')
 +   synchron()</code>
 +
 +Communication involving all tasks of a communicator is called //collective// within the MPI standard. The MPI standard provides subroutines for barrier synchronisation and for a large number of other frequently used patterns of collective data manipulations.
 +
 +These patterns can be classified according to the kind of data manipulation involved
 +
 +  *  Global Synchronisation, no data are involved
 +  *  Global data communication, data are scattered or gathered
 +  *  Global data reduction, data of different tasks are reduced to new data
 +
 +====  Global Synchronisation  ====
 +
 +The only global synchronisation operation in MPI is the barrier.   
 +The syntax for the barrier synchronisation function in the mpi4py  ''Comm'' class equivalent to the ''synchron'' function above is 
 +
 +<code>
 +comm.Barrier()</code>
 +
 +A task posting a call to ''Barrier()'' will halt until all other tasks of the communicator have posted the same call. Therefore in a correct mpi4py program the number of calls to ''Barrier()'' must be equal for all tasks.
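 +
 +With ''Barrier()'' the ordered output loop shown above can be written without the hand-made ''synchron'' function:
 +
 +<code>
 +# ordered output of all tasks using the built-in barrier
 +for ip in range(0,size):
 +   if rank == ip:
 +      print('rank',rank,'ready')
 +   comm.Barrier()</code>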
 +
 +====  Global Data Communication  ====
 +
 +Data communication between all tasks of a communicator can have two directions:
 +  *  distributing data from one task to all tasks
 +  *  collecting data from all tasks to one or all tasks
 +
 +For distributing data mpi4py provides functions for
 +  *  broadcasting: ''Bcast, bcast''
 +  *  scattering from one task: ''Scatter, Scatterv, scatter''
 +
 +For collecting data mpi4py provides functions for
 +  *  gathering to one task: ''Gather, Gatherv, gather''
 +  *  gathering to all tasks: ''Allgather, Allgatherv, allgather''
 +
 +In addition, there are functions, which combine collection and distribution of data:
 +  *  ''Alltoall, Alltoallv, Alltoallw, alltoall''
 +
 +===  broadcast  ===
 +The syntax of the broadcast method is
 +
 +<code>
 +comm.Bcast(buf, int root=0)
 +buf = comm.bcast(obj=None, int root=0)</code>
 +
 +An example for broadcasting a NumPy array is
 +
 +**bcast_array.py:**
 +<code>
 +data = numpy.empty(5,dtype=numpy.float64)
 +if rank == 0:
 +    data = numpy.arange(5,dtype=numpy.float64)
 +comm.Bcast([data,3,MPI.DOUBLE],root=0)
 +print('on task',rank,'after Bcast:    data = ',data)</code>
 +
 +An example broadcasting a python data object is
 +
 +**bcast.py:**
 +<code>
 +rootdata = None
 +if rank == 0:
 +    rootdata = (1,'a','z',3.14)
 +data = comm.bcast(rootdata,root=0)
 +print('on task',rank,'after bcast:    data = ',data)</code>
 +
 +A difference between the two broadcast functions is that in ''Bcast'' the broadcast data stay in place in the root task, whereas in ''bcast'' they are copied to the object returned by the call to ''bcast''.
 +
 +===  scatter  ===
 +
 +Whereas a broadcast operation distributes the same object from the root task to all other tasks, a scattering operation sends a different object from root to every task. The syntax of the methods for scattering is
 +<code>
 +comm.Scatter(sendbuf, recvbuf, int root=0)
 +comm.Scatterv(sendbuf, recvbuf, int root=0)
 +comm.scatter(sendobj=None, recvobj=None, int root=0)</code>
 +
 +In the upper case function ''Scatter'' the sendbuf and recvbuf arguments must be given in terms of a list (as in the point-to-point function ''Send''):
 +
 +<code>
 +buf = [data, data_size, data_type]</code>
 +
 +where ''data'' must be a buffer like object of size ''data_size'' and of type ''data_type''. The size and type descriptions can be omitted, if they are implied by the buffer object, as in the case of a NumPy array. Equally sized sections of the data in sendbuf will be sent from root to the tasks, the //i//-th section to task //i//, where they will be stored in recvbuf. The data_size in sendbuf therefore must be a multiple of ''size'', where ''size'' is the number of tasks in the communicator. The data_size in recvbuf must be at least as large as the size of the data section received.
 +
 +In the vector variant of this function,  ''Scatterv'', the size and the location of the sections of senddata to be distributed may be freely chosen. The sendbuf in this case must include a description of the layout of the sections in the form
 +
 +<code>
 +sendbuf = [data, counts, displacements, type]</code>
 +
 +where ''counts'' and ''displacements'' are integer tuples with as many elements as tasks are in the communicator. ''counts[i]'' designates the size of the //i//-th segment, ''displacements[i]'' the number of the element in ''data'' used as the first element of the //i//-th section.  
 +
 +The calls of ''Scatter'' and ''Scatterv'' are illustrated in the next example
 +
 +**scatter_array.py:**
 +<code>
 +a_size = 4
 +recvdata = numpy.empty(a_size,dtype=numpy.float64)
 +senddata = None
 +if rank == 0:
 +   senddata = numpy.arange(size*a_size,dtype=numpy.float64)
 +comm.Scatter(senddata,recvdata,root=0)
 +print('on task',rank,'after Scatter:    data = ',recvdata)
 +
 +recvdata = numpy.empty(a_size,dtype=numpy.float64)
 +counts = None
 +dspls = None
 +if rank == 0:
 +   senddata = numpy.arange(100,dtype=numpy.float64)
 +   counts=(1,2,3)
 +   dspls=(4,3,10)
 +comm.Scatterv([senddata,counts,dspls,MPI.DOUBLE],recvdata,root=0)
 +print('on task',rank,'after Scatterv:    data = ',recvdata)</code>
 +
 +In the ''scatter'' function the ''sendobj'' to be scattered from the root task must be a sequence of exactly ''size'' objects, where ''size'' is the number of tasks in the communicator. Each element in this sequence can be a data object of any type; the //i//-th object of the sequence will be sent from root to the //i//-th task and will be received in the //i//-th task as the value returned from ''scatter''. This is shown in the following example program
 +
 +**scatter.py:**
 +<code>
 +rootdata = None
 +if rank == 0:
 +    rootdata = [1,2,3,(4,5)]
 +data = comm.scatter(rootdata,root=0)
 +print('on task',rank,'after scatter:    data = ',data)</code>
 +
 +===  gather  ===
 +
 +The gather operation collects data from all tasks and delivers the collection to the root task or to all tasks. The syntax of the gathering methods is
 +
 +<code>
 +comm.Gather(sendbuf, recvbuf, int root=0)
 +comm.Gatherv(sendbuf, recvbuf, int root=0)
 +comm.Allgather(sendbuf, recvbuf)
 +comm.Allgatherv(sendbuf, recvbuf)
 +comm.gather(sendobj=None, recvobj=None, int root=0)
 +comm.allgather(sendobj=None, recvobj=None)</code>
 +
 +Compared to the scatter methods, the roles of sendbuf and recvbuf are exchanged for ''Gather'' and ''Gatherv''. In ''Gather'' sendbuf must contain the same number of elements in all tasks and the recvbuf on root  must contain ''size'' times that number of elements, where ''size'' is the total number of tasks. For ''Gatherv'' the integer tuples counts and displacements characterize the layout of recvbuf, as described in the scatter section. Here sendbuf must contain exactly ''counts[rank]'' elements in task  ''rank''. The following code demonstrates this.
 +
 +**gather_array.py:**
 +<code>
 +a_size = 4
 +recvdata = None
 +senddata = (rank+1)*numpy.arange(a_size,dtype=numpy.float64)
 +if rank == 0:
 +   recvdata = numpy.arange(size*a_size,dtype=numpy.float64)
 +comm.Gather(senddata,recvdata,root=0)
 +print('on task',rank,'after Gather:    data = ',recvdata)
 +
 +counts=(2,3,4)
 +dspls=(0,3,8)
 +if rank == 0:
 +   recvdata = numpy.empty(12,dtype=numpy.float64)
 +sendbuf = [senddata,counts[rank]]
 +recvbuf = [recvdata,counts,dspls,MPI.DOUBLE]
 +comm.Gatherv(sendbuf,recvbuf,root=0)
 +print('on task',rank,'after Gatherv:    data = ',recvdata)</code>
 + 
 +The lower case function ''gather'' communicates generic python data objects analogous to the ''scatter'' function. An example is
 +
 +**gather.py:**
 +<code>
 +senddata = ['rank',rank]
 +rootdata = comm.gather(senddata,root=0)
 +print('on task',rank,'after gather:    data = ',rootdata)</code>
 +
 +The "all" variants of the gather methods deliver the collected data not only to the root task, but to all tasks in the communicator. These methods therefore lack the ''root'' parameter.
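 +
 +A minimal sketch of ''Allgather'', in which every task ends up with the data contributed by all tasks (the array size is chosen arbitrarily):
 +
 +**allgather_array.py:**
 +<code>
 +a_size = 2
 +senddata = (rank+1)*numpy.arange(a_size,dtype=numpy.float64)
 +recvdata = numpy.empty(size*a_size,dtype=numpy.float64)
 +comm.Allgather(senddata,recvdata)
 +print('on task',rank,'after Allgather:    data = ',recvdata)</code>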
 +
 +===  alltoall  ===
 +
 +alltoall operations combine gather and scatter. In a communicator with ''size'' tasks every task has a sendbuf and a recvbuf with exactly ''size'' data objects. The alltoall operation takes the //i//-th object from the sendbuf of task //j// and copies it into the //j//-th object of the recvbuf of task //i//. The operation can be thought of as a transpose of the matrix with tasks as columns and data objects as rows. The syntax of the alltoall methods is
 +
 +<code>
 +comm.Alltoall(sendbuf, recvbuf)
 +comm.Alltoallv(sendbuf, recvbuf)
 +comm.alltoall(sendobj=None, recvobj=None)</code>
 +
 +The data objects in sendbuf and recvbuf have types depending on the method. In ''Alltoall'' the data objects are equally sized consecutive sections of buffer like objects. In ''Alltoallv'' they are sections of varying sizes and varying displacements of buffer like objects. The layout must be described in the counts and displacements tuples, which have to be defined for sendbuf and recvbuf in every task in a way consistent with the intended transposition in the (task, object) matrix. Finally, in the lower case ''alltoall'' function the data objects can be of any allowed python type, provided that the data objects in sendbuf conform to those in recvbuf, which they will replace.
 +
 +An example for ''Alltoall'' is
 +
 +**alltoall_array.py:**
 +<code>
 +a_size = 1
 +senddata = (rank+1)*numpy.arange(size*a_size,dtype=numpy.float64)
 +recvdata = numpy.empty(size*a_size,dtype=numpy.float64)
 +comm.Alltoall(senddata,recvdata)</code>
 +
 +Next an example for ''alltoall'', to be started with two tasks.
 +
 +**alltoall.py:**
 +<code>
 +data0 = ['rank',rank]
 +data1 = [1,'rank',rank]
 +senddata=(data0,data1)
 +print('on task',rank,'    senddata = ',senddata)
 +recvdata = comm.alltoall(senddata)
 +print('on task',rank,'    recvdata = ',recvdata)</code>
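 +
 +The following sketch illustrates ''Alltoallv'' with varying section sizes; the layout is chosen here only for illustration: task //i// sends //j//+1 elements to task //j//, so every task receives //rank//+1 elements from each of the ''size'' tasks.
 +
 +**alltoallv_array.py:**
 +<code>
 +s_counts = numpy.arange(1,size+1)            # j+1 elements are sent to task j
 +s_displ = numpy.insert(numpy.cumsum(s_counts),0,0)[:-1]
 +senddata = (rank+1)*numpy.arange(s_counts.sum(),dtype=numpy.float64)
 +
 +r_counts = numpy.full(size,rank+1)           # rank+1 elements arrive from every task
 +r_displ = numpy.arange(size)*(rank+1)
 +recvdata = numpy.empty(r_counts.sum(),dtype=numpy.float64)
 +
 +comm.Alltoallv([senddata,s_counts,s_displ,MPI.DOUBLE],
 +               [recvdata,r_counts,r_displ,MPI.DOUBLE])
 +print('on task',rank,'after Alltoallv:    data = ',recvdata)</code>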
 +
 +====  Global Data Reduction  ====
 +
 +The data reduction functions of MPI combine data from all tasks with one of a set of predefined operations and deliver the result of this "reduction" to the root task or to all tasks. mpi4py provides the following operations for reduction:
 +
 +<code>
 +MPI.MIN        minimum
 +MPI.MAX        maximum
 +MPI.SUM        sum
 +MPI.PROD       product
 +MPI.LAND       logical and
 +MPI.BAND       bitwise and
 +MPI.LOR        logical or
 +MPI.BOR        bitwise or
 +MPI.LXOR       logical xor
 +MPI.BXOR       bitwise xor
 +MPI.MAXLOC     max value and location
 +MPI.MINLOC     min value and location</code>
 +
 +The reduction operations need data of a type appropriate for the operation (e.g. numerical types for ''MPI.SUM'' and ''MPI.PROD'', integer or boolean types for the logical and bitwise operations).
 +
 +===  reduce  ===
 +
 +The syntax for the reduction methods is
 +
 +<code>
 +comm.Reduce(sendbuf, recvbuf, op=MPI.SUM, root=0)
 +comm.Allreduce(sendbuf, recvbuf, op=MPI.SUM)
 +comm.reduce(sendobj=None, recvobj=None, op=MPI.SUM, root=0)
 +comm.allreduce(sendobj=None, recvobj=None, op=MPI.SUM)</code>
 +
 +The following example shows the use of ''Reduce'' and ''Allreduce''. sendbuf and recvbuf must be buffer like data objects with the same number of elements of the same type in all tasks. The reduction operation is performed elementwise using the corresponding elements of sendbuf in all tasks. The result is stored into the corresponding element of recvbuf in the root task by ''Reduce'' and in all tasks by ''Allreduce''. If the op parameter is omitted, the default operation ''MPI.SUM'' is used.
 +
 +**reduce_array.py:**
 +<code>
 +a_size = 3
 +recvdata = numpy.zeros(a_size,dtype=numpy.int64)
 +senddata = (rank+1)*numpy.arange(a_size,dtype=numpy.int64)
 +comm.Reduce(senddata,recvdata,root=0,op=MPI.PROD)
 +print('on task',rank,'after Reduce:    data = ',recvdata)
 +
 +comm.Allreduce(senddata,recvdata)
 +print('on task',rank,'after Allreduce:    data = ',recvdata)</code>
 +
 +The use of the lower case methods ''reduce'' and ''allreduce'' operating on generic python data objects is limited, because the reduction operations are undefined for most data objects (like lists, tuples etc.).
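 +
 +For single numbers, however, the lower case functions work as expected. A minimal sketch summing one number contributed by every task:
 +
 +<code>
 +# allreduce of a plain python number (sketch)
 +local = rank + 1
 +total = comm.allreduce(local,op=MPI.SUM)
 +print('on task',rank,'sum over all tasks:',total)</code>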
 +
 +===  reduce_scatter  ===
 +
 +The reduce_scatter functions operate elementwise on ''size'' sections of the buffer like data object sendbuf. The sections must have the same number of elements in all tasks. The result of the reduction of section //i// is copied to recvbuf in task //i//, which must have an appropriate length.
 +The syntax for the reduction methods is
 +
 +<code>
 +comm.Reduce_scatter_block(sendbuf, recvbuf, op=MPI.SUM)
 +comm.Reduce_scatter(sendbuf, recvbuf, recvcounts=None, op=MPI.SUM)</code>
 +
 +In ''Reduce_scatter_block'' the number of elements in all sections must be equal and the number of elements in sendbuf must be ''size'' times that number. An example code is the following
 +
 +**reduce_scatter_block:**
 +<code>
 +a_size = 3
 +recvdata = numpy.zeros(a_size,dtype=numpy.int64)
 +senddata = (rank+1)*numpy.arange(size*a_size,dtype=numpy.int64)
 +print('on task',rank,'senddata  = ',senddata)
 +comm.Reduce_scatter_block(senddata,recvdata,op=MPI.SUM)
 +print('on task',rank,'recvdata = ',recvdata)</code>
 +
 +In ''Reduce_scatter'' the number of elements in the sections can be different. They must be given in the integer tuple recvcounts. The number of elements in sendbuf must be the sum of the numbers of elements in the sections. On task //i// recvbuf must have the length of section //i// of sendbuf. The following code gives an example of this.
 +
 +**reduce_scatter:**
 +<code>
 +recv_size = list(range(1,size+1))
 +recvdata = numpy.zeros(recv_size[rank],dtype=numpy.int64)
 +send_size = 0
 +for i in range(0,size):
 +   send_size = send_size + recv_size[i]
 +senddata = (rank+1)*numpy.arange(send_size,dtype=numpy.int64)
 +print('on task',rank,'senddata  = ',senddata)
 +comm.Reduce_scatter(senddata,recvdata,recv_size,op=MPI.SUM)
 +print('on task',rank,'recvdata = ',recvdata)</code>
 +
 +===  Reduction with MINLOC and MAXLOC  ===
 +
 +The reduction operations MINLOC and MAXLOC differ from all others: they return two results, the minimum or maximum of the values in the different tasks and the rank of a task which holds the extreme value. mpi4py provides the two operations only for the lower case ''reduce'' and ''allreduce'' methods, for comparing a single numerical data object in every task. An example is given in
 +
 +**reduce_minloc.py:**  
 +<code>
 +inp = numpy.random.rand(size)
 +senddata = inp[rank]
 +recvdata=comm.reduce(senddata,None,root=0,op=MPI.MINLOC)
 +print('on task',rank,'reduce:    ',senddata,recvdata)
 +
 +recvdata=comm.allreduce(senddata,None,op=MPI.MINLOC)
 +print('on task',rank,'allreduce: ',senddata,recvdata)</code>
 +
 +=====  Code Examples  =====
 +
 +The python codes for all examples described in this tutorial are available from [[http://wwwuser.gwdg.de/~ohaan/mpi4py_examples/]]
 +
 +[[Kategorie: Scientific Computing]]
  