txmsgpackrpc 是一个为 Twisted 提供 msgpack-rpc 支持的库。
txmsgpackrpc的Python项目详细描述
有关最新的源代码,请参见http://github.com/jakm/txmsgpackrpc
txmsgpackrpc 是一个基于 Twisted 框架、用于在 Python 中编写异步 msgpack-rpc 服务器和客户端的库。该库基于 txMsgpack,并在其基础上做了一些改进和修复。
功能
- 用户友好的API
- 模块化对象模型
- 可正常工作的超时与重新连接机制
- 连接池支持
- TCP、SSL、UDP和Unix套接字
Python 3 注意事项
要在python 3中使用unix套接字,请使用twisted framework 15.3.0及更高版本。
依赖关系
TCP示例
在子进程中使用 Chudnovsky 算法计算 π。详情请参见 http://www.craig-wood.com/nick/articles/pi-chudnovsky/ 。
结果
Computation of PI with 5 places finished in 0.022390 seconds
Computation of PI with 100 places finished in 0.037856 seconds
Computation of PI with 1000 places finished in 0.038070 seconds
Computation of PI with 10000 places finished in 0.073907 seconds
Computation of PI with 100000 places finished in 6.741683 seconds
Computation of PI with 5 places finished in 0.001142 seconds
Computation of PI with 100 places finished in 0.001182 seconds
Computation of PI with 1000 places finished in 0.001206 seconds
Computation of PI with 10000 places finished in 0.001230 seconds
Computation of PI with 100000 places finished in 0.001255 seconds
Computation of PI with 1000000 places finished in 432.574457 seconds
Computation of PI with 1000000 places finished in 402.551226 seconds
DONE
服务器
# TCP example server: exposes a msgpack-rpc method ``PI`` that computes the
# digits of pi in a subprocess and caches the result per digit count.
from __future__ import print_function

import sys
from collections import defaultdict
from twisted.internet import defer, reactor, utils
from twisted.python import failure
from txmsgpackrpc.server import MsgpackRPCServer


# Source of the standalone program that is handed to the interpreter with
# ``-c``; it prints int(pi * 10**digits) for the digit count in sys.argv[1].
pi_chudovsky_bs = '''
"""
Python3 program to calculate Pi using python long integers, binary
splitting and the Chudnovsky algorithm

See: http://www.craig-wood.com/nick/articles/pi-chudnovsky/ for more
info

Nick Craig-Wood <nick@craig-wood.com>
"""

import math
from time import time

def sqrt(n, one):
    """
    Return the square root of n as a fixed point number with the one
    passed in.  It uses a second order Newton-Raphson convgence.  This
    doubles the number of significant figures on each iteration.
    """
    # Use floating point arithmetic to make an initial guess
    floating_point_precision = 10**16
    n_float = float((n * floating_point_precision) // one) / floating_point_precision
    x = (int(floating_point_precision * math.sqrt(n_float)) * one) // floating_point_precision
    n_one = n * one
    while 1:
        x_old = x
        x = (x + n_one // x) // 2
        if x == x_old:
            break
    return x

def pi_chudnovsky_bs(digits):
    """
    Compute int(pi * 10**digits)

    This is done using Chudnovsky's series with binary splitting
    """
    C = 640320
    C3_OVER_24 = C**3 // 24
    def bs(a, b):
        """
        Computes the terms for binary splitting the Chudnovsky infinite series

        a(a) = +/- (13591409 + 545140134*a)
        p(a) = (6*a-5)*(2*a-1)*(6*a-1)
        b(a) = 1
        q(a) = a*a*a*C3_OVER_24

        returns P(a,b), Q(a,b) and T(a,b)
        """
        if b - a == 1:
            # Directly compute P(a,a+1), Q(a,a+1) and T(a,a+1)
            if a == 0:
                Pab = Qab = 1
            else:
                Pab = (6*a-5)*(2*a-1)*(6*a-1)
                Qab = a*a*a*C3_OVER_24
            Tab = Pab * (13591409 + 545140134*a) # a(a) * p(a)
            if a & 1:
                Tab = -Tab
        else:
            # Recursively compute P(a,b), Q(a,b) and T(a,b)
            # m is the midpoint of a and b
            m = (a + b) // 2
            # Recursively calculate P(a,m), Q(a,m) and T(a,m)
            Pam, Qam, Tam = bs(a, m)
            # Recursively calculate P(m,b), Q(m,b) and T(m,b)
            Pmb, Qmb, Tmb = bs(m, b)
            # Now combine
            Pab = Pam * Pmb
            Qab = Qam * Qmb
            Tab = Qmb * Tam + Pam * Tmb
        return Pab, Qab, Tab
    # how many terms to compute
    DIGITS_PER_TERM = math.log10(C3_OVER_24/6/2/6)
    N = int(digits/DIGITS_PER_TERM + 1)
    # Calclate P(0,N) and Q(0,N)
    P, Q, T = bs(0, N)
    one = 10**digits
    sqrtC = sqrt(10005*one, one)
    return (Q*426880*sqrtC) // T

if __name__ == "__main__":
    import sys
    digits = int(sys.argv[1])
    pi = pi_chudnovsky_bs(digits)
    print(pi)
'''


