Flask应用中的zerorpc在Apache服务器启动时不工作,但在PyCharm中正常。
我正在使用Flask应用程序来显示通过Zero RPC订阅者读取的数据。在开发的时候,一切都运行得很好,因为我是在Pycharm中启动应用的。但是一旦部署到Apache服务器上,订阅者就无法获取任何数据。简单来说,虽然发布者在发布数据,但订阅者中的方法根本没有被调用。
谢谢
编辑:(添加更多信息):
在app_start.py中
from gevent import monkey
monkey.patch_all()
from flask import Flask
app = Flask(__name__)
import stratgy_subscriber
startegy_state_info_provider = stratgy_subscriber.StartegyStateInfoProvider()
@app.route('/')
def strategy_info():
return startegy_state_info_provider.ip_port_to_subscriber_map[('0.0.0.0', '4249')]
if __name__ == '__main__':
startegy_state_info_provider.start_subscriber('0.0.0.0', '4249')
app.run()
我使用以下代码连接到订阅者。在strategy_subscriber.py中
from gevent import monkey
monkey.patch_all()
class StartegyStateInfoProvider(object):
def __init__(self):
self.ip_port_to_subscriber_map = {}
def start_subscriber(machine_ip, port):
strategy_state_subscriber = StrategyStateSubscriber()
ip_port_to_subscriber_map[(machine_ip, port)] = strategy_state_subscriber
end_point = 'tcp://' + machine_ip + ':' + port
strategy_state_subscriber = zerorpc.Subscriber(strategy_state_subscriber)
strategy_state_subscriber.connect(end_point)
gevent.spawn(strategy_state_subscriber.run)
class StrategyStateSubscriber():
def __init__(self):
self.strategy_id_to_info_map = {}
def update_strategy_state(self, strategy_id, updated_strategy_state):
from datetime import datetime
current_time = datetime.strftime(datetime.now(), '%H:%M:%S')
updated_strategy_state.append(current_time)
print updated_strategy_state
strategy_id_to_info_map[strategy_id] = updated_strategy_state
这是发布者
endpoint = "tcp://0.0.0.0:4249"
publisher = zerorpc.Publisher()
publisher.bind(endpoint)
for a in xrange(0, 1000):
info = [False, a * a, 3 * a, a % 20, a % 50]
publisher.update_strategy_state(a, info)
if a < 50:
gevent.sleep(1)
elif a >= 50:
gevent.sleep(2)
在订阅之后,也就是调用gevent.spawn(strategy_state_subscriber.run)后,每当发布者发布数据时,应该调用update_strategy_state,但这并没有发生。
我遇到了以下异常:
[Wed May 28 11:24:03 2014] [error] Traceback (most recent call last):
[Wed May 28 11:24:03 2014] [error] File "/home/sricharan/git_ceres_viewer/src/strategy_state_subscription.py", line 68, in subscribe
[Wed May 28 11:24:03 2014] [error] subscriber = zerorpc.Subscriber(service)
[Wed May 28 11:24:03 2014] [error] File "/usr/local/lib/python2.7/dist-packages/zerorpc/core.py", line 371, in __init__
[Wed May 28 11:24:03 2014] [error] zmq_socket=zmq.SUB)
[Wed May 28 11:24:03 2014] [error] File "/usr/local/lib/python2.7/dist-packages/zerorpc/core.py", line 312, in __init__
[Wed May 28 11:24:03 2014] [error] super(Puller, self).__init__(zmq_socket, context=context)
[Wed May 28 11:24:03 2014] [error] File "/usr/local/lib/python2.7/dist-packages/zerorpc/socket.py", line 34, in __init__
[Wed May 28 11:24:03 2014] [error] self._events = Events(zmq_socket_type, context)
[Wed May 28 11:24:03 2014] [error] File "/usr/local/lib/python2.7/dist-packages/zerorpc/events.py", line 177, in __init__
[Wed May 28 11:24:03 2014] [error] self._socket = zmq.Socket(self._context, zmq_socket_type)
[Wed May 28 11:24:03 2014] [error] File "/usr/local/lib/python2.7/dist-packages/zerorpc/gevent_zmq.py", line 61, in __init__
[Wed May 28 11:24:03 2014] [error] self.__dict__["_state_event"] = gevent.hub.get_hub().loop.io(
[Wed May 28 11:24:03 2014] [error] File "/usr/lib/python2.7/dist-packages/gevent/hub.py", line 135, in get_hub
[Wed May 28 11:24:03 2014] [error] raise NotImplementedError('gevent is only usable from a single thread')
[Wed May 28 11:24:03 2014] [error] NotImplementedError: gevent is only usable from a single thread
经过一些研究,我发现:- gevent与Apache的多线程工作模型不兼容。
1 个回答
最后我搞定了。
import zerorpc
import gevent
from subscrclass import StrategyStateSubscriber
strategy_state_subscriber = StrategyStateSubscriber()
machine_ip = "127.0.0.1"
port = "5555"
end_point = "tcp://" + machine_ip + ":" + port
strategy_state_subscriber = zerorpc.Subscriber(strategy_state_subscriber)
strategy_state_subscriber.connect(end_point)
gevent.spawn(strategy_state_subscriber.run)
gevent.sleep(5) # This line was added
原来的代码在调用订阅者时就结束了,但实际上没有给代码任何机会去做事情。
如果我加上了 gevent.sleep(5)
,协程就有机会运行5秒钟,打印的内容就出现了。
一定要使用一些 gevent
的调用,简单的 time.sleep(5)
是不行的,因为它会阻塞,导致协程没有机会执行代码或输出内容。
在你的 PyCharm IDE 中,我可以想象你可能有类似的代码行,或者你执行操作的速度比较慢,这样就给了代码运行的机会。但这只是我的猜测 - 重要的是在你的代码中给协程留出运行的机会,才能在代码结束前执行。
完整示例(不包括 Flask 部分)
这里是一个完整的可运行示例
subscrclass.py
这个类实际上会接收来自发布者的调用。重要的是:
- 要有名称
update_strategy_state
- 要有两个参数
代码如下:
from datetime import datetime
class StrategyStateSubscriber():
def update_strategy_state(self, strategy_id, state):
print datetime.strftime(datetime.now(), "%H:%M:%S"), state
pub.py
这里是发布者。
它不需要导入任何东西来获取 StrategyStateSubscriber,只是创建了一个调用,去某个远程订阅者,这个订阅者可能实现了一个名为 update_strategy_state
的方法,并且有两个参数。
import gevent
import zerorpc
publisher = zerorpc.Publisher()
publisher.bind("tcp://0.0.0.0:5555")
for a in xrange(0, 1000):
publisher.update_strategy_state(a, [False, a * a, 3 * a, a % 20, a % 50])
gevent.sleep(1 if a < 50 else 2)
client.py
最后我们来到了客户端,它订阅了发布的调用,并将这些调用传递给在 StrategyStateSubscriber
类中实现的相关方法(由发布者决定)。
import zerorpc
import gevent
from subscrclass import StrategyStateSubscriber
strategy_state_subscriber = zerorpc.Subscriber(StrategyStateSubscriber())
strategy_state_subscriber.connect("tcp://127.0.0.1:5555")
gevent.spawn(strategy_state_subscriber.run)
gevent.sleep(5)
重要的是要允许生成的进程运行,这里我们调用了 gevent.sleep(5)
,其他调用或非阻塞的 I/O 操作也可以帮助实现。
运行代码
在第一个控制台中启动发布者:
$ python pub.py
在第二个控制台中,启动客户端:
$ python client.py
22:30:06 [False, 16, 12, 4, 4]
22:30:07 [False, 25, 15, 5, 5]
22:30:08 [False, 36, 18, 6, 6]
22:30:09 [False, 49, 21, 7, 7]
22:30:10 [False, 64, 24, 8, 8]
结论
- zerorpc 是一个非常强大的工具
- 在生成进程时,程序员有时会忘记给代码运行的机会。这就是这里的问题所在。
- 如果 Flask 网页需要快速返回,使用请求/响应(远程过程调用)会比让代码睡眠等待数据更快。
- 如果策略状态需要长期传递给客户端(例如通过 WebSockets),发布/订阅消息模式是最合适的选择。