分布式豆瓣爬虫(一): 控制节点- URL 管理器

一、简单分布式爬虫架构

 本次分布式爬虫采用主从模式,主从模式是指一台主机作为控制节点,负责管理所有运行网络爬虫的主机,爬虫只需要从控制节点那里接收任务,并把新生成任务提交给控制节点就可以了,在这个过程中不必与其他爬虫通信,这种方式实现简单、利于管理。而控制节点则需要与所有爬虫进行通信,因此可以看到主从模式是有缺陷的,控制节点会成为整个系统的瓶颈,容易导致整个分布式网络爬虫系统性能下降。

 

二、控制节点

控制节点主要分为 URL 管理器、数据存储器和被控制调度器。控制调度器通过三个进程来协调 URL 管理器和数据存储器的工作:一个是 URL 管理进程,负责 URL 的管理和将 URL 传递给爬虫节点;一个是数据提取进程,负责读取爬虫节点返回的数据,将返回数据中的 URL 交给 URL 管理进程,将标题和摘要等数据交给数据存储进程;最后一个是数据存储进程,负责将数据提取进程中提交的数据进行本地存储。

 

三、URL 管理器

对之前的 url 管理器进行优化,采用 set 内存去重的方式,如果直接存储大量的 URL 链接,尤其是 URL 链接很长的时候,很容易造成内存溢出,所以我们将爬取过的 URL 进行 MD5 处理。字符串经过 MD5 处理后的信息摘要长度为128位,将生成的 MD5 摘要存储到 set 后,可以减少好几倍的内存消耗,不过 Python 中的 MD5 算法生成的是256位,取中间的128位即可。我们同时添加了 save_progress 和 load_progress 方法进行序列化的操作,将未爬取 URL 集合和已爬取的 URL 集合序列化到本地,保存当前的进度,以便下次恢复状态。

 

四、代码如下

 1 import pickle 2 import hashlib 3  4  5 class UrlManager: 6 def __init__(self): 7 self.new_urls = self.load_progress(new_urls.txt) # 未爬取 url 集合 8 self.old_urls = self.load_progress(old_urls.txt) # 已爬取 url 集合 9 10 def has_new_url(self):11 """12  判断是否有未爬取的 url13  :return: bool14 """15 return self.new_urls_size() != 016 17 def get_new_url(self):18 """19  返回一个未爬取的 url20  :return: str21 """22 new_url = self.new_urls.pop()23 m = hashlib.md5()24 m.update(new_url.encode(utf-8))25 self.old_urls.add(m.hexdigest()[8:-8])26 27 return new_url28 29 def add_new_url(self, url):30 """31  添加一个新的 url32  :param url: 单个 url33  :return: None34 """35 if url is None:36 return None37 m = hashlib.md5()38 m.update(url.encode(utf-8))39 url_md5 = m.hexdigest()[8:-8]40 if (url not in self.new_urls) and (url_md5 not in self.old_urls):41  self.new_urls.add(url)42 43 def add_new_urls(self, urls):44 """45  添加多个新的url46  :param urls: 多个 url47  :return: None48 """49 if urls is None:50 return None51 for url in urls:52  self.add_new_url(url)53 54 def new_urls_size(self):55 """56  返回未爬过的 url 集合的大小57  :return: int58 """59 return len(self.new_urls)60 61 def old_urls_size(self):62 """63  返回已爬过的 url 集合的大小64  :return: int65 """66 return len(self.old_urls)67 68 def save_progress(self, path, data):69 """70  保存进度71  :param path: 路径72  :return: None73 """74 with open(path, wb) as file:75  pickle.dump(data, file)76 77 def load_progress(self, path):78 """79  从本地文件加载进度80  :param path: 路径81  :return: set82 """83 print([+] 从文件加载进度{}.format(path))84 try:85 with open(path, rb) as file:86 return pickle.load(file)87 except:88 print([!] 无进度文件)89 90 return set()

 

相关文章