Python 中的多进程

Python 的“multiprocessing”模块感觉像线程,但实际上启动的是进程。

许多人在开始使用 Python 时,都兴奋地听说该语言支持线程。而且,正如我在之前的文章中讨论的那样,Python 确实支持具有易于使用且方便的接口的本机级线程。

但是,这些线程有一个缺点,即全局解释器锁 (GIL),它确保一次只有一个线程运行。由于线程在每次使用 I/O 时都会让出 GIL,这意味着虽然线程在 CPU 绑定的 Python 程序中是一个坏主意,但当您处理 I/O 时,它们是一个好主意。

但是,即使您使用大量的 I/O,您也可能更喜欢充分利用多核系统。而在 Python 的世界中,这意味着使用进程。

在我的文章“在 Python 中启动外部进程”中,我描述了如何从 Python 程序中启动进程,但这些示例都演示了您可以在外部进程中启动程序。通常,当人们谈论进程时,它们的工作方式很像线程,但更加独立(并且开销也更大)。

因此,这是一个两难境地:您是启动易于使用的线程,即使它们并没有真正并行运行?还是启动新的进程,而您对这些进程几乎没有控制权?

答案介于两者之间。Python 标准库附带了“multiprocessing”模块,该模块提供了使用线程的感觉,但实际上是使用进程。

因此,在本文中,我将研究“multiprocessing”库,并描述它可以执行的一些基本操作。

多进程基础知识

“multiprocessing”模块旨在看起来和感觉上都像“threading”模块,并且它在很大程度上成功地做到了这一点。例如,以下是一个多线程程序的简单示例


#!/usr/bin/env python3

import threading
import time
import random

def hello(n):
    time.sleep(random.randint(1,3))
    print("[{0}] Hello!".format(n))

for i in range(10):
    threading.Thread(target=hello, args=(i,)).start()

print("Done!")

在此示例中,有一个函数 (hello) 打印“Hello!” 以及传递的任何参数。然后,它运行一个 for 循环,该循环运行 hello 十次,每次都在独立的线程中。

但是等等。在函数打印其输出之前,它首先休眠几秒钟。当您运行此程序时,您最终得到的输出会演示线程是如何并行运行的,而不一定是按照它们被调用的顺序运行的


$ ./thread1.py
Done!
[2] Hello!
[0] Hello!
[3] Hello!
[6] Hello!
[9] Hello!
[1] Hello!
[5] Hello!
[8] Hello!
[4] Hello!
[7] Hello!

如果您想确保在所有线程完成运行后打印“Done!”,您可以使用 join。为此,您需要获取 threading.Thread 的每个实例,将其放入列表中,然后在每个线程上调用 join


#!/usr/bin/env python3

import threading
import time
import random

def hello(n):
    time.sleep(random.randint(1,3))
    print("[{0}] Hello!".format(n))

threads = [ ]
for i in range(10):
    t = threading.Thread(target=hello, args=(i,))
    threads.append(t)
    t.start()

for one_thread in threads:
    one_thread.join()

print("Done!")

此版本的唯一区别在于它将线程对象放入列表 (“threads”),然后迭代该列表,逐个加入它们。

但是等等,我保证我要谈论的是“multiprocessing”,而不是线程。这是怎么回事?

好吧,“multiprocessing”旨在提供使用线程的感觉。这是如此真实,以至于我基本上可以对我刚刚展示的程序进行一些搜索和替换

  • threading → multiprocessing
  • Thread → Process
  • threads → processes
  • thread → process

结果如下


#!/usr/bin/env python3

import multiprocessing
import time
import random

def hello(n):
    time.sleep(random.randint(1,3))
    print("[{0}] Hello!".format(n))

processes = [ ]
for i in range(10):
    t = multiprocessing.Process(target=hello, args=(i,))
    processes.append(t)
    t.start()

for one_process in processes:
    one_process.join()

print("Done!")

换句话说,您可以使用 multiprocessing.Process 在新进程中运行函数,具有完全的并发性并利用多核。它的工作方式非常像线程,包括在您创建的 Process 对象上使用 joinProcess 的每个实例都代表计算机上运行的进程,您可以使用 ps 查看,并且(理论上)可以使用 kill 停止。

有什么区别?

令我惊讶的是,API 几乎相同,但幕后却发生了非常不同的事情。让我尝试用另一对示例来更清楚地说明区别。

对于任何使用线程和进程进行编程的人来说,也许最大的区别是线程共享全局变量。相比之下,独立的进程是完全独立的;一个进程不能影响另一个进程的变量。(在以后的文章中,我计划研究如何解决这个问题。)

这是一个简单的示例,说明在线程中运行的函数如何修改全局变量(请注意,我在这里所做的是为了证明一个观点;如果您真的想从线程中修改全局变量,则应该使用锁)


#!/usr/bin/env python3

import threading
import time
import random

mylist = [ ]

def hello(n):
    time.sleep(random.randint(1,3))
    mylist.append(threading.get_ident())   # bad in real code!
    print("[{0}] Hello!".format(n))

threads = [ ]
for i in range(10):
    t = threading.Thread(target=hello, args=(i,))
    threads.append(t)
    t.start()

for one_thread in threads:
    one_thread.join()

print("Done!")
print(len(mylist))
print(mylist)

