threadpool in python

用python实现了一个简易的线程池

对于线程池的不用应用场景对如下几个feature作了分类

1.按工作总量分

  • 工作总量已知:

需要计算的工作在threadpool启动之前就已经定了,对于threadpool
只需要指定线程数量就好了,此类threadpool的工作流程如下,在启动前
设置好线程池的大小,分割好子工作放入workers列表中,每个线程每次
循环从列表中取出一个工作去做,如果列表已空,该线程退出。

这类线程池是最容易实现的一种,只需用一个_互斥锁_对workers队列做一个
互斥访问即可

  • 工作总量未知

这类线程池中的线程会大部分时间可能会一直处于阻塞状态,等待外部事件的到来,
与上一种不同之处在于,即使工作没了也不会退出,退出的条件需另外定义
典型的_生产者消费者_模式。workers队列的长度没有预先定义,在程序运行
的过程中动态增加。

实现的时候需要使用互斥锁和条件变量,互斥锁用于临界区(workers队列)的访问,
条件变量可以理解为workers队列非空这个条件。因此,对于生产者进程而言,在workers
中添加任务时,需要对这个条件变量作一个signal的操作,告诉阻塞在这个条件上的
线程说,”这里又有任务啦,大家来抢吧”。

2.按操作结果分

  • 操作结果需要保留:
  • 操作结果不需要保留:

代码

对于工作总量已知的情况,自己实现了一个简易的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import threading
import queue


class ThreadPool:
def __init__(self, pool_size=1):
self.pool_size = pool_size
self.queue_lock = threading.Lock()
self.cb_lock = threading.Lock()
self.workers = queue.Queue()
self.threads = list()
for i in range(pool_size):
thread = threading.Thread(target=self.thread_routine, args=[i])
self.threads.append(thread)

def add_worker(self, task, args=None, cb=None):
self.queue_lock.acquire()
self.workers.put({
'worker': task,
'args': args,
'cb': cb
})
self.queue_lock.release()
return

def thread_routine(self, thread_id=None):
while True:
# print(str(thread_id) + 'work')
self.queue_lock.acquire()
if self.workers.empty():
self.queue_lock.release()
# print(str(thread_id) + 'exit')
exit(0)
worker = self.workers.get()
self.queue_lock.release()

try:
result = worker['worker'](worker['args'])
if worker['cb'] is not None:
self.cb_lock.acquire()
worker['cb'](result)
self.cb_lock.release()
except Exception as err:
print(err)

def pool_start(self):
for thread in self.threads:
thread.start()

def pool_join(self):
for thread in self.threads:
thread.join()


if __name__ == '__main__':
def task(num):
count = 0
for i in range(num):
count += i + 1
return num, count

def cb(result):
return
# print(result)

tp = ThreadPool(100)
for i in range(1000):
tp.add_worker(task, i, cb)
tp.pool_start()
tp.pool_join()

线程池的其它实现以后有空了在更新