使用AIOHTTP和AIOFiles进行异步Python HTTP请求

9,195次阅读
没有评论

共计 13819 个字符,预计需要花费 35 分钟才能阅读完成。

使用典型的异步 Python 库处理数百个 HTTP 请求、磁盘写入和其他 I/O 密集型任务。

当在单线程同步语言的范围内构建应用程序时,局限性很快就会变得非常明显。我首先想到的是 writes:I/O 密集型任务的定义。将数据写入文件(或数据库)时,每个“写入”操作都会故意占用一个线程,直到写入完成。这对于确保大多数系统中的数据完整性非常有意义。例如,如果两个操作同时尝试更新数据库记录,哪一个是正确的?或者,如果脚本需要 HTTP 请求成功才能继续,那么我们如何继续操作,直到我们知道请求成功?

HTTP 请求是最常见的线程阻塞操作之一。当我们编写期望来自外部第三方的数据的脚本时,我们引入了无数的不确定性,这些不确定性只能由请求本身来回答,例如响应时间延迟、我们期望接收的数据的性质,或者请求是否会成功。即使使用我们有信心的 API,任何操作在完成之前也不一定会成功。因此,我们被“封锁”了。

随着应用程序的复杂性增加以支持更多的同时用户交互,软件正在远离线性执行的范例。因此,虽然我们可能不确定特定请求是否成功或数据库写入是否完成,但只要我们有办法优雅地处理和缓解这些问题,这是可以接受的。

一个值得异步执行的问题

您认为 Python 脚本执行数百个 HTTP 请求、解析每个响应并将输出写入单个文件需要多长时间?如果要在简单的 for 循环中使用请求,则需要等待相当长的时间让 Python 执行每个请求、打开文件、写入文件、关闭文件,然后继续执行下一个请求。

我们把 asyncio 提高脚本效率的能力放到实际测试中。我们将为数百个 URL 的每个任务执行两个 I/O 阻塞操作:执行和解析 HTTP 请求并将所需结果写入单个文件。我们实验的输入将是大量 URL,预期输出是从这些 URL 解析的元数据。让我们看看对数百个 URL 执行此操作需要多长时间。

该网站大约有 2000 篇自己发布的帖子,这使其成为这个小实验的绝佳实验对象。我创建了一个 CSV,其中包含这些帖子的 URL,这将是我们的输入。下面先睹为快:

输入样本

输入 CSV

样本输出

对于输入 CSV 中找到的每个 URL,我们的脚本将获取 URL、解析页面并将一些选择数据写入单个 CSV。结果将类似于以下示例:

我们的脚本将输出的示例

工作工具

我们需要三个核心 Python 库来实现这一目标:

  • Asyncio:Python 的基础库,用于运行异步 IO 绑定任务。该库在某种程度上已内置到 Python 核心语言中,引入了 async/await 关键字,分别表示函数何时异步运行以及何时等待此类函数。

  • Aiohttp:在客户端使用时,类似于 Python 的 requests 库,用于发出异步请求。或者,aiohttp 可以反向使用:作为应用程序 Web 服务器 来处理传入请求和提供响应,但这是另一个故事了。

  • Aiofiles:使写入磁盘(例如创建字节并将字节写入文件)成为一项非阻塞任务,这样即使多个任务绑定到同一个文件,多个写入也可以在同一线程上发生而不会相互阻塞。

# 安装必要的库
$ pip install asyncio aiohttp aiofiles

奖励:优化速度的依赖项

只需安装一些补充库,aiohttp 就可以更快地执行请求。这些库是 cchardet(字符编码检测)、aiodns(异步 DNS 解析)和 brotlipy(无损压缩)。我强烈建议使用下面方便提供的快捷方式安装它们(从我这里获取,我是互联网上的陌生人):

# 安装补充依赖项以加快请求速度
$ pip install aiohttp[speedups]

准备异步脚本 / 应用程序

我们将像任何其他 Python 脚本一样构造该脚本。我们的主模块 aiohttp_aiofiles_tutorial 将处理我们的所有逻辑。config.py 和 main.py 都位于主模块之外,并分别为我们的脚本提供一些基本配置和入口点:

# 我们的异步获取器 / 编写器的项目结构
/aiohttp-aiofiles-tutorial
├── /aiohttp_aiofiles_tutorial
│   ├── __init__.py
│   ├── fetcher.py
│   ├── loops.py
│   ├── tasks.py
│   ├── parser.py
│   └── /data  # Source data
│       ├── __init__.py
│       ├── parser.py
│       ├── tests
│       └── urls.csv
├── /export  # Destination for exported data
├── config.py
├── logger.py
├── main.py
├── pyproject.toml
├── Makefile
├── README.md
└── requirements.txt

