我是靠谱客的博主 大方心锁,最近开发中收集的这篇文章主要介绍利用BaseManager搭建分布式爬虫,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

multiprocessing模块

分享进程间的通信的时候(参考(python进程间通信(二)-分布式进程),介绍了Python的multiprocessing
模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,依靠网络通信,将任务分布到其他多个进程中.我们可以利用这个思想,来搭建一套分布式的爬虫.

实现

架构采用主从模式,主从模式是指由一台主机作为控制节点负责所有运行网络爬虫的主机进行管理,爬虫只需要从控制节点那里接收任务,并把新生成任务提交给控制节点就可以了,在这个过程中不必与其他爬虫通信。而控制节点则需要与所有爬虫进行通信
使用三台主机进行分布式爬取,一台主机作为控制节点,另外两台主机作为爬虫节点

一. 控制节点部分(ControlNode)

控制节点主要分为URL管理器数据存储器控制调度器。控制调度器通过三个进程来协调URL管理器和数据存储器的工作,一个是URL管理进程,负责URL的管理和将URL传递给爬虫节点,一个是数据提取进程,负责读取爬虫节点返回的数据,将返回数据中的URL交给URL管理进程,将标题和摘要等数据交给数据存储进程,最后一个是数据存储进程,负责将数据提取进程中提交的数据进行本地存储

  • 控制调度器
    控制调度器主要是产生并启动URL管理进程、数据提取进程和数据存储进程,同时维护4个队列保持进程间的通信,分别为url_queue,result_queue, conn_q,store_q。4个队列说明如下:

    1. member_q队列是URL管理进程将URL传递给爬虫节点的通道
    2. result_q队列是爬虫节点将数据返回给数据提取进程的通道
    3. conn_q队列是数据提取进程将新的URL数据提交给URL管理进程的通道
    4. store_q队列是数据提取进程将获取到的数据交给数据存储进程的通道
#coding:utf-8
import time

from multiprocessing.managers import BaseManager
from multiprocessing import Process, Queue

from DataOutput import DataOutput
from MemberManager import MemberManager


class NodeManager(object):

    # 创建一个分布式管理器
    def start_Manager(self,member_q,result_q):
        # 把创建的两个队列注册在网络上,利用register方法,callable参数关联了Queue对象,
        # 将Queue对象在网络中暴露

        BaseManager.register('get_task_queue',callable=lambda:member_q)
        BaseManager.register('get_result_queue',callable=lambda:result_q)

        manager = BaseManager(address=('', 8001), authkey='zhihuuser'.encode('utf-8'))
        return manager

    # 进程1:member管理
    def member_manager_proc(self,member_q,con_q,root_member):
        member_manager = MemberManager()
        member_manager.add_new_member(root_member)
        while True:
            while(member_manager.has_new_member()):
                new_member = member_manager.get_new_member()  #从URL管理器获取新的member
                member_q.put(new_member)  #将新的URL发给工作节点
                print('old_member=',member_manager.old_member_size())

                if(member_manager.old_member_size()>2000):
                    member_q.put('end')
                    print('控制节点发起结束通知!')
                    #关闭管理节点,同时存储set状态
                    member_manager.save_progress('new_members.txt',member_manager.new_members)
                    member_manager.save_progress('old_members.txt',member_manager.old_members)
                    return
            #将从result_solve_proc获取到的members添加到URL管理器之间
            try:
                members = con_q.get()
                member_manager.add_new_members(members)
            except BaseException as e:
                time.sleep(0.1)

    # 进程2:数据提取
    def result_solve_proc(self,result_q,con_q,store_q):
        while(True):
            try:
                if not result_q.empty():
                    #Queue.get(block=True, timeout=None)
                    content = result_q.get(True)
                    if content['new_members']=='end':
                        print('结果分析进程接受通知然后结束!')
                        store_q.put('end')
                        return
                    con_q.put(content['new_members']) # member为set类型
                    store_q.put(content['data']) # 解析出来的数据为dict类型
                else:
                    time.sleep(0.1)
            except BaseException as e:
                time.sleep(0.1)

    # 进程3:数据存储
    def store_proc(self,store_q):
        output = DataOutput()
        while True:
            if not store_q.empty():
                data = store_q.get()
                if data=='end':
                    print('存储进程接受通知然后结束!')
                    output.ouput_end(output.filepath)

                    return
                output.store_data(data)
            else:
                time.sleep(0.1)
        pass


if __name__=='__main__':

    member_q = Queue()
    result_q = Queue()
    store_q = Queue()
    con_q = Queue()

    #创建分布式管理器
    node = NodeManager()
    manager = node.start_Manager(member_q,result_q)

    #创建URL管理进程、 数据提取进程和数据存储进程
    member_manager_proc = Process(target=node.member_manager_proc, args=(member_q,con_q,'excited-vczh',))
    result_solve_proc = Process(target=node.result_solve_proc, args=(result_q,con_q,store_q,))
    store_proc = Process(target=node.store_proc, args=(store_q,))

    #启动3个进程和分布式管理器
    member_manager_proc.start()
    result_solve_proc.start()
    store_proc.start()
    manager.get_server().serve_forever()
  • URL管理器
    由于我们采用set内存去重的方式,如果直接存储大量的URL链接,尤其是URL链接很长的时候,很容易造成内存溢出,所以我们采用将爬取过的URL进行MD5处理,由于字符串经过MD5处理后的信息摘要长度可以128bit,将生成的MD5摘要存储到set后,可以减少好几倍的内存消耗,Python中的MD5算法生成的是32位的字符串,由于我们爬取的url较少,md5冲突不大,完全可以取中间的16位字符串,即16位MD5加密。save_progress和load_progress方法进行序列化的操作,将未爬取URL集合和已爬取的URL集合序列化到本地,保存当前的进度,以便下次恢复状态。URL管理器URLManager.py代码如下:
#coding:utf-8
import pickle
import hashlib

class URLManager(object):
    def __init__(self):
        # self.new_urls_key = ""
        self.new_members = self.load_progress('new_members.txt')#未爬取URL集合
        self.old_members = self.load_progress('old_members.txt')#已爬取URL集合

    # 从本地文件加载进度
    def load_progress(self,path):
        print('[+] 从文件加载进度: %s' % path)
        try:
            with open(path, 'rb') as f:
                tmp = pickle.load(f)
                return tmp
        except:
            print('[!] 无进度文件, 创建: %s' % path)
        return set()
    # 保存进度
    def save_progress(self,path,data):
        with open(path, 'wb') as f:
            pickle.dump(data, f)

    # 判断是否有未爬取的member
    def has_new_member(self):
        return self.new_member_size()!=0

    # 获取一个未爬取的member
    def get_new_member(self):
        new_member = self.new_members.pop()
        m = hashlib.md5()
        m.update(new_member.encode("utf-8"))
        self.old_members.add(m.hexdigest()[8:-8])
        return new_member

    def add_new_member(self,member):
        if member is None:
            return
        m = hashlib.md5()
        m.update(member.encode('utf-8'))
        member_md5 =  m.hexdigest()[8:-8]
        if member not in self.new_members and member_md5 not in self.old_members:
            self.new_members.add(member)

    def add_new_members(self,members):
        if members is None or len(members)==0:
            return
        for member in members:
            self.add_new_member(member)

    def new_member_size(self):
        return len(self.new_members)

    def old_member_size(self):
        return len(self.old_members)

  • 数据存储器
#coding:utf-8
import codecs
import time

class DataOutput(object):
    def __init__(self):
        self.filepath='zhihu_%s.html'%(time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()) )
        self.output_head(self.filepath)
        self.datas=[]

    def store_data(self,data):
        if data is None:
            return
        self.datas.append(data)
        if len(self.datas)>10:
            self.output_html(self.filepath)

    # 将HTML头写进去
    def output_head(self,path):
        fout=codecs.open(path,'w',encoding='utf-8')
        fout.write("<html>")
        fout.write(r'''<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />''')
        fout.write("<body>")
        fout.write("<table>")
        fout.close()

    # 将数据写入HTML文件中
    def output_html(self,path):
        fout=codecs.open(path,'a',encoding='utf-8')
        for data in self.datas:
            fout.write("<tr>")
            fout.write("<td>%s</td>" % data['name'])
            fout.write("<td>%s</td>" % data['member'])
            fout.write("<td>%s</td>" % data['id'])
            fout.write("<td>%s</td>" % data['gender'])
            fout.write("<td>%s</td>" % data['type'])
            fout.write("<td>%s</td>" % data['headline'])
            fout.write("<td>%s</td>" % data['url'])
            fout.write("<td>%s</td>" % data['answer_count'])
            fout.write("<td>%s</td>" % data['articles_count'])
            fout.write("<td>%s</td>" % data['follower_count'])
            fout.write("<td>%s</td>" % data['badge'])
            fout.write("<td>%s</td>" % data['employments'])
            fout.write("</tr>")
        self.datas=[]
        fout.close()

    def ouput_end(self,path):
        fout=codecs.open(path,'a',encoding='utf-8')
        fout.write("</table>")
        fout.write("</body>")
        fout.write("</html>")
        fout.close()

