我已经在这里调查了几个“太多客户”的相关话题,但仍然无法解决我的问题,所以我不得不再次问这个问题,为我的具体情况。
基本上,我设置了本地Postgres服务器,需要执行数万个查询,所以我使用了Python psycopg2包。以下是我的代码:
import psycopg2
import pandas as pd
import numpy as np
from flashtext import KeywordProcessor
from psycopg2.pool import ThreadedConnectionPool
from concurrent.futures import ThreadPoolExecutor
df = pd.DataFrame({'S':['California', 'Ohio', 'Texas'], 'T':['Dispatcher', 'Zookeeper', 'Mechanics']})
# df = pd.concat([df]*10000) # repeat df 10000 times
DSN = "postgresql://User:password@localhost/db"
tcp = ThreadedConnectionPool(1, 800, DSN)
def do_one_query(inputS, inputT):
conn = tcp.getconn()
c = conn.cursor()
q = r"SELECT * from eridata where "State" = 'California' and "Title" = 'Dispatcher' limit 1;"
c.execute(q)
all_results = c.fetchall()
for row in all_results:
return row
tcp.putconn(conn, close=True)
cnt=0
for idx, row in df.iterrows():
cnt+=1
with ThreadPoolExecutor(max_workers=1) as pool:
ret = pool.submit(do_one_query, row["S"], row["T"])
print ret.result()
print cnt
代码在一个小的df下运行良好。如果我重复df 10000次,就会收到一条错误消息,说连接池已耗尽 . 我想我用过的连接已经被这条线切断了:
tcp.putconn(conn,close=True) 但我想事实上他们没有关门?我怎样才能避开这个问题?
你的问题是,你实际上没有返回到池的连接,而是用
请参阅此处的文档http://initd.org/psycopg/docs/pool.html
因此,如果将800个连接放入池中,在801个循环之后,会出现“耗尽错误”,因为连接池大小为零。
我一直在努力寻找有关ThreadedConnectionPool如何工作的真正详细信息。https://bbengfort.github.io/observations/2017/12/06/psycopg2-transactions.html还不错,但事实证明,getconn在连接可用之前阻塞的声明是不正确的。检查代码时,所有ThreadedConnectionPool添加的都是围绕AbstractConnectionPool方法的锁,以防止竞争条件。如果在任何一点尝试使用超过maxconn连接,将引发连接池耗尽池错误。
如果您想要比the accepted answer简单一点的东西,那么进一步将方法包装在一个提供阻塞的信号量中,直到连接可用为止,就可以做到这一点:
你需要在游泳池上面排队。
类似于以下的方法应该有效:
然后您可以通过以下方式拨打您的连接:
基本上,我借用了异步postgres的gevent示例,并对其进行了修改,以通过pyscopg2支持线程池。
https://github.com/gevent/gevent/blob/master/examples/psycopg2_pool.py
我在模块中添加了psycogreen的功能,所以您只需要导入并调用类。对类的每次调用都会在队列上堆叠一个新查询,但只使用特定大小的池。这样你就不会失去联系。这基本上类似于PGBouncer所做的,我认为这也可以消除你的问题。
https://pgbouncer.github.io/
相关问题 更多 >
编程相关推荐