/export 只是一个空目录,我们将在其中写入输出文件。

/data 子模块包含上面提到的输入 CSV,以及解析它的一些基本逻辑。没什么值得打电话回家的,但如果你好奇的话,可以在 Github 存储库上找到源代码。最后附上本文代码。

开始事情

我们卷起袖子,从强制性脚本“入口点”main.py 开始。这将启动 /aiohttp_aiofiles_tutorial 中的核心函数,称为 init_script():

#main.py
"""脚本入口点。"""
import asyncio

from aiohttp_aiofiles_tutorial import init_script


if __name__ == "__main__":
    asyncio.run(init_script())

init_script() 这看起来像是我们正在通过运行单个函数 / 协程 asyncio.run(),乍一看这似乎违反直觉。您可能会问,asyncio 的目的不就是同时运行多个协程吗?

它的确是!init_script() 是一个调用其他协程的协程。其中一些协程从其他协程创建任务,其他协程执行任务,等等。asyncio.run() 创建一个事件循环,直到目标协程完成为止(包括父协程调用的所有协程),该循环不会停止运行。因此,如果我们保持干净,asyncio.run() 就是一次性调用来初始化脚本。

初始化我们的脚本

这就是乐趣的开始。我们已经确定脚本的目的是输出单个 CSV 文件,这就是我们开始的地方:通过在整个脚本运行的上下文中创建并打开一个输出文件:

#aiohttp_aiofiles_tutorial/__init__.py
"""同时发出数百个请求并将响应保存到磁盘。"""
import aiofiles
from config import EXPORT_FILEPATH


async def init_script():
    """准备输出文件并启动任务创建 / 执行。"""
    async with aiofiles.open(EXPORT_FILEPATH, mode="w+") as outfile:
        await outfile.write("title,description,primary_tag,url,published_atn")
        # (我们其余的脚本逻辑将在这里执行).
        # ...

我们的脚本首先打开一个带有 aiofiles. 只要我们的脚本通过 在打开的异步文件的上下文中运行 async with aiofiles.open() as outfile:,我们就可以不断写入该文件,而不必担心打开和关闭文件。

将此与 Python 中处理文件 I/O 的同步 with open() as outfile:(默认)实现进行比较。通过使用,我们几乎 aiofiles 可以同时从多个源将数据写入同一文件。

EXPORT_FILEPATH 碰巧以 CSV (/export/hackers_pages_metadata.csv) 为目标。每个 CSV 都需要一行标题;因此,我们 await outfile.write() 在打开 CSV 后立即写入标题:

# 将一行写入由列标题组成的 CSV
...
await outfile.write("title,description,primary_tag,url,published_atn")

前进

下面是__init__.py 的完整版本,最终将使我们的脚本付诸实践。最值得注意的补充是协程的引入 execute_fetcher_tasks();我们将一次剖析这一块:

#aiohttp_aiofiles_tutorial/__init__.py
"""同时发出数百个请求并将响应保存到磁盘。"""
import asyncio
import time

import aiofiles
from aiofiles.threadpool.text import AsyncTextIOWrapper as AsyncIOFile
from aiohttp import ClientSession
from config import EXPORT_FILEPATH, HTTP_HEADERS

from .data import urls_to_fetch  # URLs parsed from a CSV
from .tasks import create_tasks  # Creates one task per URL


async def init_script():
    """准备输出文件并启动任务创建 / 执行。"""
    async with aiofiles.open(EXPORT_FILEPATH, mode="w+") as outfile:
        await outfile.write("title,description,primary_tag,url,published_atn")
        await execute_fetcher_tasks(outfile)
        await outfile.close()


async def execute_fetcher_tasks(outfile: AsyncIOFile):
    """
    打开异步 HTTP 会话并执行创建的任务。:param AsyncIOFile outfile:要写入的本地文件的路径。"""
    async with ClientSession(headers=HTTP_HEADERS) as session:
        task_list = await create_tasks(session, urls_to_fetch, outfile)
        await asyncio.gather(*task_list)

