SqlAlchemy和pyMysql连接池在具有多个DB连接的lambda上

2024-05-23 19:22:50 发布

您现在位置:Python中文网/ 问答频道 /正文

所以问题是我有多个数据库,我想在SqlAlchemy中使用相同的数据库池。这驻留在Lambda上,池在Lambda启动时创建。我希望后续的DB连接使用现有池

工作正常的是初始池连接bpConnect和对该连接的任何后续查询

不起作用的是companyConnect连接。我得到以下错误:

sqlalchemy.exc.StatementError: (builtins.AttributeError) 'XRaySession' object has no attribute 'cursor'

我为我的联系人准备了以下内容:

# Pooling
import sqlalchemy.pool as pool

#################### Engines ###################################################
def bpGetConnection():
    engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{os.environ['database']}"
    engine = create_engine(engine_endpoint, echo_pool=True)
    session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
    db = session()
    return db

bpPool = pool.StaticPool(bpGetConnection)

def companyGetConnection(database):
    engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{database}"
    compEngine = create_engine(engine_endpoint, pool=bpPool)
    session = XRaySessionMaker(bind=compEngine, autoflush=True, autocommit=False)
    db = Session()
    return db

#################### POOLING #############################################

def bpConnect():
    conn = bpPool.connect()
    return conn

def companyConnect(database):
    conn = companyGetConnection(database)
    return conn

#################################################################

在本例中,它们被称为:

from connections import companyConnect, bpConnect
from models import Company, Customers

def getCustomers(companyID):
    db = bpConnect()
    myQuery = db.query(Company).filter(Company.id == companyID).one()

    compDB = companyConnect(myQuery.database)
    customers = compDB.query(Customers).all()
    return customers

Tags: importtruedbreturnosdefenvironconn
1条回答
网友
1楼 · 发布于 2024-05-23 19:22:50

我找到了在lambda上使用动态池的方法:

class DBRegistry(object):
    _db = {}

    def get(self, url, **kwargs):
        if url not in self._db:
            engine = create_engine(url, **kwargs)
            Session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
            session = scoped_session(Session)
            self._db[url] = session
        return self._db[url]

compDB = DBRegistry()

def bpGetConnection():
    engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{os.environ['database']}?charset=utf8"
    engine = create_engine(engine_endpoint)
    session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
    db = session()
    return db

bpPool = pool.QueuePool(bpGetConnection, pool_size=500, timeout=11)

def bpConnect():
    conn = bpPool.connect()
    return conn

def companyConnect(database):
    engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{database}?charset=utf8"
    conn = compDB.get(engine_endpoint, poolclass=QueuePool)
    return conn

因此,基本上,它将使用一个池来实现主数据库上所需的持续连接,并使用另一个池来动态更改所需的数据库。当需要连接到其中一个公司数据库时,它将检查池的注册表中是否已经存在该池。如果该池不存在,它将创建一个并注册它

相关问题 更多 >