用于管理网络工作负载的连接群的包

pynetwork的Python项目详细描述


pynetwork

用于网络工作负载的高性能套接字群集

重要更新:

由于pypi中存在名称冲突,我不得不将库的名称从"pynet"更改为"pynetwork"

摘要

pynet包旨在帮助编写脚本/项目,这些脚本/项目依赖于网络上的进程间通信,而对套接字的了解很少甚至完全没有。此外,这些数据传输/传输可以以群方式完成,这意味着多个或多个分散的TCP连接群可以为单个目标提供高性能。 使用此软件包可以实现以下目标:

  1. 编写可以使用回调(不需要轮询)将数据流传送到客户端的子例程/函数
  2. 编写可以将成批数据发送到客户端的子例程/函数
  3. 通过网络向子程序/函数发送数据
  4. 使用群集连接将文件从服务器发送到服务器以获得更高的传输速率
  5. < > >

    安装

    我已将此包添加到pypi包索引中,因此现在可以使用p i p安装它:

    pip安装pynetwork

    前奏曲

    作为一个有趣的项目,我想要一辆基于覆盆子pi的rc车,可以通过wi-fi进行控制。在接收端,一个 不同的深层神经网络结构,可以消耗来自树莓pi型摄像机馈送(到cnn模型)的数据,馈送来自 各种传感器,例如接近、速度编码器等(到RNN),然后可以对其进行控制。但其中一个关键问题是 在实现它时,面临的问题是如何在我的笔记本电脑和PI之间以低成本实时地来回传输所有这些数据 潜伏期。这给了我一个想法:编写一个简单的基于json的网络代码,可以来回传输自定义python对象,但是 这还不够,因为我正在处理多个数据源,管理这些多个TCP连接变得越来越困难。后 在github上进行了一些无用的搜索,我决定从头开始编写一个网络库,它可以管理 连接还可以帮助从python子例程(这些子例程是pythonian生成器,可以 无休止地读取和传输各个传感器数据)。除了基于子例程的流之外,我还想添加批处理 子例程数据传输(正如您可能已经意识到的那样,这些想法听起来与用于提供数据的keras生成器类似 到机器学习模型)。经过一个月的后台编码和测试,我终于发布了pynetwork的v2。这个版本应该 不仅适用于物联网,还可用于任何网络工作负载。

    在跳到一些用例之前,下面是一些在使用之前应该注意的术语和组件:

    网关<;-->;控制器

    涉及的主要组件有4个:网关、控制器、处理程序和客户端。网关充当一个小型服务器来监听 控制器在指定端口上发出的请求。网关和控制器共享多对多关系(一个网关可以响应 对于多个控制器,一个控制器可以连接到多个网关)。网关控制器本身不执行任何工作负载 操作(如数据/文件传输),但它们负责生成和管理要处理的大量TCP连接 你的工作量。每当控制器请求网关时,网关生成处理程序并向控制器发送确认 然后作为响应,控制器为该处理程序创建一个客户端。因此,一个请求会产生一对"handler<;->;client"。 处理程序和客户机协同工作以处理您的自定义工作负载。为了提高操作速度,可以生成多对 处理者和客户。网关上拥有的每个处理程序都运行在一个独立的线程上(这是性能提升COMES<BR/> from)&;这些处理程序线程随后由网关管理。可能生成的处理程序<;->;客户端的数量取决于 处理程序池的大小(基本上是自定义线程池)。默认情况下,池大小为5,但可以将其增加到任意数量 取决于您的硬件。

    Swarm架构类似于这样

    在主机设备上使用网关

    gw=pynet.Gateway(port=1857)# Define port on which gateway will listen to controller requests.#Then register your subroutines (Either generators/ non-generators)gw.add_subroutine('mount_usage',get_usage_percentage)gw.add_subroutine('remove_logs',remove_logs)#Its not necessary to add your subroutines before the gateway starts listening..gw.start(blocking=True)#start listening to controller#When blocking is True, then gateway will block the main thread (Similar to App.mainloop() in wxpython), untill it is closed #by controller, If you are sure that this main thread wont run to an end, then you can set it too false or add your custom#logic to keep this main thread alive.

    在客户端设备上使用网关

    controller=pynet.Controller(gateway_ip='GATEWAY_IP',port=1857,download_dir='mydir',verbose=True)#create a controllerclient1=controller.get_client()#When you call get_client, the controller requests for a dedicated handler with the Gateway, if the gateway 'Handler pool'#is not full then you'll get a client instance else controller will raise an error for overflowing the pool.#Once connected, do your stuff..client1.ping('Hello there..')#Once you are done, you can either close the dedicated handler for your client of the gateway itself.client1.close_handler()#/ or client1.close_gateway() at which the gw.start(blocking = True) will stop blocking

    处理程序<;-->;客户端

    一对独立的handler客户端提供各种数据传输选项:

    1. 从处理程序子例程接收数据流

      您可以编写python generator函数来无限地产生数据(字节数组),并将它们注册/添加到网关。一旦添加到 网关,这些子例程将传递给任何新生成的处理程序。然后,客户机可以要求处理程序从 生成器子例程,直到它引发StopIteration异常或不返回任何异常。 这种流媒体的例子可以是: a.多传感器数据流 b.通过网络向密钥记录器发送流式按键 从生成器流式传输的数据是字节数组类型的数据,与此字节数组数据一起,函数还需要 返回"id"(或idetity)。当您使用一个连接群来传输数据时,这个id可能非常有用,因为 如果我的多个处理程序正在使用同一个函数,那么就没有与数据的顺序相同的gaurantee 函数返回的将保留在接收端。在这种情况下,如果 需要。

      使用此功能的步骤:

    2. < > >

      a.编写一个python"generator"函数来读取字节数组数据 此函数需要的返回签名是一个"python set",其中包含2个元素(ident,buffer) 其中ident是一个整数,表示来自steam&buffer的块的索引是一个字节数组(即数据 流媒体)。

      importpynetwork.backend2asbkdefyour_function_name(arg1,arg2,..,**kwargs):#parameters are optionalindex=1whilecondition:passobj=your_logic_to_generate_data()#lets your data is a custom python obj#if its an object, then you can use your own custom json encode/decoderobj_str=your_json_encoder(obj)yield(index,bk.str_to_bytes(obj_to_str))

      您可以使用包的backend2模块中的任何字节转换方法。可以转换int、str或json str, 或对象(使用默认值构造函数)到字节数组

      c.当生成器引发"StopIteration"错误或标识符<;0或缓冲区为"None"时,流将终止 d.您也可以通过在流回调(用户请求的流)中返回"false"来终止来自客户端的流 中止)

      e.如果流结束(没有用户请求的流中止),则客户端调用"eos_callback"

      f.启动流时,还可以将位置参数&kwargs传递给此生成器。 (包序列化参数,然后在执行函数的网关反序列化这些参数 参数相同)

      1. 从处理程序子例程接收数据批

        如果您需要基于事务的数据而不需要蒸汽,那么可以选择批处理子例程。这些都很简单 python函数(不需要作为生成器编写)返回字节数据(该批具有唯一id),然后 处理程序将把这些数据发送给客户。

      2. 将批数据发送到处理程序子例程

        您也可以将数据发送到在gateway中注册的子例程。此数据可用作参数 缓冲区在函数的输入中。这是附加的字节数组输入,除了通常的参数&kwargs 你可以传递给你的子程序。代位执行后ne,处理程序要求传递整数返回 作为输出返回到客户端。

        使用此功能的步骤:

      3. < > >

        a.编写一个函数(使用您想要的任何位置/kwarg)并使用gateway注册它 包希望此函数返回一个int,然后将其发送回客户端,但是此int返回是 眼科的如果您不返回任何内容,在客户端,您的脚本将收到0。

        defyour_function_name(arg1,arg2,..,**kwargs):result=2passreturnresult#return your integer result if required

        c.启动流时,还可以将位置参数&kwargs传递给此生成器。 (包序列化参数,然后在执行函数的网关反序列化这些参数 参数相同)

        1. 将文件从网关下载到客户端设备(从文件夹名®ex或完整路径下载到文件)

          这里不需要描述,除了这样一个事实,您可以通过在多个 连接。这里要注意的一件重要的事情是,如果文件不太多,速度的提高可以忽略不计 &在这种情况下,带宽将成为限制因素。但是如果有很多小文件,您可以 可能会看到一个可分的改进(所以swarm在你传输日志文件的时候会特别有用)。 文件计数通常很高的应用程序(每个日志的大小限制为几百MB) 我很快就会上传一些数据来支持上述参数

        2. 将文件发送到网关(文件下载到相关文件夹)

          同上,但方向相反。

        3. ping处理程序以检查连接

          可能有助于测试连接。

        4. < > >

          下面是一些使用pynet并增加复杂性的用例教程:

          教程1:使用批处理数据传输的条件文件备份

          考虑这样一个场景:您在虚拟机节点或服务器上的存储空间有限,并且希望从此设备中移动较旧的文件 当存储使用率超过20%时(从旧文件开始)。这一目标可以实现如下:

          在网关侧
          importpynetworkaspynetimportpynetwork.backend2asbkimportstructimportshutilimportosdefget_usage_percentage(request_id:int,**kwargs):'''  Logic to calculate mount/overall hdd usage  '''total,used,free=shutil.disk_usage("\\")usage_per=used/totalbk.safe_print('usage',usage_per)# safe_print make sure that output from multiple client/handler threads to console do not mix-upb=struct.pack('f',usage_per)#convert the data bytes that needs to be sent backreturnbdefremove_logs(dirpath:str,**kwargs):files=[os.path.join(dirpath,f)forfinos.listdir(dirpath)]forfinfiles:#iterate over files delete each oneos.remove(f)bk.safe_print(len(files),' files deleted')returnbk.int_to_bytes(len(files))if__name__=='__main__':gw=pynet.Gateway(1857)#start gateway at port 1857gw.add_subroutine('mount_usage',get_usage_percentage)gw.add_subroutine('remove_logs',remove_logs)#add above two subroutines with a key, that client can pass to request executiongw.start(blocking=True)#start listening to controller

          在控制器侧
          importpynetworkaspynetimportpynetwork.backend2asbkimportstructif__name__=='__main__':controller=pynet.Controller(gateway_ip='',port=1857,download_dir='mydir',verbose=True)client1=controller.get_client()client1.ping('Hello there..')b=client1.get_subroutine_batch(name='mount_usage',arguments=[123,])usage=struct.unpack('f',b)[0]bk.safe_print('mount usage:',usage)ifusage>0.2:bk.safe_print('downloading files..')client1.get_files_from_gateway(folder='D:\logs',regex='.+RFIN703235761L')#using regex, u can filter files within a folder#in this case, we are only downloading log file belonging to tensorflow as a testcount_bytes=client1.get_subroutine_batch(name='remove_logs',arguments=['D:\logs',])bk.safe_print(bk.bytes_to_int(count_bytes),' files deleted')else:passclient1.close_handler()#close handler so the handler pool at gateway will remain empty for others#additionally you can also stop Gatway by: client1.close_gateway()

          此外,您还可以在"删除日志"功能中添加逻辑,以便在特定日期之前删除文件,依此类推。因为您可以在python子例程中集成任何自定义逻辑,所以可能性是无穷的。

          编写接收批处理数据的子例程(handler->;client)时需要考虑的一些事项:

          a.函数可以将参数&;em>kwargs作为输入,这里的参数表示位置参数(与通常的python转换相反) 很抱歉,我将在下一版本中更改名称 b.函数需要以字节格式返回数据。所以函数可以用backed中的任何一种方法来转换int, str到byes(我很快会添加支持float,但同时可以使用struct包)。此外,如果您想返回 自定义类,然后使用自己的自定义编码将对象序列化为json字符串,然后使用backed将此字符串转换为 字节,然后在控制器端执行相反的操作。 如果要使用后端可用的对象序列化,则可以将类添加到"握手文件"中 使用内置序列化,但类中必须有基元数据类型,并将默认值指定给 类的init方法中的所有输入参数。我将对序列化进行一些更改,以便在下一个版本中为类定义提供更多的自由

          编写用于传输批处理数据的子例程时需要考虑的一些事项(客户机->;处理程序):

          a.函数可以有参数;kwargs作为输入,这里的参数表示位置参数(与通常的python相反 输送) b.函数需要返回字节格式的整数结果,结果将发送回客户端

          教程2:WiFi上的覆盆子PI控制(同时使用流媒体和批处理数据传输)

          正在工作…将在1-2天内发布!< /P>

          欢迎加入QQ群-->: 979659372 Python中文网_新手群

          推荐PyPI第三方库


热门话题
java如何在Android上选择/强制使用移动数据(或wifi)进行网络通话?   java与多线程并行运行多个测试   java如何修复javax。加密。IllegalBlockSizeException:在安卓中输入必须小于64字节?   java未找到适合jdbc:ucanaccess://(从终端运行)的驱动程序   将所有类嵌套在单个公共类中的java优缺点   尝试将字符串格式的日期转换为java时获取意外日期。util。日期   java在Jackson的改装请求中插入注释   java错误:日期为的对象验证失败   java FirebaseError异常电子邮件/密码身份验证   java在ApacheTika中MediaType和MimeType类之间有什么区别?   java双精度损失   java Android:应用程序处于后台时,未从BroadcastReceiver调用活动   仅当填写了所有字段时,java才使用JAXB将JSON转换为对象   java抽象模式类型“User_Book”未知   java如何将spring属性值直接读入xml文件   xpath中的java搜索   当使用相同的连接创建PreparedStatement时,java Derby获取RETURN_生成的_键似乎不是线程安全的   为什么泛型超类型的Java类型推断在此中断?