execute_fetcher_tasks() 分解主要是为了组织我们的代码。该协程接受 outfile 一个参数,该参数将作为我们最终解析的数据的目的地。逐行来看:

  • async with ClientSession(headers=HTTP_HEADERS) as session:与 Python 请求库不同,aiohttp 使我们能够打开一个客户端会话,该会话创建一个连接池,该连接池一次 最多允许 100 个活动连接。因为我们将发出 200 个以下的请求,所以获取所有这些 URL 所需的时间将与 Python 在正常情况下获取两个 URL 所需的时间相当。

  • create_tasks():我们要定义的这个函数接受三个参数。第一个是 ClientSession 我们之前刚刚打开一行的异步。接下来,我们有 urls_to_fetch 变量(之前在脚本中导入)。这是一个简单的 Python 字符串列表,其中每个字符串都是从我们之前的“输入”CSV 解析而来的 URL。该逻辑通过一个简单的函数在其他地方处理(对于本教程来说并不重要)。最后,outfile 被传递,因为我们稍后将写入该文件。使用这些参数,create_tasks() 将为 174 个 URL 中的每一个创建一个任务。其中每个都会将给定 URL 的内容下载到目标目录。该函数返回任务,但在我们发出指令之前不会执行它们,这是通过 …

  • asyncio.gather(*task_list):Asyncio 的 gather() 方法在当前运行的事件循环内执行一组任务。一旦开始,异步 I/O 的速度优势将立即显现出来。

创建异步任务

如果您还记得的话,PythonTask 包装了我们将来要执行的函数(协程)。此外,每个任务都可以暂时搁置以执行其他任务。在执行创建任务之前,必须传递预定义的协程以及适当的参数。

我分开 create_tasks() 返回一个 Python 任务列表,其中每个“任务”将执行获取我们的 URL 之一:

#aiohttp_aiofiles_tutorial/tasks.py
"""准备要执行的任务。"""
import asyncio
from asyncio import Task
from typing import List

from aiofiles.threadpool.text import AsyncTextIOWrapper as AsyncIOFile
from aiohttp import ClientSession

from .fetcher import fetch_url_and_save_data


async def create_tasks(session: ClientSession, urls: List[str], outfile: AsyncIOFile
) -> List[Task]:
    """
    创建 asyncio 任务来解析 HTTP 请求响应。:param ClientSession session: 异步 HTTP 请求会话。:param List[str] urls:要获取的资源 URL。:param AsyncIOFile outfile:要写入的本地文件的路径。:returns: List[Task]
    """
    task_list = []
    for i, url in enumerate(urls):
        task = asyncio.create_task(
            fetch_url_and_save_data(
                session,
                url,
                outfile,
                len(urls),
                i
            )
        )
        task_list.append(task)
    return task_list

关于 asyncio 任务的一些值得注意的事情:

  • 我们预先定义“工作要做”。a 的创建 Task 并不执行代码。我们的脚本将使用不同的参数同时运行同一函数 174 次。我们希望预先定义这些任务是有道理的。

  • 定义任务既快速又简单。瞬间,CSV 中的每个 URL 都会创建一个相应的任务并将其添加到 task_list.

  • 任务准备就绪后,只剩下一件事情要做了,那就是把它们全部启动并开始聚会。这就是 asyncio.gather(*task_list)__ init __ .py 中的行发挥作用的地方。

Asyncio 的 Task 对象本身就是一个类,具有其属性和方法,本质上提供了一个包装器,其中包含检查任务状态、取消任务等的方法。

执行我们的任务

回到 之前 create_tasks(),我们创建了每个任务,每个任务单独执行一个称为每个任务的方法 fetch_url_and_save_data()。这个函数做了三件事:

  • 通过 aiohttp 的会话上下文(由 处理 async with session.get(url) as resp:)向给定任务的 URL 发出异步请求

  • 将响应正文作为字符串读取。

  • html 通过传递给我们的最后一个函数,将响应正文的内容写入文件 parse_html_page_metadata():

#aiohttp_aiofiles_tutorial/fetcher.py
"""获取 URL、提取其内容并将解析后的数据写入文件。"""
from aiofiles.threadpool.text import AsyncTextIOWrapper as AsyncIOFile
from aiohttp import ClientError, ClientSession, InvalidURL
from logger import LOGGER

from .parser import parse_html_page_metadata


