Python multiprocessing with Cassandra connections

Asked 2025-04-18 13:05

I'm trying to make my Cassandra connection work with multiprocessing.

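In some simpler cases I have combined multiprocessing with queues, e.g. using Multiprocessing Queues (pass a batch of numbers to the child processes, then collect the results).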
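For reference, the simple "numbers in, results out" pattern described above might look like this in Python 3. This is a minimal sketch: `square_worker` and `square_all` are hypothetical names, and the code explicitly requests the `fork` start method, which assumes a POSIX system such as the Linux machine shown in the traceback below.

```python
import multiprocessing as mp

# Assumption: POSIX platform where the "fork" start method is available.
ctx = mp.get_context("fork")

STOP = "STOP"  # sentinel that tells a worker to exit


def square_worker(work_queue, done_queue):
    # iter(callable, sentinel) keeps calling work_queue.get() until it returns STOP
    for n in iter(work_queue.get, STOP):
        done_queue.put((n, n * n))


def square_all(numbers, workers=2):
    """Hypothetical helper: square a list of numbers in `workers` processes."""
    work_queue = ctx.Queue()
    done_queue = ctx.Queue()
    procs = [ctx.Process(target=square_worker, args=(work_queue, done_queue))
             for _ in range(workers)]
    for p in procs:
        p.start()
    for n in numbers:
        work_queue.put(n)
    for _ in procs:
        work_queue.put(STOP)  # one sentinel per worker
    # Collect exactly one result per input before joining the workers
    results = dict(done_queue.get() for _ in numbers)
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    print(square_all([1, 2, 3, 4]))
```

The results come back in whatever order the workers finish, which is why they are keyed by the input number.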

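In my current DataGetter I import a Cassandra worker class. Does Python multiprocessing have problems with objects that were imported or created before the processes are started?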
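On the question of objects created before the processes start: with the `fork` start method (the only one Python 2.7 uses on Linux), the child receives a copy of the parent's memory, including such objects, but only the thread that called fork keeps running in the child. The following sketch illustrates this with a hypothetical `FakeDriver` whose background thread stands in for a database driver's I/O loop; it is not the Cassandra driver, and the fork start method is an assumption.

```python
import multiprocessing as mp
import threading
import time

ctx = mp.get_context("fork")  # assumption: POSIX, fork start method


class FakeDriver:
    """Hypothetical stand-in for a driver object that owns a background I/O thread."""

    def __init__(self):
        self.requests = 0
        self._stopping = threading.Event()
        self.io_thread = threading.Thread(target=self._loop, daemon=True)
        self.io_thread.start()

    def _loop(self):
        while not self._stopping.is_set():
            time.sleep(0.01)

    def close(self):
        self._stopping.set()
        self.io_thread.join()


def inspect_in_child(driver, out):
    driver.requests += 1  # changes only the child's private copy
    out.put((driver.requests, driver.io_thread.is_alive()))


def run_demo():
    driver = FakeDriver()  # created in the parent, before the fork
    out = ctx.Queue()
    p = ctx.Process(target=inspect_in_child, args=(driver, out))
    p.start()
    child_view = out.get()
    p.join()
    parent_view = (driver.requests, driver.io_thread.is_alive())
    driver.close()
    return child_view, parent_view
```

In the child the copied object still exists, but its I/O thread is reported as not alive, while in the parent the thread keeps running and the counter is unchanged. An object whose work depends on such a thread can therefore look usable in the child and still fail to do any I/O.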

Here is the relevant code from my DataGetter:

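from multiprocessing import Process, Queue
import cbcassandra  # the asker's own wrapper module (/root/cbcassandra.py)

def read_data_multi(self, cass_worker, work_queue, done_queue):
    #cass_worker = cbcassandra.CBcassandra(self.chost, self.keyspace)
    cass_worker.open_cur()  # connects to Cassandra inside the child process
    # Process work items until the 'STOP' sentinel arrives
    for inq in iter(work_queue.get, 'STOP'):
        data = self.read_data(cass_worker, inq[0], inq[1], inq[2], inq[3])
        print data
        done_queue.put(data)
    cass_worker.close_cur()
    return True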

def multi_get(self, readtype, dcname, vmname, timebucket_list):
    workers = 2
    work_queue = Queue()
    done_queue = Queue()
    processes = []

    for tb in timebucket_list:
        inq = (readtype, dcname, vmname, tb)
        print inq
        work_queue.put(inq)

    for w in xrange(workers):
        cass_worker = cbcassandra.CBcassandra(self.chost, self.keyspace)
        p = Process(target=self.read_data_multi, args=(cass_worker, work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')
    print processes
    for p in processes:
        p.join()

    done_queue.put('STOP')
    return done_queue
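Separately from the connection problem, note that `multi_get` joins the workers before anything reads `done_queue`. The Python `multiprocessing` programming guidelines ("Joining processes that use queues") warn that a child that has put items on a queue cannot exit until they are flushed, so joining first can deadlock. A minimal Python 3 sketch of draining before joining; `fetch` is a hypothetical stand-in for `read_data` with no Cassandra involved, `DONE` is an illustrative per-worker marker, and the fork start method is assumed.

```python
import multiprocessing as mp

ctx = mp.get_context("fork")  # assumption: POSIX fork, as on the asker's Linux box

STOP = "STOP"
DONE = "DONE"  # hypothetical marker each worker sends when it exits


def fetch(item):
    # Hypothetical stand-in for DataGetter.read_data(); no database access
    readtype, dcname, vmname, tb = item
    return (tb, "%s/%s/%s" % (readtype, dcname, vmname))


def worker(work_queue, done_queue):
    try:
        for item in iter(work_queue.get, STOP):
            done_queue.put(fetch(item))
    finally:
        done_queue.put(DONE)  # sent even if fetch() raises


def multi_get_drained(readtype, dcname, vmname, timebucket_list, workers=2):
    work_queue = ctx.Queue()
    done_queue = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(work_queue, done_queue))
             for _ in range(workers)]
    for p in procs:
        p.start()
    for tb in timebucket_list:
        work_queue.put((readtype, dcname, vmname, tb))
    for _ in procs:
        work_queue.put(STOP)  # one sentinel per worker
    results, finished = [], 0
    # Drain done_queue before join(): stop once every worker has said DONE
    while finished < workers:
        msg = done_queue.get()
        if msg == DONE:
            finished += 1
        else:
            results.append(msg)
    for p in procs:
        p.join()
    return results
```

Returning a plain list rather than the queue also spares the caller from reading until a `'STOP'` item, and the `finally` block keeps the parent from waiting forever if a worker crashes, as both workers do in the traceback below.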

read_data works fine when I don't use multiprocessing.

This is the output when I use multiprocessing. My processes start, but they cannot establish a connection:

[<Process(Process-1, started)>, <Process(Process-2, started)>]
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "preparedata.py", line 251, in read_data_multi
    cass_worker.open_cur()
  File "/root/cbcassandra.py", line 40, in open_cur
    cluster, cur = self.getclustsess(self.keyspace)
  File "/root/cbcassandra.py", line 33, in getclustsess
    session = cluster.connect(keyspace)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 471, in connect
    self.control_connection.connect()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 1355, in connect
    self._set_new_connection(self._reconnect_internal())
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 1390, in _reconnect_internal
    raise NoHostAvailable("Unable to connect to any servers", errors)
NoHostAvailable: ('Unable to connect to any servers', {'104.130.65.178': OperationTimedOut('errors=Timed out creating connection, last_host=None',)})
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "preparedata.py", line 251, in read_data_multi
    cass_worker.open_cur()
  File "/root/cbcassandra.py", line 40, in open_cur
    cluster, cur = self.getclustsess(self.keyspace)
  File "/root/cbcassandra.py", line 33, in getclustsess
    session = cluster.connect(keyspace)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 471, in connect
    self.control_connection.connect()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 1355, in connect
    self._set_new_connection(self._reconnect_internal())
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 1390, in _reconnect_internal
    raise NoHostAvailable("Unable to connect to any servers", errors)
NoHostAvailable: ('Unable to connect to any servers', {'104.130.65.178': OperationTimedOut('errors=Timed out creating connection, last_host=None',)})
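The DataStax Python driver's performance notes advise against sharing `Cluster` and `Session` objects across processes and recommend creating them after the fork. One common way to arrange that is a `multiprocessing.Pool` initializer that opens one connection per worker process. A sketch under stated assumptions: `FakeConnection` is a hypothetical placeholder for `CBcassandra` that makes no real Cassandra calls, `"127.0.0.1"`/`"ks"` are placeholder host and keyspace values, and the fork start method is assumed.

```python
import multiprocessing as mp
import os

ctx = mp.get_context("fork")  # assumption: POSIX fork start method

_conn = None  # per-process connection, set by init_worker() in each worker


class FakeConnection:
    """Hypothetical stand-in for CBcassandra / a driver Session."""

    def __init__(self, host, keyspace):
        self.host = host
        self.keyspace = keyspace
        self.owner_pid = os.getpid()  # the process that opened this connection

    def query(self, tb):
        # A real implementation would run a CQL query for this time bucket
        return (tb, self.owner_pid)


def init_worker(host, keyspace):
    # Runs once inside each worker process, i.e. after the fork
    global _conn
    _conn = FakeConnection(host, keyspace)


def read_one(tb):
    return _conn.query(tb)


def multi_get_pooled(host, keyspace, timebucket_list, workers=2):
    with ctx.Pool(workers, initializer=init_worker,
                  initargs=(host, keyspace)) as pool:
        return pool.map(read_one, timebucket_list)
```

Because the connection is built in the initializer, every query is served by a connection opened in the worker's own process, and `pool.map` returns results in input order.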

1 Answer


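I only found out today that when using multiprocessing with Cassandra, you need to shut down the existing Cassandra session or cluster before starting the new processes, and then create a fresh connection object. This is very similar to using django.db connections with multiprocessing. A plausible reason: the driver's Cluster holds open sockets and a background I/O thread, and a forked child inherits copies of those objects but not the running thread.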

For example:

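# In the parent, before starting the worker processes:
cass_worker.session.shutdown()          # close the session first
cass_worker.session.cluster.shutdown()  # then the cluster and its connections
cass_worker = cbcassandra.CBcassandra(self.chost, self.keyspace)  # fresh, unconnected worker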
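The order of operations in this answer (shut down in the parent, then let each child open its own connection) can be sketched without a Cassandra server. This is an illustration under assumptions: `FakeCluster` is a hypothetical stand-in whose background thread mimics a driver's I/O loop, `run` and `child_job` are illustrative names, and the fork start method is assumed.

```python
import multiprocessing as mp
import os
import threading
import time

ctx = mp.get_context("fork")  # assumption: POSIX fork start method


class FakeCluster:
    """Hypothetical stand-in for a driver Cluster that runs an I/O thread."""

    def __init__(self, host):
        self.host = host
        self.pid = os.getpid()  # process that created this object
        self._stopping = threading.Event()
        self._io = threading.Thread(target=self._loop, daemon=True)
        self._io.start()

    def _loop(self):
        while not self._stopping.is_set():
            time.sleep(0.01)

    @property
    def io_running(self):
        return self._io.is_alive()

    def shutdown(self):
        self._stopping.set()
        self._io.join()


def child_job(host, out):
    cluster = FakeCluster(host)  # created after the fork, owned by this child
    out.put(cluster.pid == os.getpid() and cluster.io_running)
    cluster.shutdown()


def run(host="127.0.0.1", workers=2):
    cluster = FakeCluster(host)  # e.g. opened earlier by non-parallel code
    cluster.shutdown()  # shut down BEFORE starting any process
    out = ctx.Queue()
    procs = [ctx.Process(target=child_job, args=(host, out))
             for _ in range(workers)]
    for p in procs:
        p.start()
    results = [out.get() for _ in procs]
    for p in procs:
        p.join()
    return cluster.io_running, results
```

Each child reports a cluster that it created itself and whose I/O thread is actually running, while the parent's cluster was fully stopped before the first fork.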
