sqlalchemy 编译器问题

1 投票
1 回答
995 浏览
提问于 2025-04-16 20:48

我有一个自定义的 InsertFromSelect 类,它的功能就是它名字所说的那样。输出的查询结果正是我需要的,但问题是我似乎无法执行它。

类:

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Executable, ClauseElement

class InsertFromSelect( Executable, ClauseElement ):
    def __init__( self, table, select ):
        self.table = table
        self.select = select

@compiles( InsertFromSelect )
def visit_insert_from_select( element, compiler, **kw ):
    return "INSERT INTO %s (%s)" % (
        compiler.process(element.table, asfrom=True),
        compiler.process(element.select)
    )

查询:

import proxy.lib.sqlalchemy.compilers.insertFromSelect as ifs
from sqlalchemy import not_, literal_column, String, text
from proxy.database import engine

ifsTest = ifs.InsertFromSelect(
    user_ip_table,
    select(
        [
            user.id,
            ips_table.c.id, literal_column( "'inactive'", String ),
            literal_column( "'"+service_type+"'", String )
        ]
    ).where(
        not_( ips_table.c.id.in_(
            select(
                [user_ip_table.c.ip_id]
            )
        ) )
    ).where(
        ips_table.c.ip_address.in_( validIps )
    )
)

查询输出(打印 ifsTest):

INSERT INTO user_ip (SELECT 5144, ip.id, 'inactive', 'proxy' 
FROM ip 
WHERE ip.id NOT IN (SELECT user_ip.ip_id 
FROM user_ip) AND ip.ip_address IN (:ip_address_1, :ip_address_2, :ip_address_3, :ip_address_4, :ip_address_5, :ip_address_6))

我已经手动在数据库中测试过这个查询(当然是用参数替换的),结果正是我需要的,但我在用 sqlalchemy 执行时却遇到了问题。

I've tried:
connection = engine.connect()
res = connection.execute( ifsTest )
connection.close()

....但是没有任何数据被插入。你觉得我该怎么做呢?

1 个回答

2

因为你没有使用事务,所以在你的构造中添加“自动提交”选项:

class InsertFromSelect( Executable, ClauseElement ):
    _execution_options = \
        Executable._execution_options.union({'autocommit': True})
    def __init__( self, table, select ):
        self.table = table
        self.select = select

或者可以明确地调用它:

connection.execution_options(autocommit=True).execute(mystatement)

或者使用一个事务:

trans = connection.begin()
connection.execute(...)
trans.commit()

背景:

http://www.sqlalchemy.org/docs/core/connections.html#understanding-autocommit

撰写回答