使用NATS或NATS流构建数据流管道
faststan的Python项目详细描述
法斯特斯坦
使用Python轻松部署nat和NATS流式订阅服务器。在
特点
- 使用sync和async python函数定义订阅服务器
- 使用类型注释和pydantic的自动数据解析和验证
- 支持中提供的所有订阅配置斯坦皮伊以及自然人.py在
- 从命令行启动订阅或服务
- 从命令行发布消息
快速入门
- 从pypi安装包:
pip install faststan
使用命令行
创建第一个NATS订阅服务器:
- 创建一个名为
app.py
的文件,并写入以下行:
- 启动订阅服务器:
nats sub start demo --function app:on_event
- 发布消息:
nats pub demo --name "John Doe" --datetime 1602661983
NAT流的行为方式相同:
- 定义订阅:
frompydanticimportBaseModelclassGreetings(BaseModel):message:strdefon_event(event:NewEvent)->Greetings:print(f"Info :: Received new request.")returnGreetings(message=f"Welcome to {event.name}!"
- 使用
stan sub start
命令启动它:
stan sub start demo --function app:on_event
- 并使用
stan pub
命令发布消息:
stan pub demo --name "John Doe"
使用Python API
在本例中,我们将构建一个使用ONNX模型执行预测的机器学习服务。此服务将使用[request/reply]模式实现。在
在运行示例之前,请确保已安装依赖项:
onnxruntime
numpy
httpx
importasynciofromtypingimportList,Dictfromfaststan.natsimportFastNATSfrompydanticimportBaseModel,validatorfromhttpximportAsyncClientimportnumpyasnpimportonnxruntimeasrtasyncdefload_predictor(app:FastNATS,url:str="https://s3-per-grenoble.ams3.digitaloceanspaces.com/models/rf_iris.onnx",)->None:"""Load an ONNX model and return a predictor for this model."""asyncwithAsyncClient()ashttp_client:http_response=awaithttp_client.get(url)sess=rt.InferenceSession(http_response.content)input_name=sess.get_inputs()[0].namelabel_name=sess.get_outputs()[0].nameproba_name=sess.get_outputs()[1].namedefpredict(data:np.ndarray):"""Perform prediction for given data."""returnsess.run([label_name,proba_name],{input_name:data})app.state["predictor"]=predictclassEvent(BaseModel):"""Incoming data expected by the predictor."""values:np.ndarraytimestamp:int@validator("values",pre=True)defvalidate_array(cls,value):"""Cast data to numpy array with float32 precision. A ValidationError will be raise if any error is raised in this function. """returnnp.array(value,dtype=np.float32)classConfig:# This must be set to True in order to let pydantic handle numpy typesarbitrary_types_allowed=TrueclassResult(BaseModel):"""Result returned by the predictor."""probabilities:List[Dict[int,float]]# Example: [{ 0: 0.25, 1:0.75}, {0: 0.15, 1: 0.85}]labels:List[int]# Example: [1, 1]app=FastNATS()app.state={}awaitload_predictor(app)awaitapp.connect()@app.reply("predict")defpredict(event:Event)->Result:print(f"{event.timestamp} :: Received new event data")labels,probas=app.state["predictor"](event.values)return{"probabilities":probas,"labels":labels.tolist()}awaitapp.start()
- 您现在可以在服务上发布消息:
fromfaststanimportFastNATSasyncwithFastNATS()asnats_client:reply_msg=awaitnats_client.request_json("predict",{"values":[[0,0,0,0]],"timestamp":1602661983})print(f"Received a reply: {reply_msg}")
- 项目
标签: