使用NATS或NATS流构建数据流管道

faststan的Python项目详细描述


法斯特斯坦

Pipeline statusCoverage reportPackaging: poetryStyle: flake8Format: blackPackaging: pytestPyPIDocumentationLicense: MITGitpod ready-to-code

使用Python轻松部署nat和NATS流式订阅服务器。在

特点

  • 使用sync和async python函数定义订阅服务器
  • 使用类型注释和pydantic的自动数据解析和验证
  • 支持中提供的所有订阅配置斯坦皮伊以及自然人.py在
  • 从命令行启动订阅或服务
  • 从命令行发布消息

快速入门

  • 从pypi安装包:
pip install faststan

使用命令行

创建第一个NATS订阅服务器:

  • 创建一个名为app.py的文件,并写入以下行:
^{pr2}$
  • 启动订阅服务器:
nats sub start demo --function app:on_event
  • 发布消息:
nats pub demo --name "John Doe" --datetime 1602661983

NAT流的行为方式相同:

  • 定义订阅:
frompydanticimportBaseModelclassGreetings(BaseModel):message:strdefon_event(event:NewEvent)->Greetings:print(f"Info :: Received new request.")returnGreetings(message=f"Welcome to {event.name}!"
  • 使用stan sub start命令启动它:
stan sub start demo --function app:on_event
  • 并使用stan pub命令发布消息:
stan pub demo --name "John Doe"

使用Python API

在本例中,我们将构建一个使用ONNX模型执行预测的机器学习服务。此服务将使用[request/reply]模式实现。在

在运行示例之前,请确保已安装依赖项:

  • onnxruntime
  • numpy
  • httpx
importasynciofromtypingimportList,Dictfromfaststan.natsimportFastNATSfrompydanticimportBaseModel,validatorfromhttpximportAsyncClientimportnumpyasnpimportonnxruntimeasrtasyncdefload_predictor(app:FastNATS,url:str="https://s3-per-grenoble.ams3.digitaloceanspaces.com/models/rf_iris.onnx",)->None:"""Load an ONNX model and return a predictor for this model."""asyncwithAsyncClient()ashttp_client:http_response=awaithttp_client.get(url)sess=rt.InferenceSession(http_response.content)input_name=sess.get_inputs()[0].namelabel_name=sess.get_outputs()[0].nameproba_name=sess.get_outputs()[1].namedefpredict(data:np.ndarray):"""Perform prediction for given data."""returnsess.run([label_name,proba_name],{input_name:data})app.state["predictor"]=predictclassEvent(BaseModel):"""Incoming data expected by the predictor."""values:np.ndarraytimestamp:int@validator("values",pre=True)defvalidate_array(cls,value):"""Cast data to numpy array with float32 precision.        A ValidationError will be raise if any error is raised in this function.        """returnnp.array(value,dtype=np.float32)classConfig:# This must be set to True in order to let pydantic handle numpy typesarbitrary_types_allowed=TrueclassResult(BaseModel):"""Result returned by the predictor."""probabilities:List[Dict[int,float]]# Example: [{ 0: 0.25, 1:0.75}, {0: 0.15, 1: 0.85}]labels:List[int]# Example: [1, 1]app=FastNATS()app.state={}awaitload_predictor(app)awaitapp.connect()@app.reply("predict")defpredict(event:Event)->Result:print(f"{event.timestamp} :: Received new event data")labels,probas=app.state["predictor"](event.values)return{"probabilities":probas,"labels":labels.tolist()}awaitapp.start()
  • 您现在可以在服务上发布消息:
fromfaststanimportFastNATSasyncwithFastNATS()asnats_client:reply_msg=awaitnats_client.request_json("predict",{"values":[[0,0,0,0]],"timestamp":1602661983})print(f"Received a reply: {reply_msg}")

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何在点击JButton触发的进程仍在处理时更新JLabel?   try-catch为什么Java有嵌套的try语句?   java SSH命令执行失败,出现异常“net.schmizz.sshj.connection.ConnectionException:引发连接重置异常”   java在ApacheCamel的接口类解析器中,resolveMandatoryClass(字符串名称)有什么用途?   java如何在Eclipse远程调试器中找到有问题的线程?   java线程:containerlaunch退出代码127的异常   lambda左连接Java中的2个对象列表   Swift 2.0协议扩展和Java/C抽象类之间有区别吗?   安卓改造:使用GSON将JSON解析为多个Java对象   Spring中服务层的java角色(澄清)   html Java与网站的通信   Spring boot rest api是在不创建java类的情况下将getResultList()转换为映射以显示响应的最佳方法吗?   使用“getElementById”从javascript获取值到java   java如何在文本视图中以粗体和多色显示文本   java是设置TextView颜色的最有效方法