我是靠谱客的博主 爱听歌灰狼,最近开发中收集的这篇文章主要介绍提高爬虫效率之python并发编程,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

python并发编程分为三个方向,分别是 多进程,多线程,多协程 。根据具体情况选择具体的方式,能提高程序的速度。

三种类别的对比

  • 多进程

    • 优点:可以利用多个CPU并行运算(多核计算机)
    • 缺点:占用的资源最多,可启动的数目比线程少
    • 适用于:CPU密集型计算(使用CPU较多,IO较少)
  • 多线程

    • 优点:相比进程,更轻量级,占用资源少
    • 缺点:
      • 相比进程:多线程只能并发执行,不能利用多个CPU (GIL)
      • 相比协程:启动数目有限制,占用内存资源,有线程切换的开销
    • 适用于:IO密集型计算,同事运行的任务数目要求不多
  • 多协程:

    • 优点:内存开销最小,启动的协程数目最多
    • 缺点:支持的库有限制(aiohttp ),代码实现复杂
    • 适用于:IO密集型计算,需要超多任务运行,但有现成的库支持的场景
  • 三种方式的理解:

    有一个老板想要开个工厂进行生产口罩

    他需要花一些财力物力制作一条生产线,这个生产线上有很多的器件以及材料这些所有的 为了能够生产口罩而准备的资源称之为:进程

    只有生产线是不能够进行生产的,所以老板的找个工人来进行生产,这个工人能够利用这些材料最终一步步的将口罩做出来,这个来做事情的工人称之为:线程

    这个老板为了提高生产率,想到3种办法:

    在这条生产线上多招些工人,一起来做口罩,这样效率是成倍増长,即单进程 多线程方式

    老板发现这条生产线上的工人不是越多越好,因为一条生产线的资源以及材料毕竟有限,所以老板又花了些财力物力购置了另外一条生产线,然后再招些工人这样效率又再一步提高了,即多进程 多线程方式

    老板发现,现在已经有了很多条生产线,并且每条生产线上已经有很多工人了(即程序是多进程的,每个进程中又有多个线程),为了再次提高效率,老板想了个损招,规定:如果某个员工在上班时临时没事或者再等待某些条件(比如等待另一个工人生产完谋道工序 之后他才能再次工作) ,那么这个员工就利用这个时间去做其它的事情,那么也就是说:如果一个线程等待某些条件,可以充分利用这个时间去做其它事情,其实这就是:协程方式

多线程实例

​ 使用模块 threading

import threading
import requests
import time

urls = [
	f"https://www.cnblogs.com/#p{page}" for page in range(1, 50)
]


def spider(url):
	response = requests.get(url).text
	print(url, len(response))

def sign_threading():
	print("sign_threading start")
	for url in urls:
		spider(url)
	print("sign_threading end")

def multi_threading():
	print("multi_threading start")
	threadings = []

	for url in urls:
		threadings.append(threading.Thread(target=spider, args=(url,)))

	for thread in threadings:
		thread.start()

	for thread in threadings:
		thread.join()
		
	print("multi_threading")


if __name__ == '__main__':
	start_time = time.time()
	sign_threading()
	end_time = time.time()
	print("sign_threading cost:", end_time-start_time)
	start_time = time.time()
	multi_threading()
	end_time = time.time()
	print("multi_threading cose:", end_time-start_time)

生产者消费者模式

队列的使用,队列保证了多个线程之间的数据通信

  • 导入类 ,import queue
  • 创建队列,q = queue.Queue()
  • 添加元素,q.put(item)
  • 获取元素,item = q.get()
  • 查看状态
    • 查看元素的个数,q.qsize()
    • 判断是否为空,q.empty()
    • 判断是否为满,q.full()
# 生产者消费者模式

import requests
from lxml import etree
import time
import queue
import threading
import random

data_queue = queue.Queue()


def spider(url):
    response = requests.get(url).text
    return (url, response)


# 生产者: 目的在于请求到html网页, 并存入队列
def producer(url_queue, data_queue):
    while True:
        url = url_queue.get()
        data_queue.put(spider(url))

        print(threading.current_thread().name, f"data_queue.size", data_queue.qsize())
        time.sleep(random.randint(1, 2))


# 消费者: 目的在于解析html网页数据,并写入文件
def consumer(data_queue, fq):
    while True:
        data = data_queue.get()
        html = etree.HTML(data[1])
        titles = html.xpath("//a[@class='post-item-title']/text()")
        url = data[0]
        for title in titles:
            fq.write(url + ", " + title + "n")

        print(threading.current_thread().name, f"data_queue.size", data_queue.qsize())
        time.sleep(random.randint(1, 2))


if __name__ == '__main__':
    url_queue = queue.Queue()
    data_queue = queue.Queue()
    for i in range(1, 201):
        url_queue.put(f"https://www.cnblogs.com/#p{str(i)}")
    fq = open("text2_data.txt", "w", encoding="utf8")

    # 创建生产者线程
    for ids in range(5):
        t = threading.Thread(target=producer, args=(url_queue, data_queue), name=f"producer {ids}")
        t.start()

    # 创建消费者线程
    for ids in range(2):
        t = threading.Thread(target=consumer, args=(data_queue, fq), name=f"consumer {ids}")
        t.start()

