如何实现双向jsonrpc + twisted服务器/客户端

1 投票
1 回答
2108 浏览
提问于 2025-04-16 08:14

你好,我正在开发一个基于Twisted的RPC服务器,目的是为多个微控制器提供服务,这些微控制器会向Twisted的JSON-RPC服务器发起调用。不过,这个应用还要求服务器能够随时向每个微控制器发送信息。因此,我的问题是,怎样才能避免微控制器的远程JSON-RPC调用的响应和服务器为用户发出的JSON-RPC请求混淆在一起。

现在我遇到的后果是,微控制器收到错误的信息,因为它们无法判断从网络连接中收到的字符串是它们之前请求的响应,还是服务器发来的新请求。

以下是我的代码:

from twisted.internet import reactor
from txjsonrpc.netstring import jsonrpc
import weakref

creds  = {'user1':'pass1','user2':'pass2','user3':'pass3'}

class arduinoRPC(jsonrpc.JSONRPC):
    def connectionMade(self):
        pass

    def jsonrpc_identify(self,username,password,mac):
        """ Each client must be authenticated just after to be connected calling this rpc """
        if creds.has_key(username):
            if creds[username] == password:
                authenticated = True
            else:
                authenticated = False
        else:
            authenticated = False

        if authenticated:
            self.factory.clients.append(self)
            self.factory.references[mac] = weakref.ref(self)
            return {'results':'Authenticated as %s'%username,'error':None}
        else:
            self.transport.loseConnection()

    def jsonrpc_sync_acq(self,data,f):
        """Save into django table data acquired from sensors and send ack to gateway"""
        if not (self in self.factory.clients):
            self.transport.loseConnection()
        print f
        return {'results':'synced %s records'%len(data),'error':'null'}

    def connectionLost(self, reason):
        """ mac address is searched and all reference to self.factory.clientes are erased """  
        for mac in self.factory.references.keys():
            if self.factory.references[mac]() == self:
                print 'Connection closed - Mac address: %s'%mac
                del self.factory.references[mac]
                self.factory.clients.remove(self)


class rpcfactory(jsonrpc.RPCFactory):
    protocol = arduinoRPC
    def __init__(self, maxLength=1024):
        self.maxLength = maxLength
        self.subHandlers = {}
        self.clients    =   []
        self.references =   {}

""" Asynchronous remote calling to micros, simulating random calling from server """
import threading,time,random,netstring,json
class asyncGatewayCalls(threading.Thread):
    def __init__(self,rpcfactory):
        threading.Thread.__init__(self)
        self.rpcfactory =   rpcfactory
        """identifiers of each micro/client connected"""
        self.remoteMacList    =   ['12:23:23:23:23:23:23','167:67:67:67:67:67:67','90:90:90:90:90:90:90']
    def run(self):
        while True:
            time.sleep(10)
            while True:
                """ call to any of three potential micros connected """ 
                mac = self.remoteMacList[random.randrange(0,len(self.remoteMacList))]
                if self.rpcfactory.references.has_key(mac):
                    print 'Calling %s'%mac
                    proto   =   self.rpcfactory.references[mac]()
                    """ requesting echo from selected micro"""
                    dataToSend  = netstring.encode(json.dumps({'method':'echo_from_micro','params':['plop']}))
                    proto.transport.write(dataToSend)
                    break

factory = rpcfactory(arduinoRPC)

"""start thread caller""" 
r=asyncGatewayCalls(factory)
r.start()

reactor.listenTCP(7080, factory)
print "Micros remote RPC server started"
reactor.run()

1 个回答

2

你需要给每条消息添加足够的信息,这样接收者才能理解该如何处理它。你的需求听起来和AMP的要求很相似,所以你可以选择使用AMP,或者按照AMP的结构来识别你的消息。具体来说:

  • 在请求中,放一个特定的键,比如AMP使用“_ask”来标识请求。它还会给这些请求一个独特的值,这个值在连接的整个生命周期内进一步标识该请求。
  • 在响应中,放一个不同的键,比如AMP使用“_answer”来表示响应。这个值和请求中“_ask”键的值是匹配的。

使用这样的方式,你只需要查看是否有“_ask”键或“_answer”键,就能判断你是收到了新的请求还是之前请求的响应。

另外,你的asyncGatewayCalls类不应该基于线程。没有明显的理由让它使用线程,而且这样做会错误地使用Twisted的API,导致一些不可预知的行为。大多数Twisted的API只能在你调用reactor.run的那个线程中使用。唯一的例外是reactor.callFromThread,你可以用它从任何其他线程向反应器线程发送消息。不过,asyncGatewayCalls试图写入一个传输,这会导致缓冲区损坏或者数据发送时出现任意延迟,甚至可能引发更严重的问题。相反,你可以这样编写asyncGatewayCalls

from twisted.internet.task import LoopingCall

class asyncGatewayCalls(object):
    def __init__(self, rpcfactory):
        self.rpcfactory = rpcfactory
        self.remoteMacList = [...]

    def run():
        self._call = LoopingCall(self._pokeMicro)
        return self._call.start(10)

    def _pokeMicro(self):
        while True:
            mac = self.remoteMacList[...]
            if mac in self.rpcfactory.references:
                proto = ...
                dataToSend = ...
                proto.transport.write(dataToSend)
                break

factory = ...
r = asyncGatewayCalls(factory)
r.run()

reactor.listenTCP(7080, factory)
reactor.run()

这样你就得到了一个单线程的解决方案,应该和你原本想要的asyncGatewayCalls类的行为相同。不过,它不是在一个线程中循环睡眠来安排调用,而是使用反应器的调度API(通过更高级的LoopingCall类,它可以重复调度调用)来确保_pokeMicro每十秒被调用一次。

撰写回答