对异步端点的调用被另一个线程阻止

2024-04-25 08:15:15 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个tornado网站服务,它将提供大约每分钟500个请求。所有这些请求都将命中一个特定端点。我用Cython编译了一个C++程序,并在tornado服务中使用它作为我的处理器引擎。每个到/check/的请求都将触发C++程序中的函数调用(我将其称为handler),返回值将作为响应发送给用户。你知道吗

我就是这样包装handler类的。重要的一点是我没有在__init__中实例化handler。在我的tornado代码中还有另一个路由,我想在authroyed请求命中该路由之后开始加载数据结构。(例如/reload/


executors = ThreadPoolExecutor(max_workers=4)


class CheckerInstance(object):
    def __init__(self, *args, **kwargs):
        self.handler = None
        self.is_loading = False
        self.is_live = False

    def init(self):
        if not self.handler:
            self.handler = pDataStructureHandler()
            self.handler.add_words_from_file(self.data_file_name)
            self.end_loading()
            self.go_live()

    def renew(self):
        self.handler = None
        self.init()



class CheckHandler(tornado.web.RequestHandler):
    async def get(self):
        query = self.get_argument("q", None).encode('utf-8')
        answer = query

        if not checker_instance.is_live:
            self.write(dict(answer=self.get_argument("q", None), confidence=100))
            return

        checker_response = await checker_instance.get_response(query)
        answer = checker_response[0]
        confidence = checker_response[1]

        if self.request.connection.stream.closed():
            return
        self.write(dict(correct=answer, confidence=confidence, is_cache=is_cache))

    def on_connection_close(self):
        self.wait_future.cancel()


class InstanceReloadHandler(BasicAuthMixin, tornado.web.RequestHandler):
    def prepare(self):
        self.get_authenticated_user(check_credentials_func=credentials.get, realm='Protected')

    def new_file_exists(self):
        return True

    def can_reload(self):
        return not checker_instance.is_loading

    def get(self):
        error = False
        message = None

        if not self.can_reload():
            error = True
            message = 'another job is being processed!'
        else:
            if not self.new_file_exists():
                    error = True
                    message = 'no new file found!'
            else:
                checker_instance.go_fake()
                checker_instance.start_loading()
                tornado.ioloop.IOLoop.current().run_in_executor(executors, checker_instance.renew)
                message = 'job started!'

        if self.request.connection.stream.closed():
            return
        self.write(dict(
            success=not error, message=message
        ))

    def on_connection_close(self):
        self.wait_future.cancel()


def main():
    app = tornado.web.Application(
        [
            (r"/", MainHandler),
            (r"/check", CheckHandler),
            (r"/reload", InstanceReloadHandler),
            (r"/health", HealthHandler),
            (r"/log-event", SubmitLogHandler),
        ],
        debug=options.debug,
    )
    checker_instance = CheckerInstance()

我希望此服务在checker_instance.renew开始在另一个线程中运行后继续响应。但事实并非如此。当我点击/reload/端点,renew函数开始工作时,对/check/的任何请求都会停止,等待重新加载过程完成,然后它再次开始工作。在加载数据结构时,服务应该处于fake模式,并使用用户作为输入发送的相同查询来响应用户。你知道吗

我已经用i5cpu(4个CPU核)在我的开发环境中测试了这段代码,它运行得很好!但是在生产环境(3个双线程CPU核)中,/check/端点停止请求。你知道吗


Tags: instanceselfnonemessagegetifisdef
1条回答
网友
1楼 · 发布于 2024-04-25 08:15:15

很难完全跟踪正在处理的事件,因为为了简洁起见,您已经删去了一些代码。例如,我在这里没有看到get_response实现,所以我不知道它是否在等待某个依赖于checker\u实例状态的东西。你知道吗

我要探讨的一个领域是在将checker_instance.renew传递到run_in_executor时的线程安全(或者似乎没有)。我觉得这有问题,因为您正在从一个单独的线程中改变checkernstance的单个实例的状态。虽然它可能不会显式地破坏事物,但它确实可能会引入奇怪的竞争条件或意外的内存拷贝,从而解释您正在经历的意外行为

如果可能的话,我会使您想要卸载到线程的任何加载行为都是完全自包含的,并且在加载数据时,将其作为函数结果返回,然后可以将其反馈到您的checker\u实例中。如果要按原样使用代码,则需要等待其结果的run_in_executor调用,然后更新checker\u实例。这意味着reload GET请求将等待数据加载。或者,在reload GET请求中,您可以通过这种方式ioloop.spawn_callback调用触发run_in_executor的函数,从而允许重新加载请求完成,而不是等待。你知道吗

相关问题 更多 >