Python multiprocessing with Cassandra connections
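I'm trying to make my Cassandra connection work with multiprocessing.
In a few simpler cases I have used multiprocessing together with queues (multiprocessing.Queue): passing a batch of numbers to child processes and collecting the results.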
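For reference, here is a minimal sketch of the simple queue pattern described above: send numbers to child processes and collect the results.

- It is written for Python 3, because multiprocessing.get_context does not exist in Python 2.7. It explicitly uses the "fork" start method to match the Linux setup in this question.
- square_worker and run_squares are hypothetical names.
- It reuses the 'STOP' sentinel with iter(queue.get, 'STOP'), like the question's code.
- It reads all results before calling join(). The multiprocessing documentation warns that joining a process that still has items buffered on a queue can deadlock.

```python
import multiprocessing as mp

SENTINEL = "STOP"  # same sentinel string the question's code uses


def square_worker(work_queue, done_queue):
    # iter(callable, sentinel) calls work_queue.get() until it returns SENTINEL
    for n in iter(work_queue.get, SENTINEL):
        done_queue.put((n, n * n))


def run_squares(numbers, workers=2):
    ctx = mp.get_context("fork")  # Unix-only; matches the Linux setup here
    work_queue = ctx.Queue()
    done_queue = ctx.Queue()
    procs = [ctx.Process(target=square_worker, args=(work_queue, done_queue))
             for _ in range(workers)]
    # Start the workers before the first put(), so the parent's queue
    # feeder thread is not yet running at fork time.
    for p in procs:
        p.start()
    for n in numbers:
        work_queue.put(n)
    for _ in procs:
        work_queue.put(SENTINEL)  # one sentinel per worker
    # Collect every result before join(): a child that still has items
    # buffered for done_queue may not exit, so joining first can deadlock.
    results = dict(done_queue.get() for _ in numbers)
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    print(run_squares([1, 2, 3, 4]))
```

All numbers are queued before any sentinel, so a worker only sees 'STOP' once every number has been handed out.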
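In my current DataGetter I import a Cassandra worker class. Does Python multiprocessing have problems when the child processes use objects that were imported and created in the parent beforehand?
Here is the relevant code from my DataGetter: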
def read_data_multi(self, cass_worker, work_queue, done_queue):
    #cass_worker = cbcassandra.CBcassandra(self.chost, self.keyspace)
    cass_worker.open_cur()
    # Process jobs until the 'STOP' sentinel arrives
    for inq in iter(work_queue.get, 'STOP'):
        data = self.read_data(cass_worker, inq[0], inq[1], inq[2], inq[3])
        print data
        done_queue.put(data)
    cass_worker.close_cur()  # close the same worker that was opened above
    return True
def multi_get(self, readtype, dcname, vmname, timebucket_list):
    workers = 2
    work_queue = Queue()
    done_queue = Queue()
    processes = []
    for tb in timebucket_list:
        inq = (readtype, dcname, vmname, tb)
        print inq
        work_queue.put(inq)
    for w in xrange(workers):
        cass_worker = cbcassandra.CBcassandra(self.chost, self.keyspace)
        p = Process(target=self.read_data_multi, args=(cass_worker, work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')  # one sentinel per worker
    print processes
    for p in processes:
        p.join()
    done_queue.put('STOP')
    return done_queue
read_data itself works fine when I don't use multiprocessing.
This is the output when I do use multiprocessing. The processes start, but they can't establish a connection:
[<Process(Process-1, started)>, <Process(Process-2, started)>]
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "preparedata.py", line 251, in read_data_multi
    cass_worker.open_cur()
  File "/root/cbcassandra.py", line 40, in open_cur
    cluster, cur = self.getclustsess(self.keyspace)
  File "/root/cbcassandra.py", line 33, in getclustsess
    session = cluster.connect(keyspace)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 471, in connect
    self.control_connection.connect()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 1355, in connect
    self._set_new_connection(self._reconnect_internal())
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 1390, in _reconnect_internal
    raise NoHostAvailable("Unable to connect to any servers", errors)
NoHostAvailable: ('Unable to connect to any servers', {'104.130.65.178': OperationTimedOut('errors=Timed out creating connection, last_host=None',)})
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "preparedata.py", line 251, in read_data_multi
    cass_worker.open_cur()
  File "/root/cbcassandra.py", line 40, in open_cur
    cluster, cur = self.getclustsess(self.keyspace)
  File "/root/cbcassandra.py", line 33, in getclustsess
    session = cluster.connect(keyspace)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 471, in connect
    self.control_connection.connect()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 1355, in connect
    self._set_new_connection(self._reconnect_internal())
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 1390, in _reconnect_internal
    raise NoHostAvailable("Unable to connect to any servers", errors)
NoHostAvailable: ('Unable to connect to any servers', {'104.130.65.178': OperationTimedOut('errors=Timed out creating connection, last_host=None',)})
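One common way to keep connections per process is to make sure every connection is created inside the child process. This is only a sketch under assumptions: it uses a multiprocessing.Pool initializer, which runs once in each worker process.

- FakeCassandraWorker is a hypothetical stand-in for cbcassandra.CBcassandra. It makes no driver calls; it only records the PID in which open_cur() ran.
- The host and keyspace values are placeholders.
- Python 3 and the "fork" start method are assumed.

```python
import multiprocessing as mp
import os


class FakeCassandraWorker(object):
    """Hypothetical stand-in for cbcassandra.CBcassandra (no driver calls)."""

    def __init__(self, host, keyspace):
        self.host = host
        self.keyspace = keyspace
        self.opened_in_pid = None

    def open_cur(self):
        # A real worker would build its Cluster/Session here.
        self.opened_in_pid = os.getpid()

    def read(self, readtype, dcname, vmname, tb):
        # Placeholder for read_data(): echo the query plus the PID that opened it.
        return (readtype, dcname, vmname, tb, self.opened_in_pid)


_worker = None  # one connection object per pool process


def _init_worker(host, keyspace):
    # Runs once inside each child process, so the connection is
    # created there instead of in the parent before fork().
    global _worker
    _worker = FakeCassandraWorker(host, keyspace)
    _worker.open_cur()


def _read_one(job):
    return _worker.read(*job)


def multi_get(host, keyspace, readtype, dcname, vmname, timebuckets, workers=2):
    ctx = mp.get_context("fork")
    jobs = [(readtype, dcname, vmname, tb) for tb in timebuckets]
    with ctx.Pool(workers, initializer=_init_worker,
                  initargs=(host, keyspace)) as pool:
        return pool.map(_read_one, jobs)  # results come back in job order


if __name__ == "__main__":
    for row in multi_get("127.0.0.1", "example_ks", "cpu", "dc1", "vm1", [1, 2, 3]):
        print(row)
```

With the real class, the open_cur() call in _init_worker is presumably where the Cluster and Session would be built. The parent would then never hold a connected driver object for the children to inherit.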
Answer
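I only found out today that when you use Cassandra with multiprocessing, you need to shut down the Cassandra session and cluster before starting the new processes. It is much like using django.db connections with multiprocessing. As far as I can tell, the reason is that the driver keeps open sockets and a background I/O thread. fork() copies the sockets into each child but not the thread, so a connection or driver state inherited from the parent does not work reliably in the child. The DataStax Python driver documentation likewise recommends creating Cluster and Session objects after forking rather than sharing them across processes.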
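For example, in the parent, before starting the processes:
# Release the parent's driver objects before forking
cass_worker.session.shutdown()          # close this Session's connections
cass_worker.session.cluster.shutdown()  # then shut down the Cluster itself
# Build a fresh, not-yet-connected worker to hand to the children
cass_worker = cbcassandra.CBcassandra(self.chost, self.keyspace)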
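Here is a minimal, self-contained sketch of the ordering described in this answer. The parent shuts its own connection down first, then starts the processes, and each child opens a fresh one.

- FakeSession is hypothetical and only records which process created it. With the real driver, the shutdown step would be the session and cluster shutdown() calls above, and each child would construct its own worker.
- Python 3 and the "fork" start method are assumed.

```python
import multiprocessing as mp
import os


class FakeSession(object):
    """Hypothetical stand-in for a driver Cluster/Session pair."""

    def __init__(self):
        self.pid = os.getpid()  # process that opened this connection
        self.is_shutdown = False

    def shutdown(self):
        self.is_shutdown = True


def child_task(done_queue):
    session = FakeSession()  # fresh connection created inside the child
    try:
        done_queue.put(session.pid)
    finally:
        session.shutdown()


def run_workers_fresh(parent_session, workers=2):
    # Close the parent's connection before forking, so no child
    # inherits its open sockets or driver state.
    parent_session.shutdown()
    ctx = mp.get_context("fork")
    done_queue = ctx.Queue()
    procs = [ctx.Process(target=child_task, args=(done_queue,))
             for _ in range(workers)]
    for p in procs:
        p.start()
    pids = [done_queue.get() for _ in procs]  # drain before join
    for p in procs:
        p.join()
    return pids


if __name__ == "__main__":
    print(run_workers_fresh(FakeSession(), workers=2))
```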