如何在不复制select查询结果的情况下使用带线程的MYSQL select查询?

2024-03-29 10:37:39 发布

您现在位置:Python中文网/ 问答频道 /正文

短上下文:我使用mysql表选择一个值,通过使用API+值获取一个结果,结果保存到同一个表中

问题:如何同时处理多行?每当我使用线程启动函数时,它都会为每个线程选择相同的值(即游标为每个线程返回相同的值)。我需要为每个线程处理不同的值。这样我会减少一些时间

我的节目是

import requests
import os
import json
import pymysql
import threading

conn = pymysql.connect(host='localhost', user=USER, passwd=PASSWORD, db='sampledb',charset='utf8mb4',autocommit=True)

url = "http://www.someapi.com/somelink/"

cur = conn.cursor()

def main():
    cur.execute("select asset_id from getprocessid where status =%s LIMIT 1",("uploaded",))
    idofassets = cur.fetchone()[0]
    req = requests.Session()
    resp = req.get(url+str(idofassets))
    resp_json = json.loads(resp.text)
    actual = resp_json['getResponse']
    cur.execute("update getprocessid set class = %s ,status =%s where asset_id = %s",(str(actual),"completed",str(idofasset),))

while True:

    # For threading purpose i added

    thread1 = threading.Thread(target=main)
    thread2 = threading.Thread(target=main)
    thread3 = threading.Thread(target=main)

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join()
    thread2.join()
    thread3.join()


Tags: importjsontargetmain线程threadrespstart
2条回答

最简单的方法之一(语法是近似的)

每个线程在my_number变量中必须有自己的编号,该编号在所有线程中都是唯一的

thread INT DEFAULT NULL字段添加到结构中

线程试图通过以下方式保留一条非保留记录:

cur.execute("UPDATE getprocessid SET thread = %s WHERE thread IS NULL AND status=%s LIMIT 1",(my_number,"uploaded",))

然后线程处理此保留记录:

cur.execute("select asset_id from getprocessid where thread=%s",(my_number,))
row = cur.fetchone()
if row is not None:
    process the record

如果保留成功,则处理保留的记录。如果另一个线程覆盖了保留值,则不会返回任何记录,并且它将被IF检测到-跳过处理代码,线程尝试保留另一条记录

您的问题似乎分为两个主要的不同任务:

1-从getprocessidMySQL表获取结果

2-处理结果并更新相同的表(但不同的字段)

因此,优化代码的一种方法是让一个线程(可能是主线程)执行步骤1,然后将步骤2中的问题分配给3个线程:

import requests
import os
import json
import pymysql
import threading
#you can create these dynamically if you 
#want more (or less) threads
batches = [[], [], []]

conn = pymysql.connect(host='localhost', user=USER, 
  passwd=PASSWORD, 
db='sampledb',charset='utf8mb4',autocommit=True)

url = "http://www.someapi.com/somelink/"

cur = conn.cursor()

def fetch_and_split():
    cur.execute("select asset_id from getprocessid 
      where status =%s LIMIT 1",("uploaded",))
    results = cur.fetchall()
    count = 0
    #this populates the lists to be processed with the ids
    while count < size(results):
        cur_batch = batches[size(batches) % count ]
        cur_batch.append(results[count][0])
        count++

def process_and_update(batch):
    #each thread receives its own list
    for idofassets in batch:
        req = requests.Session()
        resp = req.get(url+str(idofassets))
        resp_json = json.loads(resp.text)
        actual = resp_json['getResponse']
        cur.execute("update getprocessid set class = %s 
          ,status =%s where asset_id = %s", 
          (str(actual),"completed",str(idofasset),))


while True:

    # For threading purpose i added
    # The main thread splits the results
    fetch_and_split()    
    # The other threads process the 
    # results and update the values
    thread1 = threading.Thread(target=process_and_update, args=(batches[0],))
    thread2 = threading.Thread(target=process_and_update, args=(batches[1],))
    thread3 = threading.Thread(target=process_and_update, args=(batches[2],))

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join()
    thread2.join()
    thread3.join()

相关问题 更多 >