爬虫学习笔记05-异步

news/2024/6/17 8:51:01

爬虫学习笔记05-异步

高性能异步爬虫:在爬虫中使用异步实现高性能的数据爬取操作。

异步的意义:通过一个线程利用其IO等待时间去做一些其他事情。

异步爬虫的方式

-1.多线程,多进程(不建议):
	好处:可以为相关阻塞的操作单独开启线程或者进程,阻塞操作就可以异步执行。
	弊端:无法无限制的开启多线程或者多进程
-2.线程池、进程池(适当的使用):
	好处:我们可以降低系统对进程或者线程创建和销毁的一个频率,从而很好的降低系统的开销。
	弊端:池中线程或进程的数量是有上限的。
	原则:线程池处理的是阻塞且耗时的操作。
 -3.单线程+异步协程(推荐):
 	event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足某些条件的时候,函数就会被循环执行
 	coroutine:协议对象,我们可以将协程对象注册到事件循环中,它会被事件循环调用。我们可以使用async关键字来定义一个方法,这个方法在调用时不会立刻被执行,而是返回一个协程对象。
	task:任务,它是对协程对象的进一步封装,包含了任务的各种状态。
	future:代表将来执行或还没有执行的任务,实际上和task没有本质区别。
	async:定义一个协程
	await:用来挂起阻塞方法的执行。

实战代码

#需求:爬取梨视频的视频数据
import requests
from lxml import etree
import re
import os
from multiprocessing.pool import Pool
if __name__=='__main__':
    url='https://www.pearvideo.com/category_5'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.146 Safari/537.36'
    }
    page_text=requests.get(url=url,headers=headers).text
    tree=etree.HTML(page_text)
    li_list=tree.xpath('//*[@id="listvideoListUl"]/li')
    urls=[] #存储所有视频的连接
    for li in li_list:
        detail_url='https://www.pearvideo.com/'+li.xpath('./div/a/@href')[0]
        name=li.xpath('./div/a/div[2]/text()')[0]+'.mp4'
        print(detail_url,name)
        #对详情页的url发请求
        detail_page_text=requests.get(url=detail_url,headers=headers).text
        #从详情页中解析出视频的地址(url)
        #  ex='staUrl="(.*?)",vdoUrl'
        #  video_url=re.findall(ex,detail_page_text)[0]  这个方法已经不能用了
        tree=etree.HTML(detail_page_text)
        video_url=tree.xpath('//*[@id="JprismPlayer"]')
        print(video_url,'视频地址')
        dic={
            'name':name,
            'url':video_url,
        }
        urls.append(dic)

    def get_video_data(dic):
        url=dic['url']
        print(dic['name'],'正在下载......')
        data=requests.get(url=url,headers=headers).content
        #持久化存储操作
        with open(dic['name'],'wb') as fp:
            fp.write(data)
            print(dic['name'],'下载成功')

    #使用线程池对视频数据进行请求(较为耗时的阻塞操作)
    pool = Pool(2)
    #pool.map(get_video_data,urls)
    pool.close()
    pool.join()
  

协程介绍

import asyncio
async def request(url):
    print('正在请求的url是',url)
    print('请求成功',url)
    return url
    #async修饰的函数,调用之后返回的是一个协程对象
c=request('www..baidu.com')
    #创建一个事件循环对象
    #loop=asyncio.get_event_loop()
    #将协程对象注册到loop中,然后启动loop
    #loop.run_until_complete(c)

#task的使用
#loop=asyncio.get_event_loop()
#基于loop创建了一个task对象
#task=loop.create_task(c)
#print(task)
#loop.run_until_complete(task)
#print(task)

#future的使用
#loop=asyncio.get_event_loop()
#task=asyncio.ensure_future()
#print(task)
#loop.run_until_complete(task)
#print(task)

def callback_func(task):
    #result返回的就是任务对象中封装的协程对象对应函数的返回值
    print(task.result()):
#绑定回调
loop=asyncio.get_event_loop()
task=asyncio.ensure_future()
#将回调函数绑定到任务对象中
task.add_done_callback(callback_func())
loop.run_until_complete(task)

多任务异步协程

import asyncio
import time

async def request(url):
    print('正在下载',url)
    #在异步协程中如果出现了同步模块相关的代码,那么就无法实现异步
    await asyncio.sleep(2)
    print('下载完毕',url)

start=time.time()

urls=[
    'www.baidu.com',
    'www.sogou.com',
    'www.goubanjia.com'
]

#任务列表:存放多个任务对象
tasks=[]
for url in urls:
    c=request(url)
    task=asyncio.ensure_future(c)
    tasks.append(task)