def set_timeout(deferred, timeout=30):
    """Arm a watchdog that calls ``defer.timeout(deferred)`` after *timeout* seconds.

    The watchdog is cancelled as soon as *deferred* fires on its own; the
    result (or failure) is passed through unchanged.
    """
    def callback(value):
        # ``watchdog`` is bound below, before the reactor can run this
        # callback for a not-yet-fired deferred.
        if not watchdog.called:
            watchdog.cancel()
        return value

    deferred.addBoth(callback)

    watchdog = reactor.callLater(timeout, defer.timeout, deferred)


class ComputePI(MsgpackRPCServer):
    """RPC server whose ``PI`` method computes pi in a subprocess."""

    def __init__(self):
        # digits -> list of Deferreds waiting for that computation
        self.waiting = defaultdict(list)
        # digits -> already computed value of int(pi * 10**digits)
        self.results = {}

    def remote_PI(self, digits, timeout=None):
        """Return a Deferred firing with pi computed to *digits* places.

        A cached result is returned immediately. Otherwise the caller is
        queued, and a subprocess is started only when no computation for
        *digits* is already in flight, so concurrent requests share one run.
        """
        if digits in self.results:
            return defer.succeed(self.results[digits])

        d = defer.Deferred()

        start_computation = digits not in self.waiting
        # Register the waiter before starting the computation, so that it is
        # notified even if the subprocess Deferred has already fired.
        self.waiting[digits].append(d)

        if start_computation:
            subprocessDeferred = self.computePI(digits, timeout)

            def callWaiting(res):
                # Detach the queue first: requests arriving from now on
                # either hit the cache or start a fresh computation.
                waiting = self.waiting.pop(digits, [])

                if isinstance(res, failure.Failure):
                    for waiter in waiting:
                        waiter.errback(res)
                else:
                    for waiter in waiting:
                        waiter.callback(res)

            subprocessDeferred.addBoth(callWaiting)

        return d

    def computePI(self, digits, timeout):
        """Run the pi program in a subprocess and return a Deferred.

        The Deferred fires with the integer result, which is also stored in
        ``self.results``, or with a Failure wrapping RuntimeError when the
        subprocess exits with a non-zero code.
        """
        # Use the interpreter running this server; the previously hard-coded
        # path is kept only as a fallback when sys.executable is empty.
        interpreter = sys.executable or '/usr/bin/python'
        d = utils.getProcessOutputAndValue(
            interpreter, args=('-c', pi_chudovsky_bs, str(digits)))

        # Tuple parameters (``def callback((out, err, code))``) are a syntax
        # error on Python 3, so the result is unpacked inside the body.
        def callback(result):
            out, err, code = result
            if code == 0:
                pi = int(out)
                self.results[digits] = pi
                return pi
            # On Python 3 the captured stderr is bytes and cannot be
            # concatenated with a str message.
            if not isinstance(err, str):
                err = err.decode('utf-8', 'replace')
            return failure.Failure(RuntimeError('Computation failed: ' + err))

        if timeout is not None:
            # NOTE(review): on timeout the subprocess is not terminated here,
            # and its eventual completion would presumably try to fire the
            # already-fired Deferred - confirm whether that needs handling.
            set_timeout(d, timeout)

        d.addCallback(callback)

        return d


def main():
    """Start listening for RPC clients on TCP port 8000."""
    server = ComputePI()
    reactor.listenTCP(8000, server.getStreamFactory())


if __name__ == '__main__':
    reactor.callWhenRunning(main)
    reactor.run()
客户端
# TCP example client: requests pi at several precisions, twice, and prints how
# long each request took.
from __future__ import print_function

import sys
import time
from twisted.internet import defer, reactor, task
from twisted.python import failure


