Python 中的线程

线程可以提供并发性,即使它们不是真正的并行。

在我的上一篇文章中,我简要介绍了向程序添加并发性的方法。在本文中,我将重点介绍其中一种形式,这种形式因其对许多开发人员来说尤其令人沮丧而闻名:线程。我将探讨在 Python 中使用线程的方法,以及该语言在您这样做时对您的限制。

线程背后的基本思想很简单:正如计算机可以一次运行多个进程一样,您的进程也可以一次运行多个线程。当您希望程序在后台执行某些操作时,您可以启动一个新线程。主线程继续在前台运行,允许程序同时执行两件(或更多)事情。

启动新进程和新线程之间有什么区别?新进程完全独立于您现有的进程,从而为您提供更高的稳定性(因为进程不能相互影响或破坏),但也降低了灵活性(因为数据不能轻易地从一个线程流向另一个线程)。由于进程中的多个线程共享数据,因此它们可以更紧密、更容易地相互协作。

例如,假设您要从各种网站检索所有数据。我首选的用于从 Web 检索数据的 Python 包是“requests”包,可从 PyPI 获取。因此,我可以使用一个 for 循环,如下所示


length = {}

for one_url in urls:
    response = requests.get(one_url)
    length[one_url] = len(response.content)

for key, value in length.items():
    print("{0:30}: {1:8,}".format(key, value))

这个程序是如何工作的?它逐个遍历 URL 列表(作为字符串),计算内容的长度,然后将该内容存储在名为 length 的字典中。length 中的键是 URL,值是请求的 URL 内容的长度。

到目前为止,一切都很好;我已将其转换为一个完整的程序 (retrieve1.py),如清单 1 所示。我将九个 URL 放入一个名为 urls.txt 的文本文件(清单 2),然后计时检索每个 URL 所需的时间。在我的计算机上,总时间约为 15 秒,尽管时间显然存在一些差异。

清单 1. retrieve1.py


#!/usr/bin/env python3

import requests
import time

urls = [one_line.strip()
        for one_line in open('urls.txt')]

length = {}

start_time = time.time()

for one_url in urls:
    response = requests.get(one_url)
    length[one_url] = len(response.content)

for key, value in length.items():
    print("{0:30}: {1:8,}".format(key, value))

end_time = time.time()

total_time = end_time - start_time

print("\nTotal time: {0:.3} seconds".format(total_time))

清单 2. urls.txt


http://lerner.co.il
http://LinuxJournal.com
http://en.wikipedia.org
http://news.ycombinator.com
http://NYTimes.com
http://Facebook.com
http://WashingtonPost.com
http://Haaretz.co.il
http://thetech.com

使用线程改进计时

我该如何改进计时?嗯,Python 提供了线程。许多人认为 Python 的线程存在致命缺陷,因为由于 GIL(全局解释器锁),一次只能实际执行一个线程。如果您正在运行一个执行 серьез 计算的程序,并且您真的希望系统并行使用多个 CPU,那么这是正确的。

但是,我在这里有一个不同的用例。我感兴趣的是从不同的网站检索数据。Python 知道 I/O 可能需要很长时间,因此每当 Python 线程进行 I/O(即屏幕、磁盘或网络)时,它都会放弃控制权,并将 GIL 的使用权交给不同的线程。

在我的“retrieve”程序的情况下,这非常完美。我可以生成一个单独的线程来检索数组中的每个 URL。然后我可以等待 URL 并行检索,一次检查每个线程。这样,我可能会节省时间。

让我们从重写程序的核心开始。我希望将检索实现为一个函数,然后调用该函数以及一个参数——我要检索的 URL。然后,我可以通过创建一个新的 threading.Thread 实例来调用该函数,告诉新实例我不仅想在新线程中运行哪个函数,还想传递哪个(些)参数。代码如下所示


for one_url in urls:
    t = threading.Thread(target=get_length, args=(one_url,))
    t.start()

