用于管理网络工作负载的连接群的包
pynetwork的Python项目详细描述
pynetwork
用于网络工作负载的高性能套接字群集
重要更新:
由于pypi中存在名称冲突,我不得不将库的名称从"pynet"更改为"pynetwork"
摘要
pynet包旨在帮助编写脚本/项目,这些脚本/项目依赖于网络上的进程间通信,而对套接字的了解很少甚至完全没有。此外,这些数据传输/传输可以以群方式完成,这意味着多个或多个分散的TCP连接群可以为单个目标提供高性能。 使用此软件包可以实现以下目标:
- 编写可以使用回调(不需要轮询)将数据流传送到客户端的子例程/函数
- 编写可以将成批数据发送到客户端的子例程/函数
- 通过网络向子程序/函数发送数据
- 使用群集连接将文件从服务器发送到服务器以获得更高的传输速率 < > >
从处理程序子例程接收数据流
您可以编写python generator函数来无限地产生数据(字节数组),并将它们注册/添加到网关。一旦添加到 网关,这些子例程将传递给任何新生成的处理程序。然后,客户机可以要求处理程序从 生成器子例程,直到它引发StopIteration异常或不返回任何异常。 这种流媒体的例子可以是: a.多传感器数据流 b.通过网络向密钥记录器发送流式按键 从生成器流式传输的数据是字节数组类型的数据,与此字节数组数据一起,函数还需要 返回"id"(或idetity)。当您使用一个连接群来传输数据时,这个id可能非常有用,因为 如果我的多个处理程序正在使用同一个函数,那么就没有与数据的顺序相同的gaurantee 函数返回的将保留在接收端。在这种情况下,如果 需要。
使用此功能的步骤:
< > >从处理程序子例程接收数据批
如果您需要基于事务的数据而不需要蒸汽,那么可以选择批处理子例程。这些都很简单 python函数(不需要作为生成器编写)返回字节数据(该批具有唯一id),然后 处理程序将把这些数据发送给客户。
将批数据发送到处理程序子例程
您也可以将数据发送到在gateway中注册的子例程。此数据可用作参数 缓冲区在函数的输入中。这是附加的字节数组输入,除了通常的参数&kwargs 你可以传递给你的子程序。代位执行后ne,处理程序要求传递整数返回 作为输出返回到客户端。
使用此功能的步骤:
< > >将文件从网关下载到客户端设备(从文件夹名®ex或完整路径下载到文件)
这里不需要描述,除了这样一个事实,您可以通过在多个 连接。这里要注意的一件重要的事情是,如果文件不太多,速度的提高可以忽略不计 &在这种情况下,带宽将成为限制因素。但是如果有很多小文件,您可以 可能会看到一个可分的改进(所以swarm在你传输日志文件的时候会特别有用)。 文件计数通常很高的应用程序(每个日志的大小限制为几百MB) 我很快就会上传一些数据来支持上述参数
将文件发送到网关(文件下载到相关文件夹)
同上,但方向相反。
ping处理程序以检查连接
可能有助于测试连接。
< > >
安装
我已将此包添加到pypi包索引中,因此现在可以使用p i p安装它:
pip安装pynetwork
前奏曲
作为一个有趣的项目,我想要一辆基于覆盆子pi的rc车,可以通过wi-fi进行控制。在接收端,一个 不同的深层神经网络结构,可以消耗来自树莓pi型摄像机馈送(到cnn模型)的数据,馈送来自 各种传感器,例如接近、速度编码器等(到RNN),然后可以对其进行控制。但其中一个关键问题是 在实现它时,面临的问题是如何在我的笔记本电脑和PI之间以低成本实时地来回传输所有这些数据 潜伏期。这给了我一个想法:编写一个简单的基于json的网络代码,可以来回传输自定义python对象,但是 这还不够,因为我正在处理多个数据源,管理这些多个TCP连接变得越来越困难。后 在github上进行了一些无用的搜索,我决定从头开始编写一个网络库,它可以管理 连接还可以帮助从python子例程(这些子例程是pythonian生成器,可以 无休止地读取和传输各个传感器数据)。除了基于子例程的流之外,我还想添加批处理 子例程数据传输(正如您可能已经意识到的那样,这些想法听起来与用于提供数据的keras生成器类似 到机器学习模型)。经过一个月的后台编码和测试,我终于发布了pynetwork的v2。这个版本应该 不仅适用于物联网,还可用于任何网络工作负载。
在跳到一些用例之前,下面是一些在使用之前应该注意的术语和组件:
网关<;-->;控制器
涉及的主要组件有4个:网关、控制器、处理程序和客户端。网关充当一个小型服务器来监听 控制器在指定端口上发出的请求。网关和控制器共享多对多关系(一个网关可以响应 对于多个控制器,一个控制器可以连接到多个网关)。网关控制器本身不执行任何工作负载 操作(如数据/文件传输),但它们负责生成和管理要处理的大量TCP连接 你的工作量。每当控制器请求网关时,网关生成处理程序并向控制器发送确认 然后作为响应,控制器为该处理程序创建一个客户端。因此,一个请求会产生一对"handler<;->;client"。 处理程序和客户机协同工作以处理您的自定义工作负载。为了提高操作速度,可以生成多对 处理者和客户。网关上拥有的每个处理程序都运行在一个独立的线程上(这是性能提升COMES<BR/> from)&;这些处理程序线程随后由网关管理。可能生成的处理程序<;->;客户端的数量取决于 处理程序池的大小(基本上是自定义线程池)。默认情况下,池大小为5,但可以将其增加到任意数量 取决于您的硬件。
Swarm架构类似于这样
在主机设备上使用网关 在客户端设备上使用网关 一对独立的handler客户端提供各种数据传输选项: a.编写一个python"generator"函数来读取字节数组数据
此函数需要的返回签名是一个"python set",其中包含2个元素(ident,buffer)
其中ident是一个整数,表示来自steam&buffer的块的索引是一个字节数组(即数据
流媒体)。 您可以使用包的backend2模块中的任何字节转换方法。可以转换int、str或json str,
或对象(使用默认值构造函数)到字节数组 c.当生成器引发"StopIteration"错误或标识符<;0或缓冲区为"None"时,流将终止
d.您也可以通过在流回调(用户请求的流)中返回"false"来终止来自客户端的流
中止) e.如果流结束(没有用户请求的流中止),则客户端调用"eos_callback" f.启动流时,还可以将位置参数&kwargs传递给此生成器。
(包序列化参数,然后在执行函数的网关反序列化这些参数
参数相同) a.编写一个函数(使用您想要的任何位置/kwarg)并使用gateway注册它
包希望此函数返回一个int,然后将其发送回客户端,但是此int返回是
眼科的如果您不返回任何内容,在客户端,您的脚本将收到0。 c.启动流时,还可以将位置参数&kwargs传递给此生成器。
(包序列化参数,然后在执行函数的网关反序列化这些参数
参数相同) 下面是一些使用pynet并增加复杂性的用例教程: 考虑这样一个场景:您在虚拟机节点或服务器上的存储空间有限,并且希望从此设备中移动较旧的文件
当存储使用率超过20%时(从旧文件开始)。这一目标可以实现如下: 此外,您还可以在"删除日志"功能中添加逻辑,以便在特定日期之前删除文件,依此类推。因为您可以在python子例程中集成任何自定义逻辑,所以可能性是无穷的。 a.函数可以将参数&;em>kwargs作为输入,这里的参数表示位置参数(与通常的python转换相反)
很抱歉,我将在下一版本中更改名称
b.函数需要以字节格式返回数据。所以函数可以用backed中的任何一种方法来转换int,
str到byes(我很快会添加支持float,但同时可以使用struct包)。此外,如果您想返回
自定义类,然后使用自己的自定义编码将对象序列化为json字符串,然后使用backed将此字符串转换为
字节,然后在控制器端执行相反的操作。
如果要使用后端可用的对象序列化,则可以将类添加到"握手文件"中
使用内置序列化,但类中必须有基元数据类型,并将默认值指定给
类的init方法中的所有输入参数。我将对序列化进行一些更改,以便在下一个版本中为类定义提供更多的自由 a.函数可以有参数;kwargs作为输入,这里的参数表示位置参数(与通常的python相反
输送)
b.函数需要返回字节格式的整数结果,结果将发送回客户端gw=pynet.Gateway(port=1857)# Define port on which gateway will listen to controller requests.#Then register your subroutines (Either generators/ non-generators)gw.add_subroutine('mount_usage',get_usage_percentage)gw.add_subroutine('remove_logs',remove_logs)#Its not necessary to add your subroutines before the gateway starts listening..gw.start(blocking=True)#start listening to controller#When blocking is True, then gateway will block the main thread (Similar to App.mainloop() in wxpython), untill it is closed #by controller, If you are sure that this main thread wont run to an end, then you can set it too false or add your custom#logic to keep this main thread alive.
controller=pynet.Controller(gateway_ip='GATEWAY_IP',port=1857,download_dir='mydir',verbose=True)#create a controllerclient1=controller.get_client()#When you call get_client, the controller requests for a dedicated handler with the Gateway, if the gateway 'Handler pool'#is not full then you'll get a client instance else controller will raise an error for overflowing the pool.#Once connected, do your stuff..client1.ping('Hello there..')#Once you are done, you can either close the dedicated handler for your client of the gateway itself.client1.close_handler()#/ or client1.close_gateway() at which the gw.start(blocking = True) will stop blocking
处理程序<;-->;客户端
importpynetwork.backend2asbkdefyour_function_name(arg1,arg2,..,**kwargs):#parameters are optionalindex=1whilecondition:passobj=your_logic_to_generate_data()#lets your data is a custom python obj#if its an object, then you can use your own custom json encode/decoderobj_str=your_json_encoder(obj)yield(index,bk.str_to_bytes(obj_to_str))
defyour_function_name(arg1,arg2,..,**kwargs):result=2passreturnresult#return your integer result if required
教程1:使用批处理数据传输的条件文件备份
在网关侧
importpynetworkaspynetimportpynetwork.backend2asbkimportstructimportshutilimportosdefget_usage_percentage(request_id:int,**kwargs):''' Logic to calculate mount/overall hdd usage '''total,used,free=shutil.disk_usage("\\")usage_per=used/totalbk.safe_print('usage',usage_per)# safe_print make sure that output from multiple client/handler threads to console do not mix-upb=struct.pack('f',usage_per)#convert the data bytes that needs to be sent backreturnbdefremove_logs(dirpath:str,**kwargs):files=[os.path.join(dirpath,f)forfinos.listdir(dirpath)]forfinfiles:#iterate over files delete each oneos.remove(f)bk.safe_print(len(files),' files deleted')returnbk.int_to_bytes(len(files))if__name__=='__main__':gw=pynet.Gateway(1857)#start gateway at port 1857gw.add_subroutine('mount_usage',get_usage_percentage)gw.add_subroutine('remove_logs',remove_logs)#add above two subroutines with a key, that client can pass to request executiongw.start(blocking=True)#start listening to controller
在控制器侧
importpynetworkaspynetimportpynetwork.backend2asbkimportstructif__name__=='__main__':controller=pynet.Controller(gateway_ip='',port=1857,download_dir='mydir',verbose=True)client1=controller.get_client()client1.ping('Hello there..')b=client1.get_subroutine_batch(name='mount_usage',arguments=[123,])usage=struct.unpack('f',b)[0]bk.safe_print('mount usage:',usage)ifusage>0.2:bk.safe_print('downloading files..')client1.get_files_from_gateway(folder='D:\logs',regex='.+RFIN703235761L')#using regex, u can filter files within a folder#in this case, we are only downloading log file belonging to tensorflow as a testcount_bytes=client1.get_subroutine_batch(name='remove_logs',arguments=['D:\logs',])bk.safe_print(bk.bytes_to_int(count_bytes),' files deleted')else:passclient1.close_handler()#close handler so the handler pool at gateway will remain empty for others#additionally you can also stop Gatway by: client1.close_gateway()
编写接收批处理数据的子例程(handler->;client)时需要考虑的一些事项:
编写用于传输批处理数据的子例程时需要考虑的一些事项(客户机->;处理程序):
教程2:WiFi上的覆盆子PI控制(同时使用流媒体和批处理数据传输)