async def fetch_url_and_save_data(
    session: ClientSession,
    url: str,
    outfile: AsyncIOFile,
    total_count: int,
    i: int,
):
    """
     在解析之前从 URL 获取原始 HTML。:param ClientSession session: 异步 HTTP 请求会话。:param str url: 要获取的目标 URL。:param AsyncIOFile outfile:要写入的本地文件的路径。:param int Total_count:要获取的 URL 总数。:param int i:URL 总数中当前迭代的 URL。"""
    try:
        async with session.get(url) as resp:
            if resp.status != 200:
                pass
            html = await resp.text()
            page_metadata = await parse_html_page_metadata(html, url)
            await outfile.write(f"{page_metadata}n")
            LOGGER.info(f"Fetched URL {i} of {total_count}: {page_metadata}"
            )
    except InvalidURL as e:
        LOGGER.error(f"Unable to fetch invalid URL `{url}`: {e}")
    except ClientError as e:
        LOGGER.error(f"ClientError while fetching URL `{url}`: {e}")
    except Exception as e:
        LOGGER.error(f"Unexpected error while fetching URL `{url}`: {e}"
        )

当通过 aiohttp 获取 URL 时 ClientSession,调用.text()response() 上的方法将以字符串 await resp.text() 形式返回请求的响应。不要与 混淆,后者返回一个字节对象(对于拉取媒体文件或字符串以外的任何内容很有用)。.body()

如果您继续跟踪,我们现在已经深入了三个“上下文”:

  1. 我们通过打开一个 aiofiles.open() 上下文来开始我们的脚本,该上下文将保持打开状态,直到我们的脚本完成。outfile 这允许我们在脚本运行期间从任何任务写入数据。

  2. 将标头写入 CSV 文件后,我们使用 打开了一个持久的客户端请求会话 async with ClientSession() as session,这允许我们在会话打开时批量发出请求。

  3. 在上面的代码片段中,我们输入了第三个也是最后一个上下文:单个 URL 的响应上下文(通过 async with session.get(url) as resp)。与其他两个上下文不同,我们将进入和离开此上下文 174 次(每个 URL 一次)。

在每个 URL 响应上下文中,我们最终开始生成一些输出。这给我们留下了最后的逻辑(await parse_html_page_metadata(html, url)),它解析每个 URL 响应并从页面返回一些抓取的元数据,然后将所述元数据写入我们的 outfile 下一行 await outfile.write(f”{page_metadata}n”)。

将解析的元数据写入 CSV

您可能会问,我们计划如何从 HTML 页面中提取元数据?当然是使用 BeautifulSoup!有了 HTTP 响应的 HTML,我们就可以 bs4 解析每个 URL 响应并返回以下各列的值 outfile:title、description、Primary_tag、published at 和 url。

这五个值以逗号分隔的字符串形式返回,然后 outfile 作为单行写入 CSV。

#aiohttp_aiofiles_tutorial/parser.py
"""从原始 HTML 中解析元数据。"""
from bs4 import BeautifulSoup
from bs4.builder import ParserRejectedMarkup
from logger import LOGGER


async def parse_html_page_metadata(html: str, url: str) -> str:
    """
    将页面元数据从原始 HTML 提取到 CSV 行中。:param str html: 给定获取的 URL 的原始 HTML 源。:param str url:与提取的 HTML 关联的 URL。:returns: str
    """
    try:
        soup = BeautifulSoup(html, "html.parser")
        title = soup.title.string.replace(",", ";")
        description = (soup.head.select_one("meta[name=description]")
            .get("content")
            .replace(",", ";")
            .replace('"',"`")
            .replace("'","`")
        )
        primary_tag = (
            soup.head
            .select_one("meta[property='article:tag']")
            .get("content")
        )
        published_at = (
            soup.head
            .select_one("meta[property='article:published_time']")
            .get("content")
            .split("T")[0]
        )
        if primary_tag is None:
            primary_tag = ""return f"{title}, {description}, {primary_tag}, {url}, {published_at}"
    except ParserRejectedMarkup as e:
        LOGGER.error(f"Failed to parse invalid html for {url}: {e}"
        )
    except ValueError as e:
        LOGGER.error(f"ValueError occurred when parsing html for {url}: {e}"
        )
    except Exception as e:
        LOGGER.error(f"Parsing failed when parsing html for {url}: {e}"
        )

运行珠宝,运行脚本

让我们带这个坏男孩去兜风吧。我在__init__.py 中添加了一个计时器来记录脚本持续时间所经过的秒数:

#aiohttp_aiofiles_tutorial/__init__.py
"""同时发出数百个请求并将响应保存到磁盘。"""
import time
from time import perf_counter as timer

...


async def init_script():
    """Prepare output file & kickoff task creation/execution."""
    start_time = timer()  # Add timer to function
    async with aiofiles.open(EXPORT_FILEPATH, mode="w+") as outfile:
        await outfile.write("title,description,primary_tag,url,published_atn")
        await execute_fetcher_tasks(outfile)
        await outfile.close()
    LOGGER.success(f"Executed {__name__} in {time.perf_counter() - start_time:0.2f} seconds."
     )  # Log time of execution


