How do I pass arguments to a thread?
I want to use the threading module to add 5 to every element in the range 1 to 100, and to see which thread produces each result. My code is almost done, but I don't know how to pass arguments to threading.Thread.
import threading, queue
x = range(1, 100)
y = queue.Queue()
for i in x:
    y.put(i)
def myadd(x):
    print(x + 5)
for i in range(5):
    print(threading.Thread.getName())
    threading.Thread(target=myadd, args=x).start()  # it is wrong here
y.join()
Thanks to dano, it works now. To be able to run it interactively, I rewrote it as:
Method 1: run interactively.
from concurrent.futures import ThreadPoolExecutor
import threading

x = range(1, 100)

def myadd(x):
    print("Current thread: {}. Result: {}.".format(threading.current_thread(), x+5))

def run():
    t = ThreadPoolExecutor(max_workers=5)
    t.map(myadd, x)
    t.shutdown()

run()
Method 2:
from concurrent.futures import ThreadPoolExecutor
import threading

x = range(1, 100)

def myadd(x):
    print("Current thread: {}. Result: {}.".format(threading.current_thread(), x+5))

def run():
    t = ThreadPoolExecutor(max_workers=5)
    t.map(myadd, x)
    t.shutdown()

if __name__ == "__main__":
    run()
What if I want to pass more arguments to the ThreadPoolExecutor? I'd like to use the multiprocessing module to compute 1+3, 2+4, 3+5, all the way up to 100+102. And how would I compute 20+1, 20+2, 20+3, up to 20+100, with the multiprocessing module?
from multiprocessing.pool import ThreadPool

do = ThreadPool(5)

def myadd(x, y):
    print(x + y)

do.apply(myadd, range(3, 102), range(1, 100))
How can I solve this?
3 Answers
From:
import threading, queue
x = range(1, 100)
y = queue.Queue()
for i in x:
    y.put(i)
def myadd(x):
    print(x + 5)
for i in range(5):
    print(threading.Thread.getName())
    threading.Thread(target=myadd, args=x).start()  # it is wrong here
y.join()
To:
import threading
import queue

# So print() in various threads doesn't garble text;
# I hear it is better to use RLock() instead of Lock().
screen_lock = threading.RLock()

# I think range() is an iterator or generator. Thread safe?
argument1 = range(1, 100)
argument2 = [100] * 100  # will add 100 to each item in argument1

# zip() pairs the items up; each pair is a tuple (immutable).
# If the pairs were mutable objects then perhaps this wouldn't be thread safe.
data = zip(argument1, argument2)

# object where multiple threads can grab data while avoiding deadlocks.
q = queue.Queue()

# Fill the thread-safe queue with mock data
for item in data:
    q.put(item)

# It could be wiser to use one queue for each inbound data stream.
# For example one queue for file reads, one queue for console input,
# one queue for each network socket. Remember that the rates of
# reading files, reading console input, and receiving network traffic
# all differ, and you don't want one I/O operation to block another.
# inbound_file_data = queue.Queue()
# inbound_console_data = queue.Queue()  # etc.

# This function is a thread target
def myadd(thread_name, a_queue):
    # This thread-targeted function blocks only within each thread:
    # at a_queue.get() and at a_queue.put() (if the queue is full).
    #
    # Each thread targeting this function has its own copy of the
    # function's local namespace. So each thread will pause when the
    # queue is empty, on a_queue.get(), or when the queue is full, on
    # a_queue.put(). With one queue this means all threads will block
    # at the same time, when the single queue is full or empty, unless
    # we check the number of items remaining in the queue before each
    # a_queue.get() and simply return when none remain. This presumes
    # the data is not a continuous, slow stream like a network
    # connection or a rotating log file, but bounded, like a closed
    # file. Let each thread keep reading from the global queue until
    # it is empty.
    #
    # This is a bad use-case for threading.
    #
    # If each thread had a separate queue it would be a better
    # use-case. You don't want one slow stream of data blocking the
    # processing of a fast stream of data.
    #
    # For a single stream of data it is likely better just not to use
    # threads. However, here is a single "global" queue example...

    # presumes a_queue starts off not empty
    while a_queue.qsize():
        arg1, arg2 = a_queue.get()  # blocking call
        # Hold the lock while printing to prevent console/tty garble.
        # (A blocking acquire() always succeeds, so a with-block is
        # simpler than checking its return value.)
        with screen_lock:
            print('{}: {}'.format(thread_name, arg1 + arg2))
            print('{}: {}'.format(thread_name, arg1 + 5))
            print()
        # allows .join() to keep track of when the queue is finished
        a_queue.task_done()

# create threads and pass the thread name and queue to the thread-target function
threads = []
for i in range(5):
    thread_name = 'Thread-{}'.format(i)
    thread = threading.Thread(
        name=thread_name,
        target=myadd,
        args=(thread_name, q))
    # Recommended:
    # queues = [queue.Queue() for index in range(5)]  # put at top of file
    # thread = threading.Thread(
    #     target=myadd,
    #     name=thread_name,
    #     args=(thread_name, queues[i]))
    threads.append(thread)

# some applications should start threads only after all threads are created.
for thread in threads:
    thread.start()

# Each thread will pull items off the queue. Because the while loop in
# myadd() ends when queue.qsize() == 0, each thread will terminate
# when nothing is left in the queue.
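One small addition to the example above (my own suggestion, not part of the original answer): because each worker calls a_queue.task_done(), the main thread can also wait until every queued item has been processed before exiting:

# Block until every item put on the queue has been marked done,
# then wait for the worker threads themselves to finish.
q.join()
for thread in threads:
    thread.join()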
Here, you need to pass in a tuple, not just a single element.
To create a tuple, you can use code like the following.
dRecieved = connFile.readline()
processThread = threading.Thread(target=processLine, args=(dRecieved,))
processThread.start()
For more information, see here.
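Applied to the code in the question, a minimal sketch of that fix (my illustration, not the answerer's code) would pass each number as a one-element tuple:

import threading

def myadd(x):
    print(x + 5)

for i in range(1, 100):
    # args must be a tuple (or other sequence); (i,) is a one-element tuple
    threading.Thread(target=myadd, args=(i,)).start()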
It looks like you're trying to create a thread pool by hand, so that five threads are used to compute all 100 sums. If so, I'd recommend using multiprocessing.pool.ThreadPool for this:
from multiprocessing.pool import ThreadPool
import threading
import queue

x = range(1, 100)

def myadd(x):
    print("Current thread: {}. Result: {}.".format(
        threading.current_thread(), x+5))

t = ThreadPool(5)
t.map(myadd, x)
t.close()
t.join()
If you're on Python 3.x, you can use concurrent.futures.ThreadPoolExecutor instead:
from concurrent.futures import ThreadPoolExecutor
import threading

x = range(1, 100)

def myadd(x):
    print("Current thread: {}. Result: {}.".format(threading.current_thread(), x+5))

t = ThreadPoolExecutor(max_workers=5)
t.map(myadd, x)
t.shutdown()
I think there are two problems with your original code. First, you need to pass a tuple to the args keyword argument, not a single element:
threading.Thread(target=myadd, args=(x,))
However, you were also trying to pass the entire list returned by range(1,100) (or the range object, if you're on Python 3.x) to myadd, which isn't really what you want. It's also not clear what you're using the queue for. Perhaps you meant to pass it to myadd?
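As an illustration of what that might look like (my sketch, not part of the answer), here is the question's loop reworked so that five threads each pull numbers from the queue:

import threading
import queue

y = queue.Queue()
for i in range(1, 100):
    y.put(i)

def myadd(q):
    # keep pulling numbers until the queue is drained
    while True:
        try:
            n = q.get_nowait()
        except queue.Empty:
            break
        print("{}: {}".format(threading.current_thread().name, n + 5))
        q.task_done()

for i in range(5):
    threading.Thread(target=myadd, args=(y,)).start()
y.join()  # wait until every item has been processed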
One final note: Python uses a Global Interpreter Lock (GIL), which prevents multiple threads from executing CPU-bound code at the same time. This means that in Python, threading won't speed up computation-heavy operations (like our additions), because only one thread ever runs at a time. To parallelize that kind of work, Python prefers multiple processes. You can make the first example above use processes by replacing the ThreadPool import with from multiprocessing import Pool; for the second example, use ProcessPoolExecutor instead of ThreadPoolExecutor. You'd probably also want to replace threading.current_thread() with os.getpid().
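As a concrete sketch of that substitution (my illustration of what the answer describes, not the answerer's code):

from multiprocessing import Pool
import os

x = range(1, 100)

def myadd(x):
    # os.getpid() identifies the worker process instead of a thread
    print("Current process: {}. Result: {}.".format(os.getpid(), x + 5))

if __name__ == "__main__":  # required on platforms that spawn child processes
    p = Pool(5)
    p.map(myadd, x)
    p.close()
    p.join()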
Edit:
Here's how to handle the case where you need to pass two different arguments:
from multiprocessing.pool import ThreadPool

def myadd(x, y):
    print(x + y)

def do_myadd(x_and_y):
    return myadd(*x_and_y)

do = ThreadPool(5)
do.map(do_myadd, zip(range(3, 102), range(1, 100)))
We use zip to create a list that pairs up the elements of the two ranges:
[(3, 1), (4, 2), (5, 3), ...]
Then we use map to call do_myadd with each tuple in that list, and do_myadd uses tuple expansion (*x_and_y) to unpack each tuple into the two separate arguments passed to myadd.
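A side note (my addition, not the answerer's): on Python 3.3 and later, Pool and ThreadPool also provide starmap, which does this unpacking for you, so the do_myadd wrapper isn't needed:

from multiprocessing.pool import ThreadPool

def myadd(x, y):
    print(x + y)

do = ThreadPool(5)
# starmap unpacks each (x, y) pair into myadd's two parameters
do.starmap(myadd, zip(range(3, 102), range(1, 100)))
do.close()
do.join()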