有没有办法从python调用rust的异步接口?

2024-04-23 18:26:12 发布

您现在位置:Python中文网/ 问答频道 /正文

我将rust的reqwest的一些函数包装到req.lib文件中,并通过使用cffi成功地从python调用它。但是reqwest::blocking::Client迫使我在python中使用多线程。我发现reqwest可以在rust中以异步模式调用。我想知道有没有办法使req.lib异步?即使是半异步对我来说也是可以的

例如,当前存根签名为:

#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> *mut c_char

我可以写一些类似于:

#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> u64  // return request unique id

#[no_mangle]
pub extern "C" fn is_finished(req_id: u64) -> bool  // whether given request is done

#[no_mangle]
pub extern "C" fn fetch_result(req_id: u64) -> *mut c_char  // fetch response

因此cffi调用不再锁定主线程。我可以使用单个线程来调用多个请求。欢迎提供任何建议或最佳实践


Tags: noidurllibexternrustcffireq
1条回答
网友
1楼 · 发布于 2024-04-23 18:26:12

异步代码通过特殊的运行时执行,对于python和rust,它们是不同的且不兼容的库。在这里,您不能简单地在不同语言之间共享未来,它必须在创建它的同一语言中运行

对于您的示例,这意味着您需要在rust executor(例如在tokio)中运行Client,然后从中获得反馈。最简单的方法是创建一个全局的:

use lazy_static::lazy_static;
use tokio::runtime::Runtime;

lazy_static! {
    static ref RUNTIME: Runtime = Runtime::new().unwrap();
}

然后在生成之后,您需要有一个反馈,因此您可以使用两个带有状态和结果的映射:

use std::collections::HashMap;
use std::sync::RwLock;

use futures::prelude::*;
use tokio::sync::oneshot;

type FutureId = u64;
type UrlResult = reqwest::Result<String>;

type SyncMap<K, V> = RwLock<HashMap<K, V>>;

lazy_static! {
    // Map for feedback channels. Once result is computed, it is stored at `RESULTS`
    static ref STATUSES: SyncMap<FutureId, oneshot::Receiver<UrlResult>> = SyncMap::default();
    // Cache storage for results
    static ref RESULTS: SyncMap<FutureId, UrlResult> = SyncMap::default();
}

fn gen_unique_id() -> u64 { .. }

#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> FutureId {
    let url: &str = /* convert url */;

    let (tx, rx) = oneshot::channel();

    RUNTIME.spawn(async move {
        let body = reqwest::get(url).and_then(|b| b.text()).await;
        tx.send(body).unwrap(); // <- this one should be handled somehow
    });

    let id = gen_unique_id();

    STATUSES.write().unwrap().insert(id, rx);

    id
}

这里,对于正在创建的每个urlopen请求^{},这会延迟执行结果。因此,可以检查它是否完成:

#[no_mangle]
pub extern "C" fn is_finished(req_id: u64) -> bool {
    // first check in cache
    if RESULTS.read().unwrap().contains_key(&req_id) {
        true
    } else {
        let mut res = RESULTS.write().unwrap();
        let mut statuses = STATUSES.write().unwrap();

        // if nothing in cache, check the feedback channel
        if let Some(rx) = statuses.get_mut(&req_id) {
            let val = match rx.try_recv() {
                Ok(val) => val,
                Err(_) => {
                    // handle error somehow here
                    return true;
                }
            };

            // and cache the result, if available
            res.insert(req_id, val);
            true
        } else {
            // Unknown request id
            true
        }
    }
}

那么,获取结果相当简单:

#[no_mangle]
pub extern "C" fn fetch_result(req_id: u64) -> *const c_char {
    let res = RESULTS.read().unwrap();

    res.get(&req_id)
        // there `ok()` should probably be handled in some better way
        .and_then(|val| val.as_ref().ok())
        .map(|val| val.as_ptr())
        .unwrap_or(std::ptr::null()) as *const _
}

Playground链接

请记住,上述解决方案有其优点:

  • 缓存结果,可多次取数
  • API(希望)是线程安全的
  • 读写锁是分开的,这可能是比互斥锁更快的解决方案

还有一些重要的缺点:

  • RESULTS生长不定且从不清除
  • 线程安全使事情变得有点复杂,因此可能不需要,并且^{}可以用于全局变量而不是锁
  • 缺乏正确的错误处理
  • 使用RwLock,它有时可能比其他一些原语表现得更糟糕
  • STATUSESis_finished获得写访问权,不过最好先获得读访问权

相关问题 更多 >