loop=asyncio.get_event_loop()
#需要将任务列表封装到wait中
loop.run_until_complete(asyncio.wait(tasks))
print(time.time()-start)

Aiohttp简单应用

import requests
import asyncio
import time
start=time.time()
urls=[
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/rwl'
]
async def get_page(url):
    print('正在下载',url)
    #requests.get是基于同步的,必须使用基于异步的网络请求模块进行指定url的请求发送
    #aiohttp:基于异步网络请求的模块
    response=requests.get(url=url)
    print('下载完毕:',response.text)

tasks=[]

for url in urls:
    c=get_page(url)
    tasks=asyncio.ensure_future(c)
    tasks.append(task)

loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
emd=time.time()
print('总耗时',end-start)

Aiohttp实现多任务异步协程

环境安装:pip install aiohttp

使用该模块中的ClientSession

import requests
import asyncio
import time
import aiohttp

start=time.time()
urls=[
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/rwl'
]
async def get_page(url):
    async with aiohttp.ClientSession() as session:
        #get()、post():
        #headers,params/data,proxy='http://ip:port'
        async with await session.get(url) as response:
            #text()返回字符串形式的响应数据
            #read()返回的是二进制形式的响应数据
            #json()返回的就是json对象
            #注意:获取响应数据操作之前一定要使用await进行手动挂起
            page_text=response.text()
            print(page_text)

tasks=[]

for url in urls:
    c=get_page(url)
    task=asyncio.ensure_future(c)
    tasks.append(task)

loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
emd=time.time()
print('总耗时',end-start)

事件循环

理解成为一个死循环,去检测并执行某些代码

伪代码

任务列表=[任务1,任务2,任务3,……]
while True:
可执行的任务列表,已完成的任务列表=去任务列表中检查所有的任务,将‘可执行’和‘已完成’的任务返回
for 就绪任务 in 可执行的任务列表:
执行已就绪的任务
for 已完成的任务 in 已完成的任务列表:
在任务列表中移除 已完成的任务
如果 任务列表 中的任务都已完成,则终止循环

快速上手

协程函数,定义函数时 async def 函数名
协程对象,执行 协程函数()得到的协程对象

async def func():
	pass
result=func()

注意:执行协程函数创建协程对象,函数内部代码不会执行。
如果想要运行协程函数内部代码,必须要将协程对象交给事件循环处理

import  asyncio
async def func():
    print("这是一个协程函数")
result=func()
#去生成或获取一个事件循环
#loop = asyncio.get_event_loop()
#将任务放到任务列表
#loop.run_until_complete(任务)
asyncio.run(result)#py3.7之后的更简便写法

await

await+可等待的对象(协程对象/Feture/Task对象->IO等待)

示例1:

import  asyncio
async  def func():
    print("来玩啊")
    response=await  asyncio.sleep(2)
    print("结束",response)
asyncio.run(func())

示例2:

import  asyncio
async def others():
    print("start")
    await  asyncio.sleep(2)
    print('end')
    return '返回值'
async  def func():
    print("执行协程函数内部代码")
    #遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。
    # 当前协程被挂起时,事件循环可以去执行其他协程(任务)
    response=await others() #协程对象
    print("IO请求结束,结果为:",response)   
asyncio.run(func())

示例3:

import  asyncio
async def others():
    print("start")
    await  asyncio.sleep(2)
    print('end')
    return '返回值'

async  def func():
    print("执行协程函数内部代码")
    #遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。
    # 当前协程被挂起时,事件循环可以去执行其他协程(任务)
    response1=await others() #协程对象
    print("IO请求结束,结果为:",response1)
    response2=await others() #协程对象
    print("IO请求结束,结果为:",response2)
asyncio.run(func())

await就是等待对应的值得到结果只会再继续往下走。

Task对象

在事件循环中添加多个任务的。
Tasks用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用asyncio.create_task()函数以外,还可以用低层次的loop.create_task()或ensure_future()函数。不建议手动实例化Task对象。

注意:asyncio.create_task()函数在Python3.7中被加入。在Python3.7之前,可以改用低层次的asyncio.ensure_future()函数。

示例1(不太常用):

import asyncio
async def func(a):
    print(a)
    await asyncio.sleep(2)
    print(a)
    return '返回值'
async  def main():
    print("main开始")
    #创建Task对象,将当前执行func函数任务添加到事件循环
    task1=asyncio.create_task(func(1))
    task2=asyncio.create_task(func(2))
    print('main结束')
    #当执行某协程遇到IO操作时,会自动化切换执行其他任务。
    #此处的await是等待相对应的协程全部执行完毕并获取结果。
    ret1=await  task1
    ret2=await  task2
    print(ret1,ret2)
