Cassandra在阻塞同步请求的多进程中的同步执行

2024-04-24 00:41:17 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个应用程序,可以读取一系列XML文件,其中包含道路上车辆通行的日志。然后,应用程序处理每个记录,转换一些信息以匹配数据库列,并将其插入cassandra数据库(在远程服务器中运行单个节点[它位于内部网络中,因此连接不是真正的问题])。在数据库中插入数据后,每个文件的进程接着读取这些数据并为摘要表生成信息,这样就为在应用程序的不相关部分进行深入分析做好了准备。在

我使用多处理来并行处理许多XML文件,我遇到的问题是与cassandra服务器通信。流程示意图如下:

  1. 从XML文件读取记录
  2. 过程记录数据
  3. 将处理过的数据插入数据库(使用.execute_async(query)
  4. 重复1到3,直到XMl文件结束
  5. 等待我所做的所有插入查询的响应
  6. 从数据库读取数据
  7. 处理读取的数据
  8. 在汇总表中插入已处理的数据

现在,它在多个并行进程中平稳地运行,直到一个进程进入第6步时,它的请求(使用.execute(query)发出,这意味着我将等待响应)总是面临超时。我收到的错误是:

 Process ProcessoImportacaoPNCT-1:
Traceback (most recent call last):
  File "C:\Users\Lucas\Miniconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\ImportacaoArquivosPNCT.py", line 231, in run
    core.CalculoIndicadoresPNCT.processa_equipamento(sessao_cassandra, equipamento, data, sentido, faixa)
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 336, in processa_equipamento
    desvio_medias(sessao_cassandra, equipamento, data_referencia, sentido, faixa)
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 206, in desvio_medias
    veiculos = sessao_cassandra.execute(sql_pronto)
  File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 1594, in execute
    result = future.result(timeout)
  File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 3296, in result
    raise self._final_exception
ReadTimeout: code=1200 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 1, 'consistency': 'ONE'}

我已经将服务器中的超时更改为荒谬的时间量(例如500000000毫秒),我还尝试在客户端设置超时限制,使用.execute(query, timeout=3000),但仍然没有成功。在

现在,当更多的进程遇到相同的问题,并且多个进程中第1-3步的密集写入停止时,最后一个到达步骤6的进程成功地遵循了该过程,这让我觉得问题是cassandra优先处理我每秒请求的数万个insert请求,要么忽略我的读请求,要么把它放回行中。在

在我看来,解决这个问题的一个方法是,我可以要求cassandra优先处理我的read请求,这样我就可以继续处理,即使这意味着减慢其他进程。在

现在,作为补充说明,您可能会认为我的过程建模不是最佳的,我很乐意听取您的意见,但是对于这个应用程序的实际情况,这是我们认为的最好的方法。所以我们实际上已经考虑过优化这个过程了,但是(如果cassandra服务器能够处理它的话)这对我们的现实来说是最佳的。在

那么,TL;DR:在执行数万个同步查询时,有没有一种方法可以优先处理一个查询?如果没有,是否有一种方法可以在请求不超时的情况下每秒执行数万个插入查询和读取查询?另外,你建议我怎么解决这个问题?并行运行较少的进程显然是一种解决方案,但我正试图避免。所以,很想听听大家的想法。在

不可能在插入时存储数据,这样就不需要再次读取以进行摘要,因为XML文件非常庞大,内存也是一个问题。在


Tags: 文件数据inpy服务器数据库execute进程
1条回答
网友
1楼 · 发布于 2024-04-24 00:41:17

我不知道一种优先读取查询的方法。我相信在内部Cassandra有独立的线程池用于读写操作,所以这些线程池是并行运行的。如果看不到您正在执行的模式和查询,很难说您是在执行一个非常昂贵的读取操作,还是因为系统的写操作太多而无法跟上读取的速度。在

您可能希望在应用程序运行时监视Cassandra中发生的事情。有几种工具可以用来监视正在发生的事情。例如,如果您ssh到Cassandra节点并运行:

watch -n 1 nodetool tpstats

这将显示线程池统计信息(每秒更新一次)。您将能够看到队列是否已满,或者操作是否被阻塞。如果任何“丢弃”的计数器增加,则表明您没有足够的容量来执行您正在尝试的操作。如果是这样,那么可以通过添加更多节点来增加容量,或者更改模式和方法,使节点的工作更少。在

要监视的其他有用的东西(在linux上使用watch-n1连续监视):

^{pr2}$

使用top和iostat之类的linux命令监视节点,以检查CPU利用率和磁盘利用率。在

你所说的话给我的印象是,你的单个节点没有足够的容量来完成你给它的所有工作,所以你要么每单位时间处理更少的数据,要么添加更多的Cassandra节点来分散工作负载。在

由于分区有太多的行,我目前正面临自己的超时错误,所以我可能不得不向分区键添加基数,以使每个分区的内容更小。在

相关问题 更多 >