子进程Popen与C程序通信时出现无效参数/管道破损问题

4 投票
1 回答
2170 浏览
提问于 2025-04-16 11:42

我有这段代码

所有需要的库都已经导入了

class VERTEX(Structure):
 _fields_ = [("index", c_int),
            ("x", c_float),
            ("y", c_float)]

其他内容

这段代码是从一个顶点列表创建一个数组

def writelist_buf(size, nomeID): 
 Nvert_VERTEX_Array_Type = VERTEX * len(bpy.data.objects[nomeID].data.vertices)
 passarr = Nvert_VERTEX_Array_Type()
 for i in range(len(passarr)):
  vert = bpy.data.objects[nomeID].data.vertices[i]
  passarr[i] = VERTEX(vert.index, vert.co[0], vert.co[1])
 return passarr

bpy.data.objects[nomeID].data.vertices 是一个顶点的列表。

其他内容

这段代码在一个函数里面,它把之前的数组传给一个C程序

input = writelist_buf(size, nomeID)
c_program_and_args = "here is the program with his arguments(it works)"
cproc = Popen(c_program_and_args, stdin=PIPE, stdout=PIPE)
out, err = cproc.communicate(input)
#the program returns 2 integers separed by a space
return [int(i) for i in out.decode.split()]

在调用writelist之前,size和nomeID已经声明了。

经过一番“调试”,我发现通过writelist_buf传递的类型是“合法的”(它是字节,因为是用c_types创建的数组),但是我一直收到Errno32 Broken Pipe或Errno22 Invalid argument的错误……这个C程序只是从标准输入读取所有顶点(像下面的C代码一样)……

奇怪的是,在“整合”到我正在工作的代码之前,我尝试了一个更简单的代码:就是这段,它能正常工作!

from subprocess import Popen, PIPE
from ctypes import *

class VERTEX(Structure):
 _fields_ = [("index", c_int),
            ("x", c_float),
            ("y", c_float)]

nverts = 5
vlist = [VERTEX(0,1,1), VERTEX(1,2,2), VERTEX(2,3,3), VERTEX(3,4,4), VERTEX(4,5,5)]
array = VERTEX * nverts
input = array()
for i in range(nverts):
 input[i] = vlist[i]
print(type(input))
cproc = Popen("pipeinout.exe random arg", stdin=PIPE, stdout=PIPE)
out, err = cproc.communicate(input)
print(out.decode())

还有这段C代码

#include<stdio.h>
#include<stdlib.h>
typedef struct {
    int index;
    float x;
    float y;
} vertex;

int main(int argc, char* argv[]) {
    int n=5;
    int i;
    printf("%s",argv[1]);
    vertex* VV;
    VV=(vertex*)malloc(sizeof(vertex)*n);
    fread(VV,sizeof(vertex),n,stdin);
    //fread(&VV,sizeof(VV),1,stdin);//metti nel valore di VV(non a quello che punta) l'indirizzo passato||sizeof(VV) is the size of a pointer
    for(i=0;i<n;i++)
        printf(" %i , %f , %f\n",VV[i].index,VV[i].x,VV[i].y);
}

1 个回答

2
Thread(target=write, args=[p.stdin, pack(vertices, n)]).start()

从你的评论来看,我明白你需要把几百万个项目多次传递给一个C程序。下面这种方法(使用子进程的管道输入)在你的情况下可能会太慢。可以考虑的替代方案是写一个C扩展(比如使用Cython)或者直接用ctypes调用C函数。你可以单独提问,详细描述你的使用场景,看看哪种方法更合适。

如果你已经选择了一种方法,确保在进行任何优化之前,它是正确的(写一些测试,测量性能,只有在必要时才进行优化)—— 先让它工作,再让它正确,最后让它快

另一方面,投入太多时间在那些最终会被抛弃的方法上是没有意义的—— 快速失败

如果C程序的输出是有限的,你的代码中的.communicate()方法是有效的(来源):

import struct, sys    
from subprocess import Popen, PIPE

vertex_struct = struct.Struct('i f f')

def pack(vertices, n):    
    yield struct.pack('i', n)
    for v in vertices:
        yield vertex_struct.pack(*v)

def main():
    try: n = int(sys.argv[1])
    except IndexError:
        n = 100
    vertices = ((i,i+1,i+2) for i in range(n))

    p = Popen(["./echo_vertices", "random", "arg"], stdin=PIPE, stdout=PIPE)
    out, _ = p.communicate(b''.join(pack(vertices, n)))

    index, x, y = vertex_struct.unpack(out)
    assert index == (n-1) and int(x) == n and int(y) == (n+1)

