mpi4py发送/接收带标签

2024-05-16 14:05:46 发布

您现在位置:Python中文网/ 问答频道 /正文

如何将进程的等级作为标记传递给mpi4py.MPI.COMM_WORLD.Send()函数,并使用mpi4py.MPI.COMM_WORLD.Recv()正确接收它?

我指的是sending and receiving messages between two processes using Send and Recv functions的以下代码示例

#passRandomDraw.py
import numpy
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()

randNum = numpy.zeros(1)

if rank == 1:
    randNum = numpy.random.random_sample(1)
    print "Process", rank, "drew the number", randNum[0]
    comm.Send(randNum, dest=0)

if rank == 0:
    print "Process", rank, "before receiving has the number", randNum[0]
    comm.Recv(randNum, source=1)
    print "Process", rank, "received the number", randNum[0]

我想将发送进程的秩作为标记传递,以便在有多个发送者的情况下接收进程可以识别它。我就是这么做的

#passRandomDraw.py
import numpy
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()

randNum = numpy.zeros(1)
rnk = -1 # EDIT

if rank == 1:
    randNum = numpy.random.random_sample(1)
    print "Process", rank, "drew the number", randNum[0]
    comm.Send(randNum, dest=0, tag=rank) # EDIT

if rank == 0:
    print "Process", rank, "before receiving has the number", randNum[0]
    print "Sender rank:", rnk
    comm.Recv(randNum, 1, rnk) # EDIT
    print "Process", rank, "received the number", randNum[0]
    print "Sender rank:", rnk # EDIT

我希望接收进程的rnk值为1(rank=0),但它仍然是-1。

有人能告诉我我做错了什么吗?谢谢!


Tags: thenumpysendnumberworld进程mpi4pyprocess
2条回答

函数Recv将在变量中存储接收到的消息。您必须提供预期发件人的排名。所以你总是知道发送者是谁。消息传递接口不需要标识某人,该信息始终是系统固有的。

如果您希望同一发件人发送多封邮件,可以使用标记来区分这些邮件。你需要自己提供这些标签,没有自然的方式获得这些。把这些信息贴上标签,编号。

如果您有一个标记,Recv函数将只在接收到具有合适的源标记和标记的消息时返回。这是一个阻塞函数调用。

在您的例子中,tag=-1等于通用常量MPI.ANY_TAG(通过print MPI.ANY_TAG验证),因此Recv将接受任何标记。但它绝不会覆盖它的输入变量rnk。试试rnk = -2 # EDIT你就会明白了。

您可以以不同的方式编写代码,尽管这不会改变底层逻辑(即作为程序员,您必须始终了解发送者),但它只是隐藏它,使其隐式化:

#passRandomDraw.py
import numpy
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()

randNum = numpy.zeros(1)
rnk = -1 # EDIT

if rank == 1:
    randNum = numpy.random.random_sample(1)
    print "Process", rank, "drew the number", randNum[0]
    comm.Send(randNum, dest=0, tag=rank) # EDIT

if rank == 0:
    print "Process", rank, "before receiving has the number", randNum[0]
    print "Sender rank:", rnk
    status = MPI.Status()
    comm.Recv(randNum, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status) # EDIT
    rnk = status.Get_source()
    print "Process", rank, "received the number", randNum[0]
    print "Sender rank:", rnk # EDIT

下面的示例演示如何将mpi4py中的sendrecv函数与列组和标记一起使用。同样的方法也适用于SendRecv函数。一个MPI.Status对象用于获取每个接收到的消息的源和标记。当mpi4py文档不足时,查阅examples and tutorials written in C通常会有帮助。

from mpi4py import MPI

def enum(*sequential, **named):
    """Handy way to fake an enumerated type in Python
    http://stackoverflow.com/questions/36932/how-can-i-represent-an-enum-in-python
    """
    enums = dict(zip(sequential, range(len(sequential))), **named)
    return type('Enum', (), enums)

# Define MPI message tags
tags = enum('READY', 'DONE', 'EXIT', 'START')

# Initializations and preliminaries
comm = MPI.COMM_WORLD   # get MPI communicator object
size = comm.Get_size()  # total number of processes
rank = comm.Get_rank()  # rank of this process
name = MPI.Get_processor_name()
status = MPI.Status()   # get MPI status object

if rank == 0:
    # Master process executes code below
    tasks = range(2*size)
    task_index = 0
    num_workers = size - 1
    closed_workers = 0
    print("Master starting with {} workers".format(num_workers))
    while closed_workers < num_workers:
        data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        source = status.Get_source()
        tag = status.Get_tag()
        if tag == tags.READY:
            # Worker is ready, so send it a task
            if task_index < len(tasks):
                comm.send(tasks[task_index], dest=source, tag=tags.START)
                print("Sending task {} to worker {}".format(task_index, source))
                task_index += 1
            else:
                comm.send(None, dest=source, tag=tags.EXIT)
        elif tag == tags.DONE:
            results = data
            print("Got data from worker {}".format(source))
        elif tag == tags.EXIT:
            print("Worker {} exited.".format(source))
            closed_workers += 1

    print("Master finishing")
else:
    # Worker processes execute code below
    print("I am a worker with rank {} on {}.".format(rank, name))
    while True:
        comm.send(None, dest=0, tag=tags.READY)
        task = comm.recv(source=0, tag=MPI.ANY_SOURCE, status=status)
        tag = status.Get_tag()

        if tag == tags.START:
            # Do the work here
            result = task**2
            comm.send(result, dest=0, tag=tags.DONE)
        elif tag == tags.EXIT:
            break

    comm.send(None, dest=0, tag=tags.EXIT)

相关问题 更多 >