...

make run 如果您按照存储库进行操作(或者只是输入),请混合该 mfing 命令 python3 main.py。系好安全带:

# 在约 3 秒内获取 174 页后日志的尾部
...
16:12:34 PM | INFO: Fetched URL 165 of 173: Setting up a MySQL Database on Ubuntu, Setting up MySQL the old-fashioned way: on a linux server, DevOps, https://hackersandslackers.com/set-up-mysql-database/, 2018-04-17

16:12:34 PM | INFO: Fetched URL 164 of 173: Dropping Rows of Data Using Pandas, Square one of cleaning your Pandas Dataframes: dropping empty or problematic data., Data Analysis, https://hackersandslackers.com/pandas-dataframe-drop/, 2018-04-18

16:12:34 PM | INFO: Fetched URL 167 of 173: Installing Django CMS on Ubuntu, Get the play-by-play on how to install DjangoCMS: the largest of three major CMS products for Python`s Django framework., Software, https://hackersandslackers.com/installing-django-cms/, 2017-11-19

16:12:34 PM | INFO: Fetched URL 166 of 173: Starting a Python Web App with Flask & Heroku, Pairing Flask with zero-effort container deployments is a deadly path to addiction., Architecture, https://hackersandslackers.com/flask-app-heroku/, 2018-02-13

16:12:34 PM | INFO: Fetched URL 171 of 173: Another 'Intro to Data Analysis in Python Using Pandas' Post, An introduction to Python`s quintessential data analysis library., Data Analysis, https://hackersandslackers.com/intro-python-pandas/, 2017-11-16

16:12:34 PM | INFO: Fetched URL 172 of 173: Managing Python Environments With Virtualenv, Embrace core best-practices in Python by managing your Python packages using virtualenv and virtualenvwrapper., Software, https://hackersandslackers.com/python-virtualenv-virtualenvwrapper/, 2017-11-15

16:12:34 PM | INFO: Fetched URL 170 of 173: Visualize Folder Structures with Python’s Treelib, Using Python`s treelib library to output the contents of local directories as visual tree representations., Data Engineering, https://hackersandslackers.com/python-tree-hierachies-treelib/, 2017-11-17

16:12:34 PM | INFO: Fetched URL 169 of 173: Merge Sets of Data in Python Using Pandas, Perform SQL-like merges of data using Python`s Pandas., Data Analysis, https://hackersandslackers.com/merge-dataframes-with-pandas/, 2017-11-17

16:12:34 PM | INFO: Fetched URL 168 of 173: Starting an ExpressJS App, Installation guide for ExpressJS with popular customization options., JavaScript, https://hackersandslackers.com/create-an-expressjs-app/, 2017-11-18

16:12:34 PM | SUCCESS: Executed aiohttp_aiofiles_tutorial in 2.96 seconds.

用 Python 编写异步脚本肯定需要更多的努力,但不会增加数百或数千倍的努力。即使您追求的不是速度,处理大型应用程序的数量也使 asyncio 变得绝对至关重要。例如,如果您的聊天机器人或网络服务器正在处理用户的请求,那么当第二个用户同时尝试与您的应用程序交互时会发生什么?通常答案是什么:用户 1 得到了他们想要的东西,而用户 2 则被困在阻塞的线程中。

源代码:github.com/hackersandslackers/aiohttp-aiofiles-tutorial/tree/master/aiohttp_aiofiles_tutorial/data 文章来源地址 https://www.toymoban.com/diary/python/582.html

到此这篇关于使用 AIOHTTP 和 AIOFiles 进行异步 Python HTTP 请求的文章就介绍到这了, 更多相关内容可以在右上角搜索或继续浏览下面的相关文章,希望大家以后多多支持 TOY 模板网!

    正文完
     0
    Yojack
    版权声明:本篇文章由 Yojack 于1970-01-01发表,共计13819字。
    转载说明:
    1 本网站名称:优杰开发笔记
    2 本站永久网址:https://yojack.cn
    3 本网站的文章部分内容可能来源于网络,仅供大家学习与参考,如有侵权,请联系站长进行删除处理。
    4 本站一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
    5 本站所有内容均可转载及分享, 但请注明出处
    6 我们始终尊重原创作者的版权,所有文章在发布时,均尽可能注明出处与作者。
    7 站长邮箱:laylwenl@gmail.com
    评论(没有评论)