使用不阻塞地运行多个应用程序会话autbahn.asyncio.wamp

2024-06-10 04:33:17 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图同时在python中运行两个autobahn.asyncio.wamp.ApplicationSession。以前,我使用了对autobahn库的修改,如this post's answer中所建议的那样。我现在 需要更专业的解决方案。在

在搜索了一段时间后,this post appeared quite promising,但是使用了twisted库,而不是{}。我无法为autobahn库的asyncio分支找到类似的解决方案,因为它似乎没有使用Reactors。在

我的主要问题是ApplicationRunner.run()是阻塞的(这就是我之前将它外包给线程的原因),所以我不能在它之后运行第二个ApplicationRunner。在

我确实需要同时访问2个websocket频道,这似乎不能用一个ApplicationSession来完成。在

目前我的代码:

from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.wamp import ApplicationRunner
from asyncio import coroutine
import time


channel1 = 'BTC_LTC'
channel2 = 'BTC_XMR'

class LTCComponent(ApplicationSession):

    def onConnect(self):
        self.join(self.config.realm)

    @coroutine
    def onJoin(self, details):
        def onTicker(*args, **kwargs):
            print('LTCComponent', args, kwargs)

        try:
            yield from self.subscribe(onTicker, channel1)
        except Exception as e:
            print("Could not subscribe to topic:", e)

class XMRComponent(ApplicationSession):

    def onConnect(self):
        self.join(self.config.realm)

    @coroutine
    def onJoin(self, details):
        def onTicker(*args, **kwargs):
            print('XMRComponent', args, kwargs)

        try:
            yield from self.subscribe(onTicker, channel2)
        except Exception as e:
            print("Could not subscribe to topic:", e)

def main():
    runner = ApplicationRunner("wss://api.poloniex.com:443", "realm1", extra={})
    runner.run(LTCComponent)
    runner.run(XMRComponent) # <- is not being called


if __name__ == "__main__":

    try:
        main()
    except keyboardInterrupt:
        quit()

    except Exception as e:
        print(time.time(), e)

我对autobahn库的了解是有限的,恐怕文档并不能改善我的状况。我是不是在看什么?一个函数,一个参数,它能让我组合我的组件或者同时运行它们?在

也许是一个类似于provided here的解决方案,它实现了一个可选的ApplicationRunner?在


相关主题

Running two ApplicationSessions in twisted

Running Autobahn ApplicationRunner in Thread

Autobahn.wamp.ApplicationSession Source

Autobahn.wamp.Applicationrunner Source


根据要求,使用multithreading代码从@stovfl的答案进行回溯:

^{pr2}$

Tags: fromimportselfasynciodefargssubscribekwargs
2条回答

正如我从traceback中看到的,我们只到达第2步,共4步

From the asyncio docs:
This module provides infrastructure for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources

因此,我放弃了我的第一个建议使用multithreading
我可以想象以下三种选择:

  1. multiprocessing代替multithreading
  2. coroutineasyncio loop内完成
  3. def onJoin(self, details)中的channels之间切换

第二个建议,第一个选项使用multiprocessing
我可以启动两个asyncio loops,所以appRunner.run(...)应该可以工作。在

如果channel是唯一不同的,那么可以使用一个class ApplicationSession。 如果需要传递不同的class ApplicationSession,请将其添加到args=

class __ApplicationSession(ApplicationSession):
        # ...
        try:
            yield from self.subscribe(onTicker, self.config.extra['channel'])
        except Exception as e:
            # ...

import multiprocessing as mp
import time

def ApplicationRunner_process(realm, channel):
        appRunner = ApplicationRunner("wss://api.poloniex.com:443", realm, extra={'channel': channel})
        appRunner.run(__ApplicationSession)

if __name__ == "__main__":
    AppRun = [{'process':None, 'channel':'BTC_LTC'},
              {'process': None, 'channel': 'BTC_XMR'}]

    for app in AppRun:
        app['process'] =  mp.Process(target = ApplicationRunner_process, args = ('realm1', app['channel'] ))
        app['process'].start()
        time.sleep(0.1)

    AppRun[0]['process'].join()
    AppRun[1]['process'].join()

按照这个方法you linked for twisted我设法用asyncio settingstart_loop=False获得相同的行为

import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

runner1 = ApplicationRunner(url, realm, extra={'cli_id': 1})
coro1 = runner1.run(MyApplicationSession, start_loop=False)

runner2 = ApplicationRunner(url, realm, extra={'cli_id': 2})
coro2 = runner2.run(MyApplicationSession, start_loop=False)

asyncio.get_event_loop().run_until_complete(coro1)
asyncio.get_event_loop().run_until_complete(coro2)
asyncio.get_event_loop().run_forever()

class MyApplicationSession(ApplicationSession):

    def __init__(self, cfg):
        super().__init__(cfg)
        self.cli_id = cfg.extra['cli_id']

   def onJoin(self, details):
        print("session attached", self.cli_id)

相关问题 更多 >