概述
如果要爬取一个大型网站时,串行下载显然已经不再适用,所以使用并发下载,用多线程和多进程这来嗯中下载网页的方式。
测试环境
Alexa提供了最受欢迎的100万个网站列表,
网址http://www.alexa.com/topsites
也可以通过http://s3.amazonaws.com/alexa-static/top-1m.csv.zip
直接下载这一列表的压缩文件。
解析Alexa列表
抽取数据的步骤:
- 下载.zip文件;
- 从.zip文件中提取CSV文件;
- 解析CSV文件;
- 遍历CSV文件中的每一行,从中抽取出域名数据;
import csv
from zipfile import ZipFile
from DownLoad import Downloader
from StringIO import StringIO
#实例化一个下载的对象
D=Downloader()
# 下载列表的压缩包
zipped_data=D('http://s3.amazonaws.com/alexa-static/top-1m.csv.zip')
urls=[]
# ZipFile需要一个类似文件的接口,所以使用StringIO封装
# StringIO主要用于在内存缓冲区中读写数据
with ZipFile(StringIO(zipped_data)) as zf:
# 从文件列表中提取CSV文件的名称
csv_filename=zf.namelist()[0]
for _,website in csv.reader(zf.open(csv_filename)):
# 使域名合法
urls.append('http://' + website)
如果想在之前开发的爬虫中复用上述功能,需要修改scrape_callback接口:
import csv
from zipfile import ZipFile
from mongo_02 import MongoCache
from StringIO import StringIO
class AlexaCallback:
def __init__(self,max_urls=1000):
self.max_urls=max_urls
self.seed_url='http://s3.amazonaws.com/alexa-static/top-1m.csv.zip'
def __call__(self, url,html):
if url==self.seed_url:
urls=[]
cache=MongoCache()
with ZipFile(StringIO(html)) as zf:
csv_filename=zf.namelist()[0]
for _,website in csv.reader(zf.open(csv_filename)):
if 'http://' + website not in cache:
urls.append('http://' + website)
if len(urls) == self.max_urls:
break
return urls
串行爬虫
import time
from Link_crawler import link_crawler
from mongo_02 import MongoCache
from alexa_cb import AlexaCallback
def main():
scrape_callback=AlexaCallback()
bigcache=MongoCache()
link_crawler(scrape_callback.seed_url,cache=bigcache,scrape_callback=scrape_callback)
if __name__=='__main__':
start = time.time()
main()
print "the app run:%s" % (time.time() - start)
多线程爬取
# coding:utf-8
import threading
import time
import urlparse
from DownLoad import Downloader
SLEEP_TIME=1
def threaded_crawer(seed_url,delay=5,cache=None,scrape_callback=None,
user_agent='wswp',proxies=None,num_retries=1,
max_threads=10,timeout=60):
crawl_queue=[seed_url]
seen=set([seed_url])
D=Downloader(cache=cache,delay=delay,user_agent=user_agent,
proxies=proxies,num_retries=num_retries,timeout=timeout)
def process_queue():
while True:
try:
url=crawl_queue.pop()
except IndexError:
break
else:
html=D(url)
if scrape_callback:
try:
links=scrape_callback(url,html) or []
except Exception as e:
print 'Error in callback for:{}:{}'.format(url,e)
else:
for link in links:
link=normalize(seed_url,link)
seen.add(link)
crawl_queue.append(link)
#
threads=[]
while threads or crawl_queue:
# 将没有在用的线程全部删除,以防不执行,却使线程数达到最大
for thread in threads:
if not thread.is_alive():
threads.remove(thread)
# 当前活跃的线程数小于最大线程数并且链接队列不为空时继续爬取
while len(threads) < max_threads and crawl_queue:
# 产生一个线程
thread=threading.Thread(target=process_queue)
thread.setDaemon(True) #设置守护线程,设定守护线程时, 主线程执行结束, 子线程也结束;
thread.start() #启动线程
threads.append(thread)
# 模拟线程阻塞,使用多线程功能
time.sleep(SLEEP_TIME)
def normalize(seed_url,link):
"""将链接格式补充完整"""
link,_=urlparse.urldefrag(link)
return urlparse.urljoin(seed_url,link)
多进程爬虫
目前,爬虫队列都是存储在本地内存当中,其他进程都无法处理这个爬虫。为了解决该问题,需要把爬虫队列转移到MongoDB当中。单独存储队列,意味着即使是不同的服务器也能够协同处理同一爬虫任务。
# coding:utf-8
from datetime import datetime, timedelta
from pymongo import MongoClient, errors
class MongoQueue:
OUTSTANDING,PROCESSING,COMPLETE=range(3)
def __init__(self,client=None,timeout=300):
self.client=MongoClient() if client is None else client
self.db=self.client.cache
self.timeout=timeout
def __nonzero__(self):
#如果还有进程在工作或者还有未完成的进程就返回True
record=self.db.craw_queue.find_one(
{'status':{'$ne':self.COMPLETE}}
)
return True if record else False
def push(self,url):
"""将新的url添加到数据库,如果已经则什么都不做"""
try:
self.db.craw_queue.insert({'_id':url,'status':self.OUTSTANDING})
except errors.DuplicateKeyError as e:
pass
def pop(self):
"""获取状态为outstanging的url,将它的状态设置为processing,并且加上当前时间
以便判断这个进程有效期"""
record=self.db.craw_queue.find_and_modify(
query={'status':self.OUTSTANDING},
update={'$set':{'status':self.PROCESSING,
'timestamp':datetime.now()}}
)
#如果存在这样的进程就返回它的id
if record:
return record['_id']
#如果没有就将释放一些空闲的进程
else:
self.repair()
raise KeyError()
def repair(self):
"""释放进程"""
#将到期并且状态为complete的进程状态设置为outstanging
record=self.db.craw_queue.find_and_modify(
query={
'timestamp':{'$lt':datetime.now() -
timedelta(seconds=self.timeout)},
'status':{'$ne':self.COMPLETE}
},
update={'$set':{'status':self.OUTSTANDING}}
)
if record:
print 'Relesed:',record['_id']
def complete(self,url):
# 将传入的url进程状态设置为完成状态
self.db.craw_queue.update({'_id':url},{'$set':{'status':self.COMPLETE}})
当添加一个新的URL时,其状态为OUTSTANDING;当URL从队列中去除准备下载时其状态为PROCESSING;当下在结束后,其状态为COMPLETE。如果下载超时,就repair()将状态重新设为OUTSTANDING,以便再次处理。
# coding:utf-8
import threading
import time
import urlparse
import multiprocessing
from DownLoad import Downloader
from mondo_03 import MongoQueue
SLEEP_TIME=1
def threaded_crawer(seed_url,delay=5,cache=None,scrape_callback=None,
user_agent='wswp',proxies=None,num_retries=1,
max_threads=10,timeout=60):
crawl_queue=MongoQueue()
crawl_queue.clear()
crawl_queue.push(seed_url)
D=Downloader(cache=cache,delay=delay,user_agent=user_agent,
proxies=proxies,num_retries=num_retries,timeout=timeout)
def process_queue():
while True:
try:
url=crawl_queue.pop()
except IndexError:
break
else:
html=D(url)
if scrape_callback:
try:
links=scrape_callback(url,html) or []
except Exception as e:
print 'Error in callback for:{}:{}'.format(url,e)
else:
for link in links:
link=normalize(seed_url,link)
crawl_queue.push(link)
crawl_queue.complete(url)
#
threads=[]
while threads or crawl_queue:
# 将没有在用的线程全部删除,以防不执行,却使线程数达到最大
for thread in threads:
if not thread.is_alive():
threads.remove(thread)
# 当前活跃的线程数小于最大线程数并且链接队列不为空时继续爬取
while len(threads) < max_threads and crawl_queue:
# 产生一个线程
thread=threading.Thread(target=process_queue)
thread.setDaemon(True) #设置守护线程,设定守护线程时, 主线程执行结束, 子线程也结束;
thread.start() #启动线程
threads.append(thread)
# 模拟线程阻塞,使用多线程功能
time.sleep(SLEEP_TIME)
def process_crawler(args,**kwargs):
# 获取当前计算机的cpu数量
num_cpus=multiprocessing.cpu_count()
print 'Starting{} processes'.format(num_cpus)
processes=[]
for i in range(num_cpus):
p=multiprocessing.Process(target=threaded_crawer,args=[args],kwargs=kwargs)
p.start()
processes.append(p)
for p in processes:
p.join()
def normalize(seed_url,link):
"""将链接格式补充完整"""
link,_=urlparse.urldefrag(link)
return urlparse.urljoin(seed_url,link)
改动将Python内建队列替换成基于MongoDB的新队列,命名为MongoQueue。由于该队列会在内部实现中处理重复URL的问题,所以不再需要seen变量。最后调用complete方法,用于记录该URL已经被成功解析。
再构建一个对进程处理,类似于多线程处理方法。
最后
以上就是虚幻洋葱为你收集整理的Python网络爬虫——并发下载的全部内容,希望文章能够帮你解决Python网络爬虫——并发下载所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复