Lock用于解决线程安全问题

用法一:

import threading

lock = threading.Lock()
lock.acquire() ## 获得锁
try:
  # do something
finally:
  lock.release() # 释放锁

用法二:

import threading

lock = threading.Lock()

with lock:
  # do something

银行取钱的问题

# lock锁机制,保护线程的数据安全

import threading
import time

lock = threading.Lock()
class Acount:
	def __init__(self, monney):
		self.monney = monney


def get_money(acount, monney):
	with lock:
		if acount.monney >= monney:
			print("取钱成功!")
			acount.monney -= monney
			print(f"余额为: {acount.monney}")
		else:
			print("取钱失败!")


if __name__ == '__main__':
 	acount = Acount(1000)
 	t1 = threading.Thread(target=get_money, args=(acount, 800)) 
 	t2 = threading.Thread(target=get_money, args=(acount, 800)) 
 	t1.start()
 	t2.start()

线程池的使用

​ 我们知道在在使用多线程时,会不断新建和销毁线程,这会造成一定的开销,线程池的目的就是不会销毁线程,直接供给下一个线程使用,这样可减少一定的开销。

  • 语法

    • 方式一:使用map函数,简单

      from concurrent.futures import ThreadPoolExecute
      
      with ThreadPoolExecutor() as pool:
          results = pool.map(craw, urls) # craw为线程需要执行的函数,urls为所有的线程参数, 返回值是每个线程返回值组合成的列表
          for results in results:
              print(result)
      
    • 方式二:futrue模式,更强大

      from concurrent.futures import ThreadPoolExecutor, as_completed
      
      with ThreadPoolExecutor() as pool:
          futures = [pool.submit(craw, url) for url in urls]
          for future in futures:
              print(future.result())
          # 相对于前面,as_completed顺序是不定的
          for future in as_completed(futures):
              print(future.result())实例
      
  • 实例

    # 线程池的使用
    
    import requests
    from concurrent.futures import ThreadPoolExecutor, as_completed
    import time
    
    urls = [
    	f"https://www.cnblogs.com/#p{page}" for page in range(1, 50)
    ]
    
    def spider(url):
    	response = requests.get(url).text
    	return url, len(response)
    
    # 第一种方式
    start_time = time.time()
    with ThreadPoolExecutor() as pool:
    	results = pool.map(spider, urls)
    	for result in results:
    		print(result)
    end_time = time.time()
    print("使用map方式所需时间: ", end_time-start_time)
    
    # 第二种方式
    start_time = time.time()
    with ThreadPoolExecutor() as pool:
    	futures = [ pool.submit(spider, url) for url in urls]
    	# for future in futures:
    	# 	print(future.result())
    	for future in as_completed(futures):
    		print(future.result())
    end_time = time.time()
    print("使用submit所需时间: ", end_time-start_time)
    

多进程

多进程和多线程使用方式是差不多的,如下表:

语法条目多线程多进程
引入模块from threading import Threadfrom multiprocessing import Process
新建
启动
等待结束
t = Thread(target=func, args=(100,))
t.start()
t.join()
t = Process(target=func, args=(100, ))
t.start()
t.join()
数据通信import queue
q = queue.Queue()
item = q.get()
from multiprocessing import Queue
q = Queue()
item = q.get()
安全加锁from threading import Lock
lock = Lock()
with lock:
     # do something
from multiprocessing import Lock()
lock = Lock()
with lock:
     # do somethin
池化技术from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as pool:
     # 方法1
     results = pool.map(func, [1,2,3])
    # 方法2
    futures = pool.submit(func, 1)
    result = future.result()
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as pool:
     # 方法1
     results = pool.map(func, [1,2,3])
     # 方法2
    futures = pool.submit(func, 1)
     result = future.result()

多协程

协程是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源)。 为啥说它是一个执行单元,因为它自带CPU上下文。这样只要在合适的时机, 我们可以把一个协程切换到另一个协程。 只要这个过程中保存或恢复 CPU上下文那么程序还是可以运行的。

  • 单线程:

    image-20210322171013039

  • 多协程:就是在单线程内实现并发

    核心原理:用一个超级循环(其实就是while true)循环,配合IO多路复用原理(IO是CPU可以干其他事情)

    image-20210322171645053

  • 异步IO库:asyncio

    注意:

    • 要用在异步IO编程中,依赖的库不许支持异步IO特性
    • requests不支持异步,但aiohttp支持异步
    import asyncio
    
    # 获取时间循环
    loop = asyncio.get_event_loop()
    
    # 定义协程
    async def myfunc(url):
        await get_url(url)
        
    # 创建task列表
    tasks = [loop.create_task(myfunc(ulr) for url in urls)]
    
    # 执行爬虫时间列表
    loop.run_until_complete(asyncio.wait(tasks))
    

最后

以上就是爱听歌灰狼为你收集整理的提高爬虫效率之python并发编程的全部内容,希望文章能够帮你解决提高爬虫效率之python并发编程所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(41)

评论列表共有 0 条评论

立即
投稿
返回
顶部