@defer.inlineCallbacks
def main():
    """Connect to localhost:8000, issue the ``PI`` requests and report timings.

    Any exception is printed with its traceback; the reactor is stopped in
    every case.
    """
    try:

        from txmsgpackrpc.client import connect

        c = yield connect('localhost', 8000, waitTimeout=900)

        def callback(res, digits, start_time):
            # Registered with addBoth, so ``res`` is either the result or a
            # Failure.
            if isinstance(res, failure.Failure):
                print('Computation of PI with %d places failed: %s' %
                      (digits, res.getErrorMessage()), end='\n\n')
            else:
                print('Computation of PI with %d places finished in %f seconds' %
                      (digits, time.time() - start_time), end='\n\n')
            sys.stdout.flush()

        defers = []
        for _ in range(2):
            for digits in (5, 100, 1000, 10000, 100000, 1000000):
                d = c.createRequest('PI', digits, 600)
                d.addBoth(callback, digits, time.time())
                defers.append(d)
            # wait for 30 seconds
            yield task.deferLater(reactor, 30, lambda: None)

        # Wait until every request has produced a result or a failure.
        yield defer.DeferredList(defers)

        print('DONE')

    except Exception:
        import traceback
        traceback.print_exc()
    finally:
        reactor.stop()


if __name__ == '__main__':
    reactor.callWhenRunning(main)
    reactor.run()
UDP 多播示例
示例服务器加入多播组 228.0.0.5 并监听 8000 端口。它们唯一的方法 echo 会原样返回其参数。
客户端加入多播组 228.0.0.5,向该组的 8000 端口发送多播请求,并等待 5 秒以接收响应。如果收到了响应,协议会以结果元组触发回调(callback),随后逐个检查各个结果是否有错误。如果未收到任何响应,协议会以 TimeoutError 触发错误回调(errback)。
由于没有通用的方法可以确定组内对等节点的数量,MsgpackMulticastDatagramProtocol 总是会一直等待响应,直到 waitTimeout 到期。
$ # setup multicast routing
$ ip route add 224.0.0.0/4 dev eth0
$ echo 1 > /proc/sys/net/ipv4/ip_forward
$
$ # start servers listening on port 8000
$ python examples/tx_rpc_server_udp_multicast.py &
[1] 3584
$ python examples/tx_rpc_server_udp_multicast.py &
[2] 3585
$ python examples/tx_rpc_server_udp_multicast.py &
[3] 3586
$ python examples/tx_rpc_server_udp_multicast.py &
[4] 3587
$ python examples/tx_rpc_server_udp_multicast.py &
[5] 3588
$
$ # execute client
$ python examples/tx_rpc_client_udp_multicast.py
Received results from 5 peers
$
服务器
# Multicast UDP example server: joins group 228.0.0.5 on port 8000 and echoes
# back whatever value it receives.
from twisted.internet import defer, reactor, task
from txmsgpackrpc.server import MsgpackRPCServer


class EchoRPC(MsgpackRPCServer):
    """RPC server exposing a single ``echo`` method."""

    @defer.inlineCallbacks
    def remote_echo(self, value, delay=None, msgid=None):
        """Return *value* unchanged, optionally after *delay* seconds.

        ``msgid`` is accepted but not used by this method.
        """
        if delay is not None:
            yield task.deferLater(reactor, delay, lambda: None)
        defer.returnValue(value)


def main():
    """Start listening for multicast requests on UDP port 8000."""
    server = EchoRPC()
    # listenMultiple=True is what allows several servers to be started on
    # the same port, as the accompanying shell session does.
    reactor.listenMulticast(8000,
                            server.getMulticastProtocol('228.0.0.5', ttl=5),
                            listenMultiple=True)


if __name__ == '__main__':
    reactor.callWhenRunning(main)
    reactor.run()
客户端
# Multicast UDP example client: sends one ``echo`` request to group 228.0.0.5
# and compares every peer's answer with the data that was sent.
from __future__ import print_function

from twisted.internet import defer, reactor


@defer.inlineCallbacks
def main():
    """Send a sample document to the multicast group and verify the echoes.

    Any exception is printed with its traceback; the reactor is stopped in
    every case.
    """
    try:

        from txmsgpackrpc.client import connect_multicast

        c = yield connect_multicast('228.0.0.5', 8000, ttl=5, waitTimeout=5)

        data = {
            'firstName': 'John',
            'lastName': 'Smith',
            'isAlive': True,
            'age': 25,
            'height_cm': 167.6,
            'address': {
                'streetAddress': "21 2nd Street",
                "city": 'New York',
                "state": 'NY',
                'postalCode': '10021-3100'
            },
            'phoneNumbers': [
                {
                    'type': 'home',
                    'number': '212 555-1234'
                },
                {
                    'type': 'office',
                    'number': '646 555-4567'
                }
            ],
            'children': [],
            'spouse': None
        }

        results = yield c.createRequest('echo', data)

        # Sanity check: this example expects one tuple holding all answers.
        assert isinstance(results, tuple)

        print('Received results from %d peers' % len(results))

        for i, result in enumerate(results):
            if result != data:
                print('Result %d mismatch' % i)
                print(result)

    except Exception:
        import traceback
        traceback.print_exc()
    finally:
        reactor.stop()


if __name__ == '__main__':
    reactor.callWhenRunning(main)
    reactor.run()