Is mixing consumer/producer behavior in generator-based coroutines necessarily a bad idea?

Posted 2024-04-29 20:49:43


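I'm writing data-processing code using generator-based coroutines, where data is read continuously from a sensor or a file (the source), transformed by one or more functions (the filters), and finally written somewhere (the sink). Here is an example using some dummy functions: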

def coroutine(func):
    def primed(*args, **kwargs):
        coro = func(*args, **kwargs)
        next(coro)
        return coro
    return primed

def source(name):
    for x in range(1,4):
        yield f'{name}{x}'

config = {
    'filter1': '_F1',
    'filter2': '_F2',
    'filter3': '_F3',
}

@coroutine
def filter1(**config):
    """Coroutine that consumes AND produces"""
    data = ''
    while True:
        data = yield data + config['filter1']

@coroutine
def filter2(**config):
    """Coroutine that consumes AND produces"""
    data = ''
    while True:
        data = yield data + config['filter2']

@coroutine
def filter3(**config):
    """Coroutine that consumes AND produces"""
    data = ''
    while True:
        data = yield data + config['filter3']

@coroutine
def writer(where):
    while True:
        data = yield
        print(f'writing to {where}: {data}')

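def pipeline(source, transforms, sinks):
    # Create (and prime) each filter once, not once per data item.
    stages = [f(**config) for f in transforms]
    for data in source:
        transformed = data
        for stage in stages:
            # Feed each stage the previous stage's output so the filters chain.
            transformed = stage.send(transformed)
        for sink in sinks:
            sink.send(transformed)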


pipeline(source('data'), 
         transforms=[
             filter1,
             filter2,
             filter3,
         ], 
         sinks=[
             writer('console'),
             writer('file'),
         ])

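Mixing consumer and producer behavior in a coroutine is generally discouraged (see here). However, this approach lets me write the pipeline function without hard-coding the individual transform (filter) functions. If I have to stick to consume-only coroutines, this is what I could come up with: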

def coroutine(func):
    def primed(*args, **kwargs):
        coro = func(*args, **kwargs)
        next(coro)
        return coro
    return primed

def source(name, target):
    for x in range(1,4):
        target.send(f'{name}{x}')

config = {
    'filter1': '_F1',
    'filter2': '_F2',
    'filter3': '_F3',
}

@coroutine
def filter1(target, **config):
    """This coroutine only consumes"""
    while True:
        data = yield 
        target.send(data + config['filter1'])

@coroutine
def filter2(target, **config):
    """This coroutine only consumes"""
    while True:
        data = yield 
        target.send(data + config['filter2'])

@coroutine
def filter3(target, **config):
    """This coroutine only consumes"""
    while True:
        data = yield 
        target.send(data + config['filter3'])

@coroutine
def writer(where):
    while True:
        data = yield
        print(f'writing to {where}: {data}')

def pipeline():
    f3 = filter3(writer('console'), **config)
    f2 = filter2(f3, **config)        
    f1 = filter1(f2, **config)

    source('data', f1)

pipeline()
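
For comparison, the hard-coded wiring in the consume-only version might be generalized along these lines. This is only a sketch under assumptions: `suffix_filter`, `broadcast`, `collector`, and `build_chain` are hypothetical names that are not part of the code above, and the sink collects into a list instead of printing so the result can be inspected.

```python
from functools import reduce

def coroutine(func):
    """Prime a generator-based coroutine so it is ready for send()."""
    def primed(*args, **kwargs):
        coro = func(*args, **kwargs)
        next(coro)
        return coro
    return primed

@coroutine
def suffix_filter(target, suffix):
    """Hypothetical consume-only filter: append suffix, push downstream."""
    while True:
        data = yield
        target.send(data + suffix)

@coroutine
def broadcast(targets):
    """Hypothetical fan-out stage: forward each item to every target."""
    while True:
        data = yield
        for target in targets:
            target.send(data)

@coroutine
def collector(store):
    """Stand-in sink that appends to a list instead of printing."""
    while True:
        data = yield
        store.append(data)

def build_chain(filter_factories, sink):
    """Wire the filters back to front so each one targets the next stage."""
    return reduce(lambda target, make: make(target),
                  reversed(filter_factories), sink)

received = []
chain = build_chain(
    [lambda t: suffix_filter(t, '_F1'),
     lambda t: suffix_filter(t, '_F2'),
     lambda t: suffix_filter(t, '_F3')],
    broadcast([collector(received)]),
)
for x in range(1, 4):
    chain.send(f'data{x}')
print(received)
```

With wiring like this, the list of filters becomes a parameter again, and `broadcast` gives the push-style version the multiple sinks that the first version has.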

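Now, my question is: is the first implementation a bad idea, and if so, what could go wrong? I like it better than the second approach, even though I know I'm mixing generator and coroutine behavior.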
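
To make the concern concrete, here is a small sketch of two things that can happen when a coroutine both consumes and produces. It assumes a hypothetical `raw_filter` with the same shape as `filter1`, left unprimed on purpose so the first `yield` is visible; it is an illustration, not a complete list of pitfalls.

```python
def raw_filter(suffix):
    """Same shape as filter1, but without the priming decorator."""
    data = ''
    while True:
        data = yield data + suffix

gen = raw_filter('_F1')

# 1. Priming already produces a value, for an input that never existed.
#    The @coroutine decorator silently throws this value away.
primed_value = next(gen)
first_result = gen.send('data1')

# 2. Calling next() on the coroutine (for example by looping over it)
#    is the same as send(None), and None + '_F1' raises TypeError.
error = None
try:
    next(gen)
except TypeError as exc:
    error = exc

# 3. The uncaught exception finished the generator, so later sends fail.
stopped = False
try:
    gen.send('data2')
except StopIteration:
    stopped = True

print(primed_value, first_result, type(error).__name__, stopped)
```

So the producer side of such a coroutine is only safe to drive with `send()`, and one bad input can shut down a stage for the rest of the run.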

