RxPy:在(慢)扫描执行之间对热观察进行排序

2024-06-11 09:49:49 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在寻求帮助来实现下面的大理石图。其目的是尽可能对未排序的值进行排序,而无需在扫描执行之间等待时间。在

我并不是要求全面实施。欢迎任何指导。 not consumed min marble diagram 我有一个异步慢速扫描(出于测试目的而强制进行)无限热观测。以下是相关代码:

thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()

external_obs \
    .flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
    .scan(seed=State(0, None), accumulator=slow_scan_msg) \
    .subscribe(log, print, lambda: print("SLOW FINISHED"))

external_obs.connect()
thread.start()

def slow_scan_msg(state, msg):
    sleep(0.4)
    return state \
        ._replace(count = state.count + 1) \
        ._replace(last_msg = msg)

这是完整版本:https://pyfiddle.io/fiddle/781a9b29-c541-4cd2-88ba-ef90610f5dbd

这是当前输出(值是随机生成的):

^{pr2}$

我想在扫描执行之间对挂起的消息进行排序。因此,第一条发出的消息将始终是第一条被消费的消息,但下一条被消费的消息将是在此之前已发出和未被消费的消息的最小值(值)(所有这些消息都是在当前版本中,因为是即时发出的)。以此类推。。。我认为大理石图比我的解释好。在

请注意,扫描不是在等待完成事件,它在最后一条消息发出后没有启动的唯一原因是睡眠。Here you have another version其中睡眠已从扫描中删除并置于ExternalDummyService中。您可以看到这些值在发出时就被消耗掉了。这也显示在大理石图中。在

我尝试使用to_sorted_list,这是我在RxPy中找到的唯一的排序方法,但我无法使其工作。在

我要找的是这样的东西:

external_obs \
    .flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
############ buffered_sort() does not exist
    .buffered_sort(lambda msg: msg.timestamp) \
############
    .scan(seed=State("SLOW", 0, None), accumulator=slow_scan_msg) \
    .subscribe(log, print, lambda: print("SLOW FINISHED"))

谢谢


Tags: lambda消息scan排序msgsubscribethreadexternal
1条回答
网友
1楼 · 发布于 2024-06-11 09:49:49

如果要使用to_sorted_list,则需要重新映射在单个元素中获得的列表。将main函数更改为:

def main():
    thread_1_scheduler = ThreadPoolScheduler(1)

    thread = ExternalDummyService()
    external_obs = thread.subject.publish()

    external_obs \
        .flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
        .to_sorted_list(key_selector=lambda msg: msg.timestamp) \
        .flat_map(lambda msglist: Observable.from_iterable(msglist)) \
        .scan(seed=State(0, None), accumulator=slow_scan_msg) \
        .subscribe(log, print, lambda: print("SLOW FINISHED"))

    external_obs.connect()

    thread.start()

给出:

^{pr2}$

注意,to_sorted_list方法将等待主题流的结尾开始扫描,因此您不能使用它来实现问题中所示的大理石图。在

为了正确地实现它,我认为您需要类似于^{}的东西,它是在RxJava中实现的,而不是在RxPy中实现的。在

这并不能完全解决这个问题,因为缓冲区是FIFO(先进先出),而您需要一种自定义的方法来选择哪个消息先出。这可能需要调整一下如何处理对缓冲区的请求。在

您可能会找到一个更好的方法来实现一个名为rxbackpressure的RxPy扩展,特别是它的类dequeuablebuffer.py,您可以根据自己的需要进行调整。在

相关问题 更多 >