但是等等。get_length 函数将如何将内容长度传递给程序的其余部分?在线程程序中,您真的不能让各个线程修改内置的数据结构,例如列表。这是因为此类数据结构不是线程安全的,并且从一个线程执行诸如“append”之类的操作可能会导致各种问题。

但是,您可以使用“队列”数据结构,它是线程安全的,因此保证了一种通信形式。该函数可以将其结果放入队列中,然后,当所有线程都完成运行时,您可以从队列中读取这些结果。

下面是该函数的样子


from queue import Queue

queue = Queue()

def get_length(one_url):
    response = requests.get(one_url)
    queue.put((one_url, len(response.content)))

如您所见,该函数检索 one_url 的内容,然后将 URL 本身以及内容的长度放入元组中。然后将该元组放入队列中。

这是一个不错的小程序。主线程生成一个新线程,每个线程都运行 get_length。在 get_length 中,信息被卡在队列中。

问题是,现在需要从队列中检索东西。但是,如果您在启动线程后立即执行此操作,则您可能会在线程完成之前从队列中读取数据。因此,您需要“加入”线程,这意味着等待它们完成。一旦所有线程都已加入,您就可以从队列中读取它们的所有信息。

有几种不同的方法可以加入线程。一种简单的方法是创建一个列表,您将在其中存储线程,然后在创建每个新线程对象时将其附加到该列表


threads = [ ]

for one_url in urls:
    t = threading.Thread(target=get_length, args=(one_url,))
    threads.append(t)
    t.start()

然后,您可以迭代每个线程对象,加入它们


for one_thread in threads:
    one_thread.join()

请注意,当您以这种方式调用 one_thread.join() 时,调用会阻塞。也许这不是最有效的方法,但在我的实验中,检索所有 URL 仍然只需要大约一秒钟——快了 15 倍。

换句话说,Python 线程通常被认为是糟糕且无用的。但在这种情况下,您可以看到它们允许我在没有太多麻烦的情况下并行化程序,使不同的部分并发执行。

清单 3. retrieve2.py


#!/usr/bin/env python3

import requests
import time
import threading
from queue import Queue

urls = [one_line.strip()
        for one_line in open('urls.txt')]

length = {}
queue = Queue()
start_time = time.time()
threads = [ ]

def get_length(one_url):
    response = requests.get(one_url)
    queue.put((one_url, len(response.content)))

# Launch our function in a thread
print("Launching")
for one_url in urls:
    t = threading.Thread(target=get_length, args=(one_url,))
    threads.append(t)
    t.start()

# Joining all
print("Joining")
for one_thread in threads:
    one_thread.join()

# Retrieving + printing
print("Retrieving + printing")
while not queue.empty():
    one_url, length = queue.get()
    print("{0:30}: {1:8,}".format(one_url, length))

end_time = time.time()

total_time = end_time - start_time

print("\nTotal time: {0:.3} seconds".format(total_time))

注意事项

好消息是,这演示了当您执行大量耗时的 I/O 操作时,使用线程可能是多么有效。如果您正在用 Python 编写使用线程的服务器,这尤其是一个好消息;您可以为每个传入的请求打开一个新线程,和/或将每个新请求分配给现有的、预先创建的线程。同样,如果线程真的不需要以真正的并行方式执行,那么您就没事了。

但是,如果您的系统收到大量请求怎么办?在这种情况下,您的线程可能无法跟上。如果每个线程中执行的代码是 CPU 密集型的,则尤其如此。

在这种情况下,您不想使用线程。一个流行的选择——事实上,流行的选择——是使用进程。在我的下一篇文章中,我计划研究这些进程如何工作和交互。

Reuven Lerner 在世界各地的公司教授 Python、数据科学和 Git。您可以订阅他的免费每周“更好的开发者”电子邮件列表,并从他的书籍和课程中学习,网址为 http://lerner.co.il。Reuven 与他的妻子和孩子住在以色列的莫迪因。

加载 Disqus 评论