在Python网络爬虫程序中使用线程池

Source

一、为什么需要使用线程池

首先,在python网络爬虫程序中使用多线程技术可以大大加快程序的执行时间。关于如何使用多线程,可以参考我的另一篇博文:
《python如何在网络爬虫程序中使用多线程》

假设一个最简单的多线程场景,就是我们想从一个页面上,爬取该页面上所有的图片。

当然了,这前期有一些工作要做,比如将所有的图片url都解析出来(本例中存到了列表img_url_list中),那么之后就可以发挥出多线程版本图片下载器的威力了。示例代码如下:

count = 0
path = 'd:/test'
pic_download_threads = []
for img_url in img_url_list:
    count += 1
    file_name = 'img_' + str(count)
    pic_download_threads.append(PicDownloader(str(count), img_url, file_name, path))

for working_thread in pic_download_threads:
    working_thread.start()

for working_thread in pic_download_threads:
    working_thread.join(5)

上面是常规的多线程处理方式,如果img_url_list里只有十几张图片的url,多线程版本将比单线程版本要快的多。

但我们回过头来考虑以上程序,假如img_url_list里面的成员数非常多的话,比如该页面可能有数百张图片需要下载时,此时会发生什么?如果像上面这么做,我们将启动数百个工作线程!!!而系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。并且,当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃。

在这种情况下,使用线程池就可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。

线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。

此外,当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃, 而使用线程池则可以有效地控制系统中并发线程的数量, 线程池的最大线程数参数(max_workers)可以控制系统中并发线程的数量不超过此数。

二、线程池的使用

2.1 线程池的类与方法

线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。

如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。

Exectuor 提供了如下常用方法:

  • submit(fn, *args, **kwargs):
    将 fn 函数提交给线程池。*args 代表传给 fn 函数的参数,*kwargs 代表以关键字参数的形式为 fn 函数传入参数。

  • map(func, *iterables, timeout=None, chunksize=1):
    该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。

  • shutdown(wait=True):
    关闭线程池。

程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。

Future 提供了如下方法:

  • cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。

  • cancelled():返回 Future 代表的线程任务是否被成功取消。

  • running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。

  • done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。

  • result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。

  • exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。

  • add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。

在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。

2.2 使用线程池的一般步骤

使用线程池来执行线程任务的常见步骤如下:

  1. 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
  2. 定义一个普通函数作为线程任务。
  3. 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
  4. 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。

三、在爬虫程序中使用线程池的实例

下面的类PageContentProducer是一个生产者进程, 它的角色是作为生产者消费者模型中的生产者。
它有一个入参page_list,是需要处理的所有page的列表。

这里我们重点关注的是它如何通过线程池的方式来处理一个可能非常大的page_list列表,并解析出每一page页的内容。

# coding=utf-8

import requests
import multiprocessing
import re
import os
import concurrent.futures

from bs4 import BeautifulSoup as bs

class PageContentProducer(multiprocessing.Process):
    def __init__(self, page_list:list, output_queue:multiprocessing.JoinableQueue):
        multiprocessing.Process.__init__(self)
        self.daemon = True
        self.page_list = page_list
        self.content_list = []
        self.output_queue = output_queue
    
    def run(self):
        '''
        向队列中加入每一篇文章
        '''
        self.visit_all_page_to_get_content()
        
        for content in self.content_list:
            print(f"已加入: {content['title']}")
            self.output_queue.put(content)
        
    def visit_all_page_to_get_content(self):
        '''
        使用线程池处理所有的page, 并从每一页上提取所有的文章content
        '''
        # 在 3.8 版更改: max_workers 的默认值已改为 min(32, os.cpu_count() + 4)。这个默认值会
        # 保留至少 5 个工作线程用于 I/O 密集型任务。对于那些释放了 GIL 的 CPU 密集型任务,它最多会
        # 使用 32 个 CPU 核心。这样能够避免在多核机器上不知不觉地使用大量资源。
        # 现在 ThreadPoolExecutor 在启动 max_workers 个工作线程之前也会重用空闲的工作线程。
        
        # We can use a with statement to ensure threads are cleaned up promptly
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            # 向线程池提交任务
            future_to_page = {executor.submit(self.get_page_content, page_url) : page_url for page_url in self.page_list}
            # 遍历已经完成任务的线程来获取相关结果
            for future in concurrent.futures.as_completed(future_to_page):
                page = future_to_page[future]
                try:
                    # 获取任务执行结果
                    result_list = future.result()
                    self.content_list += result_list
                except Exception as e:
                    print(f'{page} generated an exception: {e}')
        print(f'共提取到{len(self.content_list)}条content记录')
        
    def get_page_content(self, page_url) -> list:
        '''
        线程工作函数, 访问page_url, 并提取该页面上的所有文章, 以列表形式返回
        '''
        content_list = []
        try:
            res = requests.get(url=page_url)
            if 200 == res.status_code:
                page_html = res.content.decode('utf-8')
                soup = bs(page_html, 'lxml')
                items = soup.find_all('li', onclick=re.compile('.*content_[0-9]*.*'))
                print(f'从page: {page_url} 上提取到了[{len(items)}]个content')

                for item in items:
                    content = {}
                    # 提取标题
                    item_title = item.find('a', href='#')
                    content['title'] = item_title.text
                    # 提取图片数目
                    item_num = item.find('span')
                    content['num'] = item_num.text
                    # 提取url, 格式为location.href='content_48388.html';
                    href = item['onclick']
                    item_url = href.split("'")[1]
                    content['url'] = 'https://xxxx.xyz/' + item_url
                    content_list.append(content)
        except Exception as e:
            print(f'从page: {page_url} 上添加content失败')
            print(repr(e))
        return content_list

涉及到的线程池代码函数主要是visit_all_page_to_get_content,它负责线程池的创建和管理,提交任务并获取任务的执行结果:

def visit_all_page_to_get_content(self):
    ...

工作线程函数是get_page_content,它负责具体的任务执行,即提取每一个page上的所有文章(content)信息:

def get_page_content(self, page_url) -> list:
    ...

这里创建了一个max_workers=10的线程池,如果不指定的话,max_workers 的默认值将由系统指定,其默认值为 min(32, os.cpu_count() + 4)。

由于线程池实现了上下文管理协议(Context Manage Protocol),因此,程序可以使用 with 语句来管理线程池,这样即可避免手动关闭线程池,交由系统自动关闭,如上面的程序所示。

随后我们用每一个page_url作为参数,通过executor.submit向该线程池提交任务,并使用模块函数concurrent.futures.as_completed获取已经执行完的futures, 然后通过future.result()来获取每一个线程任务的返回结果。