我有一个tornado网站服务,它将提供大约每分钟500个请求。所有这些请求都将命中一个特定端点。我用Cython
编译了一个C++
程序,并在tornado服务中使用它作为我的处理器引擎。每个到/check/
的请求都将触发C++
程序中的函数调用(我将其称为handler
),返回值将作为响应发送给用户。你知道吗
我就是这样包装handler
类的。重要的一点是我没有在__init__
中实例化handler
。在我的tornado代码中还有另一个路由,我想在authroyed请求命中该路由之后开始加载数据结构。(例如/reload/
)
executors = ThreadPoolExecutor(max_workers=4)
class CheckerInstance(object):
def __init__(self, *args, **kwargs):
self.handler = None
self.is_loading = False
self.is_live = False
def init(self):
if not self.handler:
self.handler = pDataStructureHandler()
self.handler.add_words_from_file(self.data_file_name)
self.end_loading()
self.go_live()
def renew(self):
self.handler = None
self.init()
class CheckHandler(tornado.web.RequestHandler):
async def get(self):
query = self.get_argument("q", None).encode('utf-8')
answer = query
if not checker_instance.is_live:
self.write(dict(answer=self.get_argument("q", None), confidence=100))
return
checker_response = await checker_instance.get_response(query)
answer = checker_response[0]
confidence = checker_response[1]
if self.request.connection.stream.closed():
return
self.write(dict(correct=answer, confidence=confidence, is_cache=is_cache))
def on_connection_close(self):
self.wait_future.cancel()
class InstanceReloadHandler(BasicAuthMixin, tornado.web.RequestHandler):
def prepare(self):
self.get_authenticated_user(check_credentials_func=credentials.get, realm='Protected')
def new_file_exists(self):
return True
def can_reload(self):
return not checker_instance.is_loading
def get(self):
error = False
message = None
if not self.can_reload():
error = True
message = 'another job is being processed!'
else:
if not self.new_file_exists():
error = True
message = 'no new file found!'
else:
checker_instance.go_fake()
checker_instance.start_loading()
tornado.ioloop.IOLoop.current().run_in_executor(executors, checker_instance.renew)
message = 'job started!'
if self.request.connection.stream.closed():
return
self.write(dict(
success=not error, message=message
))
def on_connection_close(self):
self.wait_future.cancel()
def main():
app = tornado.web.Application(
[
(r"/", MainHandler),
(r"/check", CheckHandler),
(r"/reload", InstanceReloadHandler),
(r"/health", HealthHandler),
(r"/log-event", SubmitLogHandler),
],
debug=options.debug,
)
checker_instance = CheckerInstance()
我希望此服务在checker_instance.renew
开始在另一个线程中运行后继续响应。但事实并非如此。当我点击/reload/
端点,renew
函数开始工作时,对/check/
的任何请求都会停止,等待重新加载过程完成,然后它再次开始工作。在加载数据结构时,服务应该处于fake
模式,并使用用户作为输入发送的相同查询来响应用户。你知道吗
我已经用i5cpu(4个CPU核)在我的开发环境中测试了这段代码,它运行得很好!但是在生产环境(3个双线程CPU核)中,/check/
端点停止请求。你知道吗
很难完全跟踪正在处理的事件,因为为了简洁起见,您已经删去了一些代码。例如,我在这里没有看到
get_response
实现,所以我不知道它是否在等待某个依赖于checker\u实例状态的东西。你知道吗我要探讨的一个领域是在将
checker_instance.renew
传递到run_in_executor
时的线程安全(或者似乎没有)。我觉得这有问题,因为您正在从一个单独的线程中改变checkernstance的单个实例的状态。虽然它可能不会显式地破坏事物,但它确实可能会引入奇怪的竞争条件或意外的内存拷贝,从而解释您正在经历的意外行为如果可能的话,我会使您想要卸载到线程的任何加载行为都是完全自包含的,并且在加载数据时,将其作为函数结果返回,然后可以将其反馈到您的checker\u实例中。如果要按原样使用代码,则需要等待其结果的
run_in_executor
调用,然后更新checker\u实例。这意味着reload GET请求将等待数据加载。或者,在reload GET请求中,您可以通过这种方式ioloop.spawn_callback
调用触发run_in_executor
的函数,从而允许重新加载请求完成,而不是等待。你知道吗相关问题 更多 >
编程相关推荐