爬虫节点部分(SpiderNode)

爬虫节点主要包含HTML下载器、HTML解析器和爬虫调度器。执行流程如下:

  1. 爬虫调度器从控制节点中的member_q队列读取URL
  2. 爬虫调度器调用HTML下载器、HTML解析器获取网页中新的URL和标题摘要
  3. 最后爬虫调度器将新的URL和标题摘要传入result_q队列交给控制节点
  • 下载器
#coding:utf-8
import requests
from requests.exceptions import ConnectionError

class HtmlDownloader(object):

    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36'}

    folowees_url = 'https://www.zhihu.com/api/v4/members/{member}/followees'
    member_url = 'https://www.zhihu.com/api/v4/members/{member}'

    follwees_query = {
        'include': 'data[*].answer_count,articles_count,gender,follower_count,is_followed,is_following,badge[?(type=best_answerer)].topics',
        'offset': None,
        'limit': 20
    }
    member_query = {
        'include': 'allow_message,is_followed,is_following,is_org,is_blocking,employments,answer_count,follower_count,articles_count,gender,badge[?(type=best_answerer)].topics'
    }


    def download(self,member):
        if member is None:
            return None
        followees_html = self.download_followees(member)
        member_html = self.download_member(member)
        return followees_html,member_html



    def download_followees(self,member):
        r = requests.get(self.folowees_url.format(member=member), data=self.follwees_query, headers=self.headers)
        if r.status_code==200:
            r.encoding='utf-8'
            followees_html = r.text
            return followees_html
        if r.status_code==403:
            return self.download_followees(member)



    def download_member(self,member):
        offset = 0
        self.follwees_query['offset'] = offset
        offset += 20
        r = requests.get(self.member_url.format(member=member), data=self.member_query,headers=self.headers)
        if r.status_code==200:
            r.encoding='utf-8'
            followees_html = r.text
            return followees_html
        if r.status_code == 403:
            return self.download_followees(member)
  • 解析器
