如何用autobahn asyncio实现交互式websocket客户端?

1 投票
2 回答
1875 浏览
提问于 2025-05-01 05:38

我正在尝试使用autobahn|python和asyncio来实现一个websocket/wamp客户端,虽然现在有些功能可以用,但还有一些地方我搞不清楚。

我真正想做的是在qt5/QML中实现WAMP,但目前看来这条路似乎更简单一些。

这个简化的客户端大部分是从网上复制过来的,基本上是可以工作的。当onJoin事件发生时,它会读取时间服务。

我想要做的是从外部触发这个读取操作。

我采取的复杂方法是把asyncio事件循环放在一个线程中,然后通过一个socket发送命令来触发读取。目前我还没搞清楚应该把例程/协程放在哪里,以便从读取例程中找到它。

我怀疑还有更简单的方法,但我还没有找到。欢迎提出建议。

#!/usr/bin/python3
try:
    import asyncio
except ImportError:
    ## Trollius >= 0.3 was renamed
    import trollius as asyncio

from autobahn.asyncio import wamp, websocket
import threading
import time
from socket import socketpair

rsock, wsock = socketpair()

def reader() :
    data = rsock.recv(100)
    print("Received:", data.decode())

class MyFrontendComponent(wamp.ApplicationSession):
    def onConnect(self):
        self.join(u"realm1")



    @asyncio.coroutine
    def onJoin(self, details):
        print('joined')
        ## call a remote procedure
        ##
        try:
           now = yield from self.call(u'com.timeservice.now')
        except Exception as e:
           print("Error: {}".format(e))
        else:
           print("Current time from time service: {}".format(now))



    def onLeave(self, details):
        self.disconnect()

    def onDisconnect(self):
        asyncio.get_event_loop().stop()



def start_aloop() :
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    transport_factory = websocket.WampWebSocketClientFactory(session_factory,
                    debug = False,
                    debug_wamp = False)
    coro = loop.create_connection(transport_factory, '127.0.0.1', 8080)
    loop.add_reader(rsock,reader)
    loop.run_until_complete(coro)
    loop.run_forever()
    loop.close()

if __name__ == '__main__':
    session_factory = wamp.ApplicationSessionFactory()
    session_factory.session = MyFrontendComponent

    ## 4) now enter the asyncio event loop
    print('starting thread')
    thread = threading.Thread(target=start_aloop)
    thread.start()
    time.sleep(5)
    print("IN MAIN")
    # emulate an outside call
    wsock.send(b'a byte string')
暂无标签

2 个回答

0

谢谢你的回复,dano。虽然这不是我需要的解决方案,但它让我找到了正确的方向。是的,我希望客户端能够通过外部触发来进行远程RPC调用。

我想出了以下方法,这样我可以传递一个字符串来进行特定的调用(不过现在只实现了一个)。

这是我想到的,虽然我不太确定这个方法是否优雅。

import asyncio
from autobahn.asyncio import wamp, websocket
import threading
import time
import socket


rsock, wsock = socket.socketpair()

class MyFrontendComponent(wamp.ApplicationSession):
    def onConnect(self):
        self.join(u"realm1")

    @asyncio.coroutine
    def setup_socket(self):
        # Create a non-blocking socket
        self.sock = rsock
        self.sock.setblocking(0)
        loop = asyncio.get_event_loop()
        # Wait for connections to come in. When one arrives,
        # call the time service and disconnect immediately.
        while True:
            rcmd = yield from loop.sock_recv(rsock,80)
            yield from self.call_service(rcmd.decode())

    @asyncio.coroutine
    def onJoin(self, details):
        # Setup our socket server
        asyncio.async(self.setup_socket())


    @asyncio.coroutine
    def call_service(self,rcmd):
        print(rcmd)
        try:
           now = yield from self.call(rcmd)
        except Exception as e:
           print("Error: {}".format(e))
        else:
           print("Current time from time service: {}".format(now))



    def onLeave(self, details):
        self.disconnect()

    def onDisconnect(self):
        asyncio.get_event_loop().stop()



def start_aloop() :
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    transport_factory = websocket.WampWebSocketClientFactory(session_factory,
                    debug = False,
                    debug_wamp = False)
    coro = loop.create_connection(transport_factory, '127.0.0.1', 8080)
    loop.run_until_complete(coro)
    loop.run_forever()
    loop.close()

if __name__ == '__main__':
    session_factory = wamp.ApplicationSessionFactory()
    session_factory.session = MyFrontendComponent

    ## 4) now enter the asyncio event loop
    print('starting thread')
    thread = threading.Thread(target=start_aloop)
    thread.start()
    time.sleep(5)
    wsock.send(b'com.timeservice.now')
    time.sleep(5)
    wsock.send(b'com.timeservice.now')
    time.sleep(5)
    wsock.send(b'com.timeservice.now')
0

你可以在事件循环中异步地监听一个套接字,使用 loop.sock_accept。你只需要在 onConnectonJoin 里面调用一个协程来设置这个套接字:

try:
    import asyncio
except ImportError:
    ## Trollius >= 0.3 was renamed
    import trollius as asyncio

from autobahn.asyncio import wamp, websocket
import socket

class MyFrontendComponent(wamp.ApplicationSession):
    def onConnect(self):
        self.join(u"realm1")

    @asyncio.coroutine
    def setup_socket(self):
        # Create a non-blocking socket
        self.sock = socket.socket()
        self.sock.setblocking(0)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(('localhost', 8889))
        self.sock.listen(5)
        loop = asyncio.get_event_loop()
        # Wait for connections to come in. When one arrives,
        # call the time service and disconnect immediately.
        while True:
            conn, address = yield from loop.sock_accept(self.sock)
            yield from self.call_timeservice()
            conn.close()

    @asyncio.coroutine
    def onJoin(self, details):
        print('joined')
        # Setup our socket server
        asyncio.async(self.setup_socket())

        ## call a remote procedure
        ##
        yield from self.call_timeservice()

    @asyncio.coroutine
    def call_timeservice(self):
        try:
           now = yield from self.call(u'com.timeservice.now')
        except Exception as e:
           print("Error: {}".format(e))
        else:
           print("Current time from time service: {}".format(now))

    ... # The rest is the same

撰写回答