asyncio.run(main())

示例2(常用):

import asyncio
async def func(a):
    print(a)
    await asyncio.sleep(2)
    print(a)
    return '返回值'
async  def main():
    print("main开始")
    #创建Task对象,将当前执行func函数任务添加到事件循环
    #当执行某协程遇到IO操作时,会自动化切换执行其他任务。
    #此处的await是等待相对应的协程全部执行完毕并获取结果。
    task_list=[
        asyncio.create_task(func(1),name='n1'),
        asyncio.create_task(func(2),name='n2')
    ]
    print('main结束')
    done,pending=await asyncio.wait(task_list,timeout=1) #done为任务返回值的集合,pending为等待时间
    print(done,pending)

asyncio.run(main())

示例3:

import asyncio
async def func(a):
    print(a)
    await asyncio.sleep(2)
    print(a)
    return '返回值'

task_list=[
    func(1),
    func(2)
]
done,pending=asyncio.run(asyncio.wait(task_list)) #done为任务返回值的集合,pending为等待时间
print(done,pending)

asyncio.Future对象

Task继承Future,Task对象内部await结果的处理是基于Future对象来的。

示例1:

import asyncio
async def main():
    #获取当前事件循环
    loop=asyncio.get_running_loop()
    #创建一个任务(Future对象),这个任务什么都不干
    fut=loop.create_future()
    #等待任务的最终结果,没有结果则会一直等下去
    await  fut
asyncio.run(main())

示例2:

import asyncio
async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result("666")
async def main():
    #获取当前事件循环
    loop=asyncio.get_running_loop()
    #创建一个任务(Future对象),这个任务什么都不干
    fut=loop.create_future()
    #创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值
    #即手动设置future任务的最终结果,那么fut就可以结束了。
    await loop.create_task(set_after(fut))
    #等待Future对象获取 最终结果,否则一直等下去
    data=await fut
    print(data)
asyncio.run(main())

concurrent.futures.Future对象

使用线程池、进程池实现异步操作时用到的对象

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func(value):
    time.sleep(1)
    print(value)

#创建线程池
pool=ThreadPoolExecutor(max_workers=5)
#创建进程池
#pool=ProcessPoolExecutor(max_workers=5)
for i in range(10):
    fut=pool.submit(func,1)
    print(fut)

以后写代码可能会存在交叉使用。例如:crm项目80%都是基于协程异步编程+MySQL(不支持)【线程、进程做异步编程】

import time
import asyncio
import concurrent.futures

def func1():
    #某个耗时操作
    time.sleep(2)
    return "SB"
async def main():
    loop=asyncio.get_running_loop()
    #1.Run in the default loop's executor(默认ThreadPoolExecutor)
    #第一步:内部会先调用ThreadPoolExecutor的sumbit方法去线程池中申请一个线程去执行
    #fun1函数,并返回一个concurrent.futures.Future对象
    #第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装成asyncio.Future对象
    #因为concurrent.futures.Future对象不支持await语法,所以需要包装为asyncio.Future对象,才能使用
    fut=loop.run_in_executor(None,func1)
    result=await fut
    print('default thread pool',result)
    #2.Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result=await loop.run_in_executor(pool,func1)
        print('custom thread pool',result)
    #Run in a custom Process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result=await loop.run_in_executor(pool,func1)
        print('custom process pool',result)
asyncio.run(main())

asyncio+不支持异步的模块

import asyncio
import requests

async def download_image(url):
    #发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)
    print("开始下载:",url)
    loop=asyncio.get_event_loop()
    #requests模块默认不支持异步操作,所以就使用线程池来配合实现了。
    future=loop.run_in_executor(None,requests.get,url)
    response=await future
    print('下载完成')
    #图片保存到本地文件
    file_name=url.rsplit('_')[-1]
    with open(file_name,mode='wb') as file_object:
        file_object.write(response.content)

if __name__=='__main__':
    url_list=[
        'https://www.baidu.com/img/dong_8f1d47bcb77d74a1e029d8cbb3b33854.gif',
    ]
    tasks=[download_image(url) for url in url_list]
    loop=asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

异步迭代器

什么是异步迭代器?

实现了__aiter__()__anext__()方法的对象。__anext__()必须返回一个awaitable对象。async for会处理异步迭代器的__anext__()方法所返回的可等待对象,直到其引发StopAsyncIteration异常。由PEP492引入。

什么是异步可迭代对象?

可在async for语句中被使用的对象。必须通过它的__aiter__()方法返回一个asynchronous iterator。由PEP492引入。

