Dynamically spawning new threads

Published 2024-06-16 12:24:07


I'd like to run multiple threads without actually writing a new line of code for every thread I want to run. With the code below I can't dynamically add more account IDs, nor can I increase the number of threads just by changing the thread count.

For example, here is my current code:

    import threading

    def get_page_list(account, thread_count):
        # splits the account's pages into thread_count lists
        return list_of_pages_split_by_threads

    def pull_data(page_list, account_id):
        data = api(page_list, account_id)
        return data

    if __name__ == "__main__":
        accountIDs = [100]

        # number of threads to create:
        thread_count = 3

        # returns a list of page lists, e.g. [[1,2,3],[4,5,6],[7,8,9,10]]
        page_lists = get_page_list(accountIDs[0], thread_count)

        t1 = threading.Thread(target=pull_data, args=(page_lists[0], accountIDs[0]))
        t2 = threading.Thread(target=pull_data, args=(page_lists[1], accountIDs[0]))
        t3 = threading.Thread(target=pull_data, args=(page_lists[2], accountIDs[0]))

        t1.start()
        t2.start()
        t3.start()
        t1.join()
        t2.join()
        t3.join()

Here is where I want to get to:

Whenever I want to add an extra thread (if the server can handle it) or add extra account IDs, I shouldn't have to duplicate code.

I.e., this example is what I'd like to do, but the code below doesn't work: it finishes an entire page list before moving on to the next thread:

    if __name__ == "__main__":
        accountIDs = [100, 101, 103]
        thread_count = 3
        for account in accountIDs:
            page_lists = get_page_list(account, thread_count)
            for pg_list in page_lists:
                t1 = threading.Thread(target=pull_data, args=(pg_list, account))
                t1.start()
                t1.join()  # joining inside the loop waits for each thread, so this runs serially

2 Answers

One approach is to use a Pool together with a Queue.

The pool keeps working while there are items in the queue, without holding up the main thread.

Pick one of the following imports:

import multiprocessing as mp (for process based parallelization)
import multiprocessing.dummy as mp (for thread based parallelization)
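Both imports expose the same `Pool` API (the `dummy` module just backs it with threads instead of processes), so code written against one usually runs unchanged against the other. A minimal sketch of the thread-backed variant; `square` here is just a placeholder workload:

```python
import multiprocessing.dummy as mp   # same API as multiprocessing, backed by threads

def square(x):
    return x * x

pool = mp.Pool(3)                    # three worker threads
results = pool.map(square, [1, 2, 3, 4])
pool.close()
pool.join()
print(results)                       # [1, 4, 9, 16]
```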

Create the worker, the pool, and the queue:

the_queue = mp.Queue()  # store the account ids and page lists here


def worker_main(queue):
    while waiting:
        while not queue.empty():
            account, pageList = queue.get(True)  # get an item from the queue
            pull_data(pageList, account)


waiting = True
num_parallel_workers = 3
the_pool = mp.Pool(num_parallel_workers, worker_main, (the_queue,))
#                              don't forget the comma here    ^

accountIDs = [100, 101, 103]
thread_count = 3
for account in accountIDs:
    list_of_page_lists = get_page_list(account, thread_count)
    for pg_list in list_of_page_lists:
        the_queue.put((account, pg_list))

....

waiting = False  # until you do this, the pool will probably never end.
                 # Not sure if it's good practice, but you might want to keep
                 # the pool hanging around for a while to receive more items.
the_pool.close()
the_pool.join()
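
If busy-waiting on the `waiting` flag feels fragile, a common alternative is a sentinel ("poison pill") item per worker. A self-contained sketch of that variant, with a stand-in `pull_data` that just records what it was given:

```python
import multiprocessing.dummy as mp  # thread-based pool, same API as multiprocessing

SENTINEL = None                     # one per worker tells that worker to stop
results = []                        # threads share memory, so a plain list works

def pull_data(page_list, account):  # stand-in for the real API call
    results.append((account, page_list))

def worker_main(queue):
    while True:
        item = queue.get()          # blocks until an item arrives (no busy-wait)
        if item is SENTINEL:
            break                   # clean shutdown instead of a shared flag
        account, pg_list = item
        pull_data(pg_list, account)

num_parallel_workers = 3
the_queue = mp.Queue()
the_pool = mp.Pool(num_parallel_workers, worker_main, (the_queue,))

for account in (100, 101, 103):
    for pg_list in ([1, 2, 3], [4, 5, 6], [7, 8, 9, 10]):
        the_queue.put((account, pg_list))

for _ in range(num_parallel_workers):  # exactly one sentinel per worker
    the_queue.put(SENTINEL)

the_pool.close()
the_pool.join()
print(len(results))                    # 9 page lists processed
```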

Another option is to fill the queue first and only then create the pool, so the workers run only while there are items in the queue.

Then, if more data arrives later, you create another queue and another pool:

import multiprocessing.dummy as mp
# If you are not using dummy, you will probably need a queue for the results too,
# as the processes will not see variables from the main thread.
# Something like: worker_main(input_queue, output_queue),
# pull_data(pageList, account, output_queue),
# mp.Pool(num_parallel_workers, worker_main, (in_queue, out_queue)),
# and you read the results from the output queue after pool.join().

the_queue = mp.Queue()  # store the account ids and page lists here


def worker_main(queue):
    while not queue.empty():
        account, pageList = queue.get(True)  # get an item from the queue
        pull_data(pageList, account)

accountIDs = [100, 101, 103]
thread_count = 3
for account in accountIDs:
    list_of_page_lists = get_page_list(account, thread_count)
    for pg_list in list_of_page_lists:
        the_queue.put((account, pg_list))


num_parallel_workers = 3
the_pool = mp.Pool(num_parallel_workers, worker_main, (the_queue,))
#                              don't forget the comma here    ^

the_pool.close()
the_pool.join()

del the_queue
del the_pool
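
The comment block above only sketches how the non-`dummy` (process-based) version would need an output queue; here is one self-contained way that idea could look, with a stand-in `pull_data` that just sums each page list (names like `in_queue`/`out_queue` are assumptions, not from the original):

```python
import multiprocessing as mp  # real processes: workers can't see main-process variables

def pull_data(page_list, account, output_queue):
    # stand-in for the real API call: sum the pages and report back
    output_queue.put((account, sum(page_list)))

def worker_main(in_queue, out_queue):
    while True:
        item = in_queue.get()
        if item is None:          # sentinel: stop this worker
            break
        account, pg_list = item
        pull_data(pg_list, account, out_queue)

if __name__ == "__main__":
    num_parallel_workers = 2
    in_queue, out_queue = mp.Queue(), mp.Queue()
    # queues reach the workers through initargs, as in the answer above
    pool = mp.Pool(num_parallel_workers, worker_main, (in_queue, out_queue))

    jobs = [(100, [1, 2, 3]), (100, [4, 5, 6]), (101, [7, 8, 9, 10])]
    for job in jobs:
        in_queue.put(job)
    for _ in range(num_parallel_workers):
        in_queue.put(None)        # one sentinel per worker

    results = [out_queue.get() for _ in jobs]  # collect before joining
    pool.close()
    pool.join()
    print(sorted(results))        # [(100, 6), (100, 15), (101, 34)]
```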

I couldn't get MP working properly, so I did this instead, and it seems to work fine. MP is probably still the better way to solve this, though:

    # keeps track of the threads
    threads = []
    # generates one thread for whatever thread_count = N is
    for thread in range(thread_count):
        # get_page_list returned the pages in page_lists; this ensures each thread gets a unique list
        page_list = page_lists[thread]
        # the actual function each thread runs
        t = threading.Thread(target=pull_data, args=(page_list, account))
        # collect all the threads in a list
        threads.append(t)
        # start each thread as it is created
        t.start()
    # wait for all threads to complete before returning to the main thread
    # (technically this is not always needed)
    for t in threads:
        t.join()
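
The same list-of-threads pattern also extends over several account IDs without duplicating code. A self-contained sketch, with stand-in `pull_data` and `get_page_list` (the real ones would call the API):

```python
import threading

results = []                                   # list.append is atomic under the GIL

def pull_data(page_list, account_id):          # stand-in for the real API call
    results.append((account_id, page_list))

def get_page_list(account, thread_count):      # stand-in: same pages for every account
    return [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]][:thread_count]

accountIDs = [100, 101, 103]
thread_count = 3
threads = []
for account in accountIDs:
    for page_list in get_page_list(account, thread_count):
        t = threading.Thread(target=pull_data, args=(page_list, account))
        threads.append(t)
        t.start()                              # start now, join later

for t in threads:                              # join only after all have started,
    t.join()                                   # so they actually run concurrently

print(len(results))                            # 9 page lists processed
```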

I also didn't understand why you would "need" .join(); there is a good answer here: what is the use of join() in python threading
