引言
Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间。但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对编写线程池/进程池提供了直接的支持。
Executor和Future
concurrent.futures模块的基础是Exectuor,Executor是一个抽象类,它不能被直接使用。但是它提供的两个子类ThreadPoolExecutor和ProcessPoolExecutor却是非常有用,顾名思义两者分别被用来创建线程池和进程池的代码。我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。
使用submit来操作线程池/进程池
我们先通过下面这段代码来了解一下线程池的概念
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
time.sleep(2)
return message
if __name__ == '__main__':
# 创建一个最大可容纳2个task的线程池
# pool = ThreadPoolExecutor(max_workers=2)
pool = ThreadPoolExecutor(max_workers=2)
# 往线程池里面加入一个task
future1 = pool.submit(return_future_result, ("hello",))
future2 = pool.submit(return_future_result, ("world",))
# 判断task1是否结束
print("任务1状态", future1.done())
print("任务2状态", future2.done())
# 查看task1返回的结果
print(future1.result())
print(future2.result())
print("任务1状态", future1.done())
print("任务2状态", future2.done())
我们根据运行结果来分析一下。我们使用submit方法来往线程池中加入一个task,submit返回一个Future对象,对于Future对象可以简单地理解为一个在未来完成的操作。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import freeze_support
import time
def return_future_result(message):
time.sleep(2)
return message
if __name__ == '__main__':
# 创建一个最大可容纳2个task的线程池
# pool = ThreadPoolExecutor(max_workers=2)
pool = ProcessPoolExecutor(max_workers=2)
# 往线程池里面加入一个task
future1 = pool.submit(return_future_result, ("hello",))
future2 = pool.submit(return_future_result, ("world",))
# 判断task1是否结束
print("任务1状态", future1.done())
print("任务2状态", future2.done())
# 查看task1返回的结果
print(future1.result())
print(future2.result())
print("任务1状态", future1.done())
print("任务2状态", future2.done())
freeze_support()
使用map/wait来操作线程池/进程池
除了submit,Exectuor还为我们提供了map方法,和内建的map用法类似,下面我们通过两个例子来比较一下两者的区别。
使用submit操作回顾
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/', 'http://www.111.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
if __name__ == '__main__':
# 我们可以使用with语句来确保线程被及时清理。
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# 启动加载操作并用它的URL标记
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
print(future_to_url)
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r 报错: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
使用map
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url):
with urllib.request.urlopen(url, timeout=60) as conn:
return conn.read()
if __name__ == '__main__':
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(load_url, URLS)
for url, result in zip(URLS, results):
print(url, result)
从运行结果可以看出,map是按照URLS列表元素的顺序返回的,并且写出的代码更加简洁直观,我们可以根据具体的需求任选一种。
第三种选择wait
wait方法接会返回一个tuple(元组),tuple中包含两个set(集合),一个是completed(已完成的)另外一个是uncompleted(未完成的)。使用wait方法的一个优势就是获得更大的自由度,它接收三个参数FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,默认设置为ALL_COMPLETED。
我们通过下面这个例子来看一下三个参数的区别
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_random_secs(num):
sleep(randint(1, 5))
return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
futures.append(pool.submit(return_after_random_secs, x))
print(wait(futures))
# print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
如果采用默认的ALL_COMPLETED,程序会阻塞直到线程池里面的所有任务都完成。
ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f0b06c9bc88 state=finished returned str>,
<Future at 0x7f0b06cbaa90 state=finished returned str>,
<Future at 0x7f0b06373898 state=finished returned str>,
<Future at 0x7f0b06352ba8 state=finished returned str>,
<Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())
如果采用FIRST_COMPLETED参数,程序并不会等到线程池里面所有的任务都完成。
ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f84109edb00 state=finished returned str>,
<Future at 0x7f840e2e9320 state=finished returned str>,
<Future at 0x7f840f25ccc0 state=finished returned str>},
not_done={<Future at 0x7f840e2e9ba8 state=running>,
<Future at 0x7f840e2e9940 state=running>})
Python GIL相关
要理解GIL的含义,我们需要从Python的基础讲起。像C++这样的语言是编译型语言,所谓编译型语言,是指程序输入到编译器,编译器再根据语言的语 法进行解析,然后翻译成语言独立的中间表示,最终链接成具有高度优化的机器码的可执行程序。编译器之所以可以深层次的对代码进行优化,是因为它可以看到整 个程序(或者一大块独立的部分)。这使得它可以对不同的语言指令之间的交互进行推理,从而给出更有效的优化手段。
与此相反,Python是解释型语言。程序被输入到解释器来运行。解释器在程序执行之前对其并不了解;它所知道的只是Python的规则,以及在执行过程 中怎样去动态的应用这些规则。它也有一些优化,但是这基本上只是另一个级别的优化。由于解释器没法很好的对程序进行推导,Python的大部分优化其实是 解释器自身的优化。
现在我们来看一下问题的症结所在。要想利用多核系统,Python必须支持多线程运行。作为解释型语言,Python的解释器必须做到既安全又高效。我们都知道多线程编程会遇到的问题,解释器要留意的是避免在不同的线程操作内部共享的数据,同时它还要保证在管理用户线程时保证总是有最大化的计算资源。
那么,不同线程同时访问时,数据的保护机制是怎样的呢?答案是解释器全局锁。从名字上看能告诉我们很多东西,很显然,这是一个加在解释器上的全局(从解释器的角度看)锁(从互斥或者类似角度看)。这种方式当然很安全,但是它有一层隐含的意思(Python初学者需要了解这个):对于任何Python程序,不管有多少的处理器,任何时候都总是只有一个线程在执行。
”为什么我全新的多线程Python程序运行得比其只有一个线程的时候还要慢?“许多人在问这个问题时还是非常犯晕的,因为显然一个具有两个线程的程序要比其只有一个线程时要快(假设该程序确实是可并行的)。事实上,这个问题被问得如此频繁以至于Python的专家们精心制作了一个标准答案:”不要使用多线程,请使用多进程”。
所以,对于计算密集型的,我还是建议不要使用python的多线程而是使用多进程方式,而对于IO密集型的,还是劝你使用多进程方式,因为使用多线程方式出了问题,最后都不知道问题出在了哪里,这是多么让人沮丧的一件事情!