Python高级-并发模型

并发模型

并发与并行的区别:

  • 并发:多个执行单元在一个时间段内交替执行(交替使用计算机CPU)
  • 并行:多个执行单元在同一时间同时执行(充分利用计算机多核CPU)

在 Python 中原生支持三种执行单元:进程,线程,协程,依赖这三种执行单元实现的并发模型底层原理各不相同,所试用的场景也不尽相同。

  • 进程:一个进程对应一个应用程序实例,被分配一块专属的内存以及共享部分 CPU 的执行时间;进程间通过管道,套接字,内存映射文件进行通信,过程中需要将数据进行序列化,通信成本较高
  • 线程:进程开启后,会启动一个初始化线程称为主线程,在主线程中可以调用操作系统 API 创建子线程,同一个进程下的线程间可以共享内存数据,通过线程实现的并发成本比进程更低
  • 协程:Python 的协程在事件循环的驱动下在单线程中执行,由于协程可以在运行过程中被暂停,将执行权交还给事件循环调度其它协程,因此多个协程也可以交替执行,达到并发执行的效果;由于是在单线程中实现的并发模型,因此任何运行的单元如何阻塞了都会导致整个事件循环系统阻塞,需要格外小心

下面会分别使用上述三种并发模型,实现一个代码实例:

  • 主程序中模拟网络请求获取结果,其中通过 sleep 方式模拟网络请求延时
  • 启动一个子程序并发地监听状态,在结果返回前实时在控制台打印旋转指针,模拟加载状态
  • 结果返回后关闭子程序,并打印结果

多进程版本:

# 多进程,旋转指针
# 注意:在 Jupyter Notebook 中启动子进程,
# 会出现错误(原因是在 fork 子进程的过程中,只会将顶层代码中定义的内容复制到子进程中),而当前运行的 Jupyter 代码块不属于顶层
import itertools
import time
from multiprocessing import Process, synchronize, Event

def spin(msg: str, done: synchronize.Event) -> None:
    for char in itertools.cycle(r'\|/-'):
        status = f'\r{char} {msg}'
        print(status, end='', flush=True)
        if done.wait(.1):
            break
    blanks = ' ' * len(status)
    print(f'\r{blanks}\r', end='')

def slow() -> int:
    time.sleep(3)
    return 42

def supervisor() -> int:
    done = Event()
    spinner = Process(target=spin, args=('thinking!', done))
    print(f'spinner object: {spinner}')
    spinner.start()
    result = slow()
    done.set()
    spinner.join()
    return result

def main() -> None:
    result = supervisor()
    print(f'Answer: {result}')

if __name__ == '__main__':
    main()

多线程版本:

# 多线程,旋转指针
import itertools
import time
from threading import Thread, Event

def spin(msg: str, done: Event) -> None:
    for char in itertools.cycle(r'\|/-'):
        status = f'\r{char} {msg}'
        print(status, end='', flush=True)
        if done.wait(.1):
            break
    blanks = ' ' * len(status)
    print(f'\r{blanks}\r', end='')

def slow() -> int:
    time.sleep(3)
    return 42

def supervisor() -> int:
    done = Event()
    spinner = Thread(target=spin, args=('thinking!', done))
    print(f'spinner object: {spinner}')
    spinner.start()
    result = slow()
    done.set()
    spinner.join()
    return result

def main() -> None:
    result = supervisor()
    print(f'Answer: {result}')

if __name__ == '__main__':
    main()

协程版本(使用 asyncio 异步框架):

import asyncio
from asyncio import Event
import itertools

async def spin(msg: str, done: Event) -> None: 
    for char in itertools.cycle(r'\|/-'):
        status = f'\r{char} {msg}'
        print(status, flush=True, end='')
        try:
            if await asyncio.wait_for(done.wait(), timeout=.1):
                break
        except asyncio.TimeoutError:
            pass
    blanks = ' ' * len(status)
    print(f'\r{blanks}\r', end='')

async def slow() -> int:
    await asyncio.sleep(3)
    return 42

def main() -> None:
    result = asyncio.run(supervisor())
    print(f'Answer: {result}')

async def supervisor() -> int:
    done = Event()
    spinner = asyncio.create_task(spin('thinking!', done))
    print(f'spinner object: {spinner}')
    result = await slow()
    done.set()
    return result

if __name__ == '__main__':
    main()  

全局解释器锁(GIL)

Python 进程的执行依赖于 Python 解释器(Global Interpreter),其启动一个线程负责程序执行和垃圾回收。

在执行程序代码前需要先持有全局解释器锁(Global Interpreter Lock),而同一时间只能有一个线程持有全局解释器锁,这就导致即使使用多线程的并发模型,也没法充分利用计算机多核CPU的资源,这对于计算密集型应用程序是非常不友好的(最新的 Python 版本好像解决了这个问题)。

概括下来 GIL 包含以下主要机制:

  • 同一时间只能由一个线程持有
  • 为了防止一个Python线程无限期持有GIL,解释器默认每5毫秒暂停当前Python线程,释放GIL
  • Python标准库中发起系统调用的函数均可释放GIL。这包括所有执行磁盘I/O、网络I/O的函数,以及time.sleep()

由于 GIL 的限制,导致多线程的并发模型无法适用于计算密集型的应用场景,如果需要越过 GIL 的限制则需要使用多进程并发模型,而多线程的并发模型更适用 IO密集型的应用场景。

计算密集型任务

下面程序构建了一个计算密集型任务,用于判断一个数是否是素数,在我的电脑上 is_prime(5_000_111_000_222_021) 运行下来花了3s。

image.png

如果将前面段落中并发模型中的三个示例代码中的 slow() 函数(用来模拟请求延时)都替换成

is_prime(5_000_111_000_222_021) 操作分别会产生什么影响?

这里直接给出答案:

  • 多进程并发模型,子进程的打印效果不会收到影响,多个进程间不会受 GIL 限制并且可以充分利用计算机的多核CPU
  • 多线程并发模型,子线程的打印效果不会收到影响,因为 GIL 每 5ms 会释放当前线程的持有权,所以不会导致子线程直接阻塞,但由于多线程间的 GIL 竞争轮换,会导致 is_prime(5_000_111_000_222_021) 的执行耗时更长
  • 协程并发模型,由于在单线程中执行且 is_prime(5_000_111_000_222_021) 执行期间并没有释放执行权,故执行期间,子协程都会被阻塞住