Gstreamer、Python 和 Appsink

4 投票
1 回答
2563 浏览
提问于 2025-04-20 09:06

我设置了一个简单的管道,使用的是Gstreamer 1.0。当我尝试从appsink中获取样本时,代码在“sample = appsink.emit('pull-sample')”这一行卡住了。奇怪的是,如果我把这一行去掉,代码就能正常运行,不断打印“尝试获取样本”。如果我尝试跳过前100个样本或者更改appsink的属性,代码也会卡住。有没有人知道这是怎么回事?

gst-launch-1.0 v4l2src device="/dev/video0" ! videorate ! video/x raw,height=480,width=640,framerate=15/1 !  appsink


def createASink():
    asink = Gst.ElementFactory.make('appsink', 'asink')
    asink.set_property('sync', False)
    asink.set_property('emit-signals', True)
    asink.set_property('drop', True)
    asink.connect('new-sample', new_sample)
    return asink

def new_sample(appsink):
    print "Trying to pull sample"
    sample = appsink.emit('pull-sample')
    return False

1 个回答

1

这其实是一个变通的方法,但它还是能用。如果有人知道更好的解决办法,请告诉我。

我可以用一个文件输出工具把字节数组输出,然后不断地从这个文件里读取数据。我在Python中使用了多进程模块,这样可以同时运行gstreamer和我的消费者。

def consumeStream():
    fp = "gst_data"
    with open(fp, "r+b") as data_file:
        data_file.truncate()
        while True:
            where = data_file.tell()
            line = data_file.readline()
            if not line:
                time.sleep(.05)
                data_file.seek(where)
            else:
                data_file.truncate()
                print "Got data"

撰写回答