轮询MySQL数据库未获取新结果

0 投票
1 回答
24 浏览
提问于 2025-04-14 16:05

关于链接的那个所谓重复问题。那个问题问的是为什么在执行INSERT查询后,数据没有被提交到数据库。
在我的情况下,我是通过SELECT查询来查看表格。新的数据项是由另一个进程发布的(并且通过Adminer确认这些数据确实已经提交到数据库)。
如下面所说,第一次循环时,SELECT查询返回了所有预期的记录。在第二次及之后的循环中,即使其他进程添加了新记录,也什么都没有返回。
如果我停止并重新启动这个循环,第一次又能找到所有新记录,但除非再次重启,否则再也找不到其他记录。

我有一个由旧系统创建的记录表。我需要定期检查这个表,并在另一台服务器上的另一个数据库中创建新记录。
我能够添加一个新字段(posted_in_prodmon),默认值为0。
我创建了一个服务,查询原始表,找到新字段中值为0的记录。然后我在新数据库中创建新记录(使用一些来自原始记录的数据),并将新字段更新为1。如果表中没有新记录,进程会休眠10秒,然后再试一次。
新记录大约每20秒出现一次。

我使用的是python3.9和mysql-connector-python==8.0.33来做这个服务,服务器上运行的是MySQL 8.0.23。

我的问题是,无论我尝试什么,似乎在找到初始记录后就再也找不到任何新记录了。以下是我代码中相关的部分:

class Mysql_DB():
    def __init__(self, logger):
        self.logger = logger
        self.connected = False
        self.connection = None

    def is_connected(self):
        if self.connection:
            if self.connection.is_connected():
                return True
        try:
            self.logger.info(f'Not connected to mysql server... reconnecting')  
            self.connection= mysql.connector.connect(**self.dbconfig)
            return True

        except (mysql.connector.Error, IOError) as err:
            self.logger.error(f'Mysql connection failed: {err}')
            return False                


class Source(Mysql_DB):        
    def get_records(self):
        if self.is_connected():
            sql  = 'SELECT id, inspection_data '
            sql += 'FROM `1730_Vantage` '
            sql += 'WHERE part_fail = 1 '
            sql += 'AND posted_in_GFxPRoduction = 0 '
            sql += 'AND created_at > "2023-06-01" '
            sql += 'ORDER BY created_at DESC; '

        # with self.connection.cursor(buffered=True) as cursor:
        #     cursor.reset() # didn't help
        #     cursor.execute(sql)
        cursor = self.connection.cursor(buffered=True)
        cursor.reset() # didn't help
        cursor.execute(sql)
    return cursor


while True:
    for result in source.get_records():
        id = result[0]
        data = result[1]
        data_list = data.split("\t",3)
        
        created_at = f'{data_list[0]} {data_list[1]}'
        created_at = created_at.replace('_', '-')
        part = data_list[2]
        
        ts = int(datetime.timestamp(datetime.fromisoformat(created_at)))
        target.insert(ts, part)
        rowcount = source.update_record(id, created_at)
        if not rowcount:
            raise Exception(f'Failed to update id {id}')

    results.close()   # added after removing with block above... no change
    source.connection.close()  # this is the only thing that works... 
    sleep(10)

我确实读到可以用触发器来做这个,但那样似乎太复杂了。

我尝试过cursor.reset()。

我尝试过添加LIMIT 1(最开始就是这样)。

我需要关闭连接吗?这似乎有点浪费。

更新:我尝试去掉了下面Marcin建议的with块。他的逻辑听起来不错,但没有效果。
唯一有效的方法似乎是每次循环都关闭连接。

希望有人能想出一个更简单的解决方案。

1 个回答

1

你的 get_records() 方法并没有在它自己的范围内获取结果,这可能会导致问题,因为一旦你离开 with 这个代码块,游标就会被关闭。

撰写回答