Passing subprocess output to a multiprocessing function

Posted 2024-04-26 09:18:26


I am trying to run the two functions below in parallel, but the output of one needs to be passed to the other as an argument.

Function 1:

def beginRecvTest():
    incoming = Popen("receivetest -f=/pcan33".split(), stdout=PIPE)
    processing = iter(incoming.stdout.readline, "")
    lines = list(processing)
    print processing
    return lines

Function 2:

def readByLine():
    i = 0
    while i < len(lines):
        system("clear")
        if lines[i][0].isdigit():
            line = lines[i].split()
            dictAdd(line)
        else:
            next

        print ; print "-" * 80
        for _i in mydict.keys():
            printMsg(mydict, _i)

        print "Keys: ", ; print mydict.keys()
        print ; print "-" * 80
        sleep(0.3)
        i += 1

The function calls:

if __name__ == "__main__":
    process1 = Process(target=beginRecvTest)
    process1.start()
    process2 = Process(target=readByLine, args=(process1,))
    process2.start()
    process1.join()
    process2.join()

Unfortunately, I don't know how to pass the output as an argument, and the code in process1 seems to hang at:

lines = list(processing)

Somehow the program needs to pass the lines already read into the list to process2, while process1 is still reading and sorting.

Can anyone help?


1 Answer

I've been working on something else, but quite similar. I found this somewhere else and modified it a bit.

So you use two queues: a number of worker processes take items from the first queue and fill the second.

I had trouble keeping the writer process alive, so I added a guard at the end of the second queue: a None.

import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform

def doCalculation(par):
    t = uniform(0,2)
    sleep(t)
    return par * par  # just to simulate some calculation

def feed(queue, parlist):
    for par in parlist:
        queue.put(par)

def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(block = False)
            print "calculating ", par
            res = doCalculation(par)
            queueOut.put((par,res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    breaks=0
    while True:
        try:
            par, res = queue.get(block = False)
            print "writing ", par
            # if par is None terminate the while loop / then we know the queue is empty for sure
            if par is None:
                break
            print >>fhandle, "TEST", par, res
        except:
            # could terminate the writer process prematurely if the queue gets empty in between
            sleep(0.01)
    fhandle.close()

if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    print nthreads
    fname = "foo"

    workerQueue = Queue() 
    writerQueue = Queue()

    parlist = range(100) #[1,2,3,4,5,6,7,8,9,10]
    # fill the workerQueue (1 Processes)
    feedProc = Process(target = feed , args = (workerQueue, parlist))
    # start working Processes (nthreads) that work on workerQueue and send to writerQueue
    calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
    # work on writerQueue (1 Processes)
    writProc = Process(target = write, args = (writerQueue, fname))

    feedProc.start()
    for p in calcProc:
        p.start()
    writProc.start()

    # Join Subprocesses with the Main one again
    feedProc.join ()
    for p in calcProc:
        p.join()       

    # terminate writer queue manually 
    # add empty element to the queue
    writerQueue.put( (None,None) )

    # terminate now or close if you continue after this!

I think you can adapt this to your needs.

Best wishes.