import asyncio
class Reader(object):
    #自定义异步迭代器
    def __init__(self):
        self.count=0
    async def readline(self):
        #await asyncio.sleep(1)
        self.count+=1
        if self.count==100:
            return None
        return self.count
    def __aiter__(self):
        return self
    async def __anext__(self):
        val=await self.readline()
        if val==None:
            raise StopAsyncIteration
        return val

async def func():
    obj=Reader()
    async for item in obj:
        print(item)

asyncio.run(func())

异步上下文管理器

此种对象通过定义__aenter__()__aexit__()方法来对async_with语句中的环境进行控制。由PEP 492引入

import asyncio

class AsyncContextManager:
    def __init__(self):
        self.conn = 1
    async def do_something(self):
        #异步操作数据库
        return 666
    async def __aenter__(self):
        #异步链接数据库
        self.conn=await asyncio.sleep(1)
        return self
    async def __aexit__(self,exc_type,exc,tb):
        #异步关闭数据库链接
        await asyncio.sleep(1)
#obj=AsyncContextManager()
async def func():
    async with AsyncContextManager() as f:
        result=await f.do_something()
        print(result)

asyncio.run(func())

uvloop

是asyncio的事件循环的替代方案。事件循环的效率>默认asyncio的事件循环。

pip3 install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
#编写asyncio的代码,与之前写的代码一致。
#内部的事件循环自动化会变为uvloop
asyncio.run(...)

注意:一个asgi->uvicorn内部使用的就是uvloop

异步redis

在使用python代码操作redis时,链接/操作/断开都是网络IO。

pip3 install aioredis

示例1:

import asyncio
import aioredis

async def execute(address,password):
    print("开始执行",address)
    #网络IO操作:创建redis连接
    redis=await aioredis.create_redis(address,password=password)
    #网络IO操作:在redis中设置哈希值car,内部再设三个键值对
    await redis.hmset_dict('car',key1=1,key2=2,key3=3)
    #网络IO操作:去redis中获取值
    result=await redis.hgetall('car',encoding='utf-8')
    print(result)
    redis.close()
    #网络IO操作:关闭redis连接
    await redis.wait_closed()
    print("结果",address)
asyncio.run(execute('redis://169.254.162.83',''))

示例2:

import asyncio
import aioredis

async def execute(address,password):
    print("开始执行",address)
    #网络IO操作:创建redis连接
    redis=await aioredis.create_redis(address,password=password)
    #网络IO操作:在redis中设置哈希值car,内部再设三个键值对
    await redis.hmset_dict('car',key1=1,key2=2,key3=3)
    #网络IO操作:去redis中获取值
    result=await redis.hgetall('car',encoding='utf-8')
    print(result)
    redis.close()
    #网络IO操作:关闭redis连接
    await redis.wait_closed()
    print("结果",address)

task_list=[
    execute('redis://47.93.4.197:6379','root!2345'),
    execute('redis://47.93.4.198:6379','root!2345')
]
asyncio.run(asyncio.wait(task_list))

异步MySQL

pip3 install aiomysql

示例1:

import asyncio
import aiomysql

async def execute():
    #网络IO操作:连接MySQL
    conn=await aiomysql.connect(host='127.0.0.1',port=3306,user='root',password='123',db='mysql',)
    #网络IO操作:创建CURSOR
    cur=await conn.cursor()
    #网络IO操作:执行SQL
    await cur.execute('SELECT Host,User From user')
    #网络IO操作:获取SQL结果
    result=await cur.fetchall()
    print(result)
    #网络IO操作:关闭链接
    await cur.close()
    conn.close()
asyncio.run(execute())

示例2:

import asyncio
import aiomysql

async def execute(host,password):
    print("开始",host)
    #网络IO操作:先去连接47.93.40.197,遇到IO则自动切换任务,去连接47.93.40.198:6379
    conn=await aiomysql.connect(host=host,port=3306,user='root',password=password,db='mysql',)
    #网络IO操作:遇到IO会自动切换任务
    cur=await conn.cursor()
    await cur.execute('SELECT Host,User From user')
    result=await cur.fetchall()
    print(result)
    await cur.close()
    conn.close()
    print("结束",host)
task_list=[
    execute('47.93.41.197',"root!2345"),
    execute('47.93.40.197',"root!2345")
]

asyncio.run(asyncio.wait(task_list))

FastAPI框架

pip3 install fastapi
pip3 install uvicorn(asgi内部基于uvloop)
示例1:

import asyncio
import uvicorn
from  fastapi import  FastAPI
app=FastAPI()
@app.get("/")
def index():
    """普通操作接口"""
    #某个IO操作10s
    return {"message":"Hello World"}