#coding:utf-8
# import urllib.parse
import json
import requests

class HtmlParser(object):

    # 用于解析网页内容,抽取url和数据
    def parse(self, followees_html, member_html):
        if followees_html is None or member_html is None:
            return
        new_members = self.parse_followees(followees_html)
        new_data = self.parse_member(member_html)
        return new_members,new_data

    # 抽取新的url集合
    def parse_followees(self, html):
        if html is None:
            return
        result = json.loads(html)
        new_members = []
        if 'data' in result.keys():
            for data in result.get('data'):
                print(data.get('url_token'))
                new_member = data.get('url_token')
                print(new_member)
                new_members.append(new_member)

        return new_members

    # 抽取有效数据
    def parse_member(self,html):
        if html is None:
            return

        result = json.loads(html)
        data={}

        data['member']= result.get('url_token')
        data['id']=result.get('id')
        data['name']= result.get('name')
        data['headline']= result.get('headline')
        data['url']= result.get('url')
        data['gender']=result.get('gender')
        data['type']=result.get('type')
        data['badge']=result.get('badge')
        data['answer_count']=result.get('answer_count')
        data['articles_count']=result.get('articles_count')
        data['follower_count']=result.get('follower_count')
        data['employments']=result.get('employments')

        return data
  • 爬虫调度器
    爬虫调度器需要用到分布式进程中工作进程的代码。爬虫调度器需要先连接上控制节点,然后依次完成从member_q队列中获取URL,下载并解析网页,将获取的数据交给result_q队列,返回给控制节点等各项任务,代码如下:
#coding:utf-8
from multiprocessing.managers import BaseManager

from Downloader import HtmlDownloader
from Parser import HtmlParser


class SpiderWork(object):
    # 初始化分布式进程中的工作节点的连接工作
    def __init__(self):
        BaseManager.register('get_task_queue')
        BaseManager.register('get_result_queue')

        server_addr = '127.0.0.1'
        print(('Connect to server %s...' % server_addr))
        self.m = BaseManager(address=(server_addr, 8001), authkey='zhihuuser'.encode('utf-8'))
        self.m.connect()

        # 获取Queue的对象:
        self.task = self.m.get_task_queue()
        self.result = self.m.get_result_queue()

        # 初始化网页下载器和解析器
        self.downloader = HtmlDownloader()
        self.parser = HtmlParser()
        print('init finish')

    def crawl(self):
        while(True):
            try:
                if not self.task.empty():
                    member = self.task.get()

                    if member =='end':
                        print('控制节点通知爬虫节点停止工作...')
                        # 接着通知其它节点停止工作
                        self.result.put({'new_members':'end','data':'end'})
                        return
                    print('爬虫节点正在解析:%s'%member.encode('utf-8'))
                    f_html, m_html = self.downloader.download(member)
                    new_members,data = self.parser.parse(f_html, m_html)
                    self.result.put({"new_members":new_members,"data":data})
            except EOFError as e:
                print("连接工作节点失败")
                return
            except Exception as e:
                print(e)
                print('Crawl fail ')


if __name__=="__main__":
    spider = SpiderWork()
    spider.crawl()

总结

通过以上,我们完成了搭建了一个简单的分布式爬虫,不过,这个爬虫还有很多地方需要完善.比如:spider没有使用线程,控制节点如果出现故障,整个爬取过程就会中断等问题,不过,对于理解和开发分布式爬虫已经足够了!

最后

以上就是大方心锁为你收集整理的利用BaseManager搭建分布式爬虫的全部内容,希望文章能够帮你解决利用BaseManager搭建分布式爬虫所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(47)

评论列表共有 0 条评论

立即
投稿
返回
顶部