if __name__ == '__main__':
    main()

这是来自问题评论的代码。在我的机器上,对于大的n值,它没有错误地工作:

import struct, sys
from subprocess import Popen, PIPE
from threading import Thread

def pack(vertices, n):
    yield struct.pack('i', n)
    s = struct.Struct('i f f')
    for v in vertices:
        yield s.pack(*v)

def write(output_file, chunks):
    for chunk in chunks:
        output_file.write(chunk)
    output_file.close()

def main():
    try: n = int(sys.argv[1])
    except IndexError:
        n = 100
    vertices = ((i,i+1,i+2) for i in range(n))

    p = Popen(["./echo_vertices", "random", "arg"], stdin=PIPE, stdout=PIPE)

    Thread(target=write, args=[p.stdin, pack(vertices, n)]).start()

    for line in iter(p.stdout.readline, b''):
        pass
    p.stdout.close()
    sys.stdout.buffer.write(line)
    p.wait()

if __name__ == '__main__':
    main()

问答

问:我不太理解pack函数(我知道yield返回一个只能迭代一次的可迭代对象,但在你的代码中你用了两个yield,所以我不明白它返回什么)。

pack()是一个生成器。生成器的工作方式并不是你描述的那样,例如:

>>> def f():
...     yield 1
...     yield 2
... 
>>> for i in f():
...     print(i)
...     
1
2

注意每个yield都会产生一个值。

>>> def g(n):
...     for i in range(n):
...         yield i
... 
>>> list(g(10))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

这里yield在文本中只出现一次,但它执行了10次,每次都会产生一个值(在这个例子中是一个整数)。可以查看Python教程中的生成器“系统程序员的生成器技巧”包含了从简单到高级使用生成器的多个例子。


问:另外,我不知道第10行的(*v)是什么意思。

s.pack(*v)使用参数解包调用pack方法:

>>> def h(a, b):
...     print(a, b)
... 
>>> h(*[1, 'a'])
1 a
>>> h(*range(2))
0 1
>>> h(0, 1)
0 1

问:我不明白第25行的线程是怎么工作的。

这一行启动了一个新线程,调用write()函数,使用args关键字参数中的参数,即output_file=p.stdinchunks=pack(vertices, n)。在这种情况下,write()函数等同于:

p.stdin.write(struct.pack('i', n))
p.stdin.write(s.pack(0, 1, 2))
p.stdin.write(s.pack(1, 2, 3))
...
p.stdin.write(s.pack(n-1, n, n+1))
p.stdin.close()

之后线程就退出了。


问:...程序的所有输出都没有存储在变量中,对吗?

整个输出没有存储在任何地方。代码:

for line in iter(p.stdout.readline, b''):
    pass

逐行读取p.stdout,直到.readline()返回空字符串b'',并将当前行存储在line变量中(见iter()文档)。所以:

sys.stdout.buffer.write(line)

只是打印输出的最后一行。


问:1) 启动线程后,python脚本会等到它完成,对吗?

不,主线程会退出。启动的线程不是守护线程。它会一直运行直到完成,也就是说,脚本(程序)在完成之前不会退出。


问:2) 我明白了你是如何从C程序的stdout读取的,但我不明白你什么时候启动它。根据我的理解,使用write函数我们将想要的数据写入一个缓冲区(或者类似于RAM中的文件),当我们运行C程序时,它可以从中读取我们写入的数据。但是在你的代码中,我们什么时候启动C程序? :)

C程序是通过p = Popen(...)启动的。

p.stdin.write()写入C程序的stdin(中间有几个缓冲区,但我们暂时可以不考虑)。这个过程和以下内容是一样的:

$ echo abc | some_program

问:3)最后一个问题:为什么要在p上使用wait?有一个警告http://docs.python.org/library/subprocess.html?#subprocess.Popen.wait

对于提供的C代码,实际上不需要在单独的线程中写入p.stdin。我使用线程正是为了避免警告中描述的情况,即C程序在脚本完成写入其stdin之前产生了足够的输出(你的C代码在完成读取之前不会写任何东西,所以这个线程并不是必要的)。

换句话说,在这种情况下p.wait()是安全的。

如果没有p.wait(),C程序的标准错误输出可能会丢失。不过我只能在jython上重现标准错误输出丢失的情况,使用这些脚本。不过对于提供的C代码来说,这并不重要,因为它没有写入任何标准错误输出。

撰写回答