该程序基本上没有改变,只是在顶部定义了一个新的空列表 (mylist)。该函数将其 ID 附加到该列表,然后返回。

现在,我这样做的方式不是很明智,因为 Python 数据结构不是线程安全的,并且从多个线程附加到列表最终会赶上您。但这里的重点不是演示线程,而是将它们与进程进行对比。

当我运行上面的代码时,我得到


$ ./th-update-list.py
[0] Hello!
[2] Hello!
[6] Hello!
[3] Hello!
[1] Hello!
[4] Hello!
[5] Hello!
[7] Hello!
[8] Hello!
[9] Hello!
Done!
10
[123145344081920, 123145354592256, 123145375612928,
 ↪123145359847424, 123145349337088, 123145365102592,
 ↪123145370357760, 123145380868096, 123145386123264,
 ↪123145391378432]

因此,您可以看到全局变量 mylist 由线程共享,并且当一个线程修改列表时,该更改对所有其他线程都可见。

但是,如果您将程序更改为使用“multiprocessing”,则输出看起来会有些不同


#!/usr/bin/env python3

import multiprocessing
import time
import random
import os

mylist = [ ]

def hello(n):
    time.sleep(random.randint(1,3))
    mylist.append(os.getpid())
    print("[{0}] Hello!".format(n))

processes = [ ]
for i in range(10):
    t = multiprocessing.Process(target=hello, args=(i,))
    processes.append(t)
    t.start()

for one_process in processes:
    one_process.join()

print("Done!")
print(len(mylist))
print(mylist)

除了切换到 multiprocessing 之外,此程序版本中最大的更改是使用 os.getpid 获取当前进程 ID。

此程序的输出如下


$ ./proc-update-list.py
[0] Hello!
[4] Hello!
[7] Hello!
[8] Hello!
[2] Hello!
[5] Hello!
[6] Hello!
[9] Hello!
[1] Hello!
[3] Hello!
Done!
0
[]

一切看起来都很棒,直到最后检查 mylist 的值时。它怎么了?程序不是附加到它了吗?

有点。问题是,此程序中没有“它”。每次使用“multiprocessing”创建一个新进程时,每个进程都有其自己的全局 mylist 列表的值。因此,每个进程都添加到自己的列表中,该列表在进程加入时消失。

这意味着对 mylist.append 的调用成功,但它在十个不同的进程中成功。当函数从其自己的进程中执行返回时,该进程的列表没有任何痕迹留下。主进程中唯一的 mylist 变量仍然为空,因为没有人附加到它。

队列来救援

在线程程序的世界中,即使您能够附加到全局 mylist 变量,您也不应该这样做。这是因为 Python 的数据结构不是线程安全的。实际上,只有一种数据结构保证是线程安全的——multiprocessing 模块中的 Queue 类。

队列是 FIFO(即“先进先出”)。任何想要向队列添加数据的人都可以在队列上调用 put 方法。任何想要从队列中检索数据的人都使用 get 命令。

现在,多线程程序世界中的队列可以防止与线程安全有关的问题。但在多进程的世界中,队列允许您弥合进程之间的差距,将数据发送回主进程。例如


#!/usr/bin/env python3

import multiprocessing
import time
import random
import os
from multiprocessing import Queue

q = Queue()

def hello(n):
    time.sleep(random.randint(1,3))
    q.put(os.getpid())
    print("[{0}] Hello!".format(n))

processes = [ ]
for i in range(10):
    t = multiprocessing.Process(target=hello, args=(i,))
    processes.append(t)
    t.start()

for one_process in processes:
    one_process.join()

mylist = [ ]
while not q.empty():
    mylist.append(q.get())

print("Done!")
print(len(mylist))
print(mylist)

在此版本的程序中,我直到后期才创建 mylist。但是,我很早就创建了 multiprocessing.Queue 的实例。Queue 实例旨在在不同的进程之间共享。此外,它可以处理可以使用“pickle”存储的任何类型的 Python 数据,这基本上意味着任何数据结构。

hello 函数中,它将对 mylist.append 的调用替换为对 q.put 的调用,将当前进程的 ID 号放在队列中。它创建的十个进程中的每一个都将自己的 PID 添加到队列中。

请注意,此程序分阶段进行。首先,它启动十个进程,然后它们都并行执行其工作,然后它等待它们完成(使用 join),以便它可以处理结果。它从队列中拉取数据,将其放入 mylist,然后对其检索到的数据执行一些计算。

队列的实现非常流畅且易于使用,很容易忘记这些队列正在使用一些幕后的操作系统魔法来保持事物协调。很容易认为您正在使用线程,但这正是 multiprocessing 的重点;它可能感觉像线程,但每个进程都单独运行。这为您提供了程序中的真正并发性,这是线程无法做到的。

结论

线程易于使用,但线程不会真正并行执行。Multiprocessing 是一个模块,它提供了一个几乎与线程 API 相同的 API。这并没有掩盖所有差异,但它在很大程度上确保了事情不会失控。

Reuven Lerner 在世界各地的公司教授 Python、数据科学和 Git。您可以订阅他的免费每周“更好的开发者”电子邮件列表,并从他的书籍和课程中学习,网址为 http://lerner.co.il。Reuven 与他的妻子和孩子住在以色列的莫迪因。

加载 Disqus 评论