if __name__=='__main__':
    uvicorn.run("luffy:app",host="127.0.0.1",port=5000,log_level='info')

示例2:(程序名:FastAPI框架.py)

import asyncio
import uvicorn
import  aioredis
from aioredis import Redis
from fastapi import FastAPI
app=FastAPI()
#创建一个redis连接池
REDIS_POOL=aioredis.ConnectionsPool('redis://47.193.14.198:6379',password='root123',minsize=1,maxsize=10)
@app.get("/")
def index():
    """普通操作接口"""
    return {"message":"Hello World"}
@app.get("/red")
async def red():
    '''异步操作接口'''
    pinrt("请求来了")
    await asyncio.sleep(3)
    #连接池获取一个连接
    conn=await REDIS_POOL.acquire()
    redis=Redis(conn)
    #设置值
    await redis.hmset_dict('car',key1=1,key2=2,key3=3)
    #读取值
    result=await redis.hgetall('car',encoding='utf-8')
    print(result)
    #连接归还连接池
    REDIS_POOL.release(conn)
    return result

if __name__=='__main__':
    uvicorn.run("FastAPI框架:app",host='127.0.0.1',port=5000,log_level="info")

异步爬虫实例

pip3 install aiohttp
import asyncio
import aiohttp

async def fetch(session,url):
    print("发送请求:",url)
    async with session.get(url,verify_ssl=False) as response:
        text=await response.text()
        print("得到结果:",url,len(text))
        return text

async def main():
    async with aiohttp.ClientSession() as session:
        url_list=[
            'https://python.org',
            'https://www.baidu.com',
            'https://www.pythonav.com'
        ]
        tasks=[asyncio.create_task(fetch(session,url)) for url in url_list]
        done,pending=await asyncio.wait(tasks)

if __name__=='__main__':
    asyncio.run(main())

http://www.niftyadmin.cn/n/397495.html

相关文章

【Android -- 开源库】腾讯 TBS 浏览器 SDK 接入

简介 在 Android 开发项目中,经常会用到 Webview 。而 WebView 是出了名的坑,各种 Bug。腾讯 TBS 浏览服务面向应用开发商和广大开发者,提供浏览增强,内容框架,广告体系,H5游戏分发,大数据等服…

Flask中debug的用法详解

Flask默认是没有开启debug模式的,使用app.run()运行程序后,控制台输出* Debug mode: off。 在具体使用Flask时,可以根据应用场景选择是否使用debug。 开发模式:在程序员自己写代码的时候,开启debug模式,即…

AI炒股回报率500%?内行揭秘玄机

一篇来自佛罗里达大学的研究报告震惊了金融圈:用ChatGPT对公司新闻进行情绪分析,并按此在股市做多、卖空,最高可获得超过500%的投资回报率。虽然坊间对这份报告中惊人的回报率数据有所怀疑,但金融界正在因AI的介入发生改变。 摩根…

gitignore的语法

.gitignore 文件是用来告诉 Git 哪些文件或目录不应该被跟踪的。下面是一些常见的 .gitignore 文件语法规则: 空行或以#开头的行将被 Git 忽略,可以用作注释。 星号 * 代表零个或多个任意字符。例如, *.txt 会匹配所有的 .txt 文件。 问号 ? 代表一个…

《黑马程序员》分布式内存计算Spark环境部署

分布式内存计算Spark环境部署 注意 本小节的操作,基于:大数据集群(Hadoop生态)安装部署环节中所构建的Hadoop集群 如果没有Hadoop集群,请参阅前置内容,部署好环境。 简介 Spark是一款分布式内存计算引…

【C++】右值引用和移动语义

1.左值和右值 在C中,每个表达式或者是左值,或者是右值。 左值(lvalue):可以出现在赋值表达式左侧的值,例如变量名a、数据成员a.m、下标表达式a[n]、解引用表达式*p等。左值可以被赋值和取地址。右值(rvalue):只能出现…

基于matlab地形可视化仿真

一、前言 此示例说明了将常规可用的数字高程模型转换为 X3D 格式以用于虚拟现实场景的可能性。 作为地形数据源,已使用南旧金山 DEM 模型。场景中包含一个简单的预制波音 747 模型,以展示从多个来源即时创建虚拟场景的技术。 此示例需要映射工具箱。 二、…

如何将坐标数据(.xls)转换为矢量范围(.shp)

在工作中,我们经常会遇到要将坐标数据(.xls)转换为矢量范围(.shp)的情况,那该如何使用ArcMap完成这项工作呢 / 『思路:使用ArcMap将Excel数据以 XY 数据的方式导入,导出点要素&#…