Python实现并行抓取整站40万条房价数据(可更换抓取城市)
作者:Data&Truth 发布时间:2021-09-24 10:56:43
写在前面
这次的爬虫是关于房价信息的抓取,目的在于练习10万以上的数据处理及整站式抓取。
数据量的提升最直观的感觉便是对函数逻辑要求的提高,针对Python的特性,谨慎的选择数据结构。以往小数据量的抓取,即使函数逻辑部分重复,I/O请求频率密集,循环套嵌过深,也不过是1~2s的差别,而随着数据规模的提高,这1~2s的差别就有可能扩展成为1~2h。
因此对于要抓取数据量较多的网站,可以从两方面着手降低抓取信息的时间成本。
1)优化函数逻辑,选择适当的数据结构,符合Pythonic的编程习惯。例如,字符串的合并,使用join()要比“+”节省内存空间。
2)依据I/O密集与CPU密集,选择多线程、多进程并行的执行方式,提高执行效率。
一、获取索引
包装请求request,设置超时timeout
# 获取列表页面
def get_page(url):
headers = {
'User-Agent': r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
r'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
'Referer': r'http://bj.fangjia.com/ershoufang/',
'Host': r'bj.fangjia.com',
'Connection': 'keep-alive'
}
timeout = 60
socket.setdefaulttimeout(timeout) # 设置超时
req = request.Request(url, headers=headers)
response = request.urlopen(req).read()
page = response.decode('utf-8')
return page
一级位置:区域信息
二级位置:板块信息(根据区域位置得到板块信息,以key_value对的形式存储在dict中)
以dict方式存储,可以快速的查询到所要查找的目标。-> {'朝阳':{'工体','安贞','健翔桥'......}}
* 位置:地铁信息(搜索地铁周边房源信息)
将所属位置地铁信息,添加至dict中。 -> {'朝阳':{'工体':{'5号线','10号线' , '13号线'},'安贞','健翔桥'......}}
对应的url:http://bj.fangjia.com/ershoufang/--r-%E6%9C%9D%E9%98%B3%7Cw-5%E5%8F%B7%E7%BA%BF%7Cb-%E6%83%A0%E6%96%B0%E8%A5%BF%E8%A1%97
解码后的url:http://bj.fangjia.com/ershoufang/--r-朝阳|w-5号线|b-惠新西街
根据url的参数模式,可以有两种方式获取目的url:
1)根据索引路径获得目的url
# 获取房源信息列表(嵌套字典遍历)
def get_info_list(search_dict, layer, tmp_list, search_list):
layer += 1 # 设置字典层级
for i in range(len(search_dict)):
tmp_key = list(search_dict.keys())[i] # 提取当前字典层级key
tmp_list.append(tmp_key) # 将当前key值作为索引添加至tmp_list
tmp_value = search_dict[tmp_key]
if isinstance(tmp_value, str): # 当键值为url时
tmp_list.append(tmp_value) # 将url添加至tmp_list
search_list.append(copy.deepcopy(tmp_list)) # 将tmp_list索引url添加至search_list
tmp_list = tmp_list[:layer] # 根据层级保留索引
elif tmp_value == '': # 键值为空时跳过
layer -= 2 # 跳出键值层级
tmp_list = tmp_list[:layer] # 根据层级保留索引
else:
get_info_list(tmp_value, layer, tmp_list, search_list) # 当键值为列表时,迭代遍历
tmp_list = tmp_list[:layer]
return search_list
2)根据dict信息包装url
{'朝阳':{'工体':{'5号线'}}}
参数:
——r-朝阳
——b-工体
——w-5号线
组装参数:http://bj.fangjia.com/ershoufang/--r-朝阳|w-5号线|b-工体
1 # 根据参数创建组合url
2 def get_compose_url(compose_tmp_url, tag_args, key_args):
3 compose_tmp_url_list = [compose_tmp_url, '|' if tag_args != 'r-' else '', tag_args, parse.quote(key_args), ]
4 compose_url = ''.join(compose_tmp_url_list)
5 return compose_url
二、获取索引页最大页数
# 获取当前索引页面页数的url列表
def get_info_pn_list(search_list):
fin_search_list = []
for i in range(len(search_list)):
print('>>>正在抓取%s' % search_list[i][:3])
search_url = search_list[i][3]
try:
page = get_page(search_url)
except:
print('获取页面超时')
continue
soup = BS(page, 'lxml')
# 获取最大页数
pn_num = soup.select('span[class="mr5"]')[0].get_text()
rule = re.compile(r'\d+')
max_pn = int(rule.findall(pn_num)[1])
# 组装url
for pn in range(1, max_pn+1):
print('************************正在抓取%s页************************' % pn)
pn_rule = re.compile('[|]')
fin_url = pn_rule.sub(r'|e-%s|' % pn, search_url, 1)
tmp_url_list = copy.deepcopy(search_list[i][:3])
tmp_url_list.append(fin_url)
fin_search_list.append(tmp_url_list)
return fin_search_list
三、抓取房源信息Tag
这是我们要抓取的Tag:
['区域', '板块', '地铁', '标题', '位置', '平米', '户型', '楼层', '总价', '单位平米价格']
# 获取tag信息
def get_info(fin_search_list, process_i):
print('进程%s开始' % process_i)
fin_info_list = []
for i in range(len(fin_search_list)):
url = fin_search_list[i][3]
try:
page = get_page(url)
except:
print('获取tag超时')
continue
soup = BS(page, 'lxml')
title_list = soup.select('a[class="h_name"]')
address_list = soup.select('span[class="address]')
attr_list = soup.select('span[class="attribute"]')
price_list = soup.find_all(attrs={"class": "xq_aprice xq_esf_width"}) # select对于某些属性值(属性值中间包含空格)无法识别,可以用find_all(attrs={})代替
for num in range(20):
tag_tmp_list = []
try:
title = title_list[num].attrs["title"]
print(r'************************正在获取%s************************' % title)
address = re.sub('\n', '', address_list[num].get_text())
area = re.search('\d+[\u4E00-\u9FA5]{2}', attr_list[num].get_text()).group(0)
layout = re.search('\d[^0-9]\d.', attr_list[num].get_text()).group(0)
floor = re.search('\d/\d', attr_list[num].get_text()).group(0)
price = re.search('\d+[\u4E00-\u9FA5]', price_list[num].get_text()).group(0)
unit_price = re.search('\d+[\u4E00-\u9FA5]/.', price_list[num].get_text()).group(0)
tag_tmp_list = copy.deepcopy(fin_search_list[i][:3])
for tag in [title, address, area, layout, floor, price, unit_price]:
tag_tmp_list.append(tag)
fin_info_list.append(tag_tmp_list)
except:
print('【抓取失败】')
continue
print('进程%s结束' % process_i)
return fin_info_list
四、分配任务,并行抓取
对任务列表进行分片,设置进程池,并行抓取。
# 分配任务
def assignment_search_list(fin_search_list, project_num): # project_num每个进程包含的任务数,数值越小,进程数越多
assignment_list = []
fin_search_list_len = len(fin_search_list)
for i in range(0, fin_search_list_len, project_num):
start = i
end = i+project_num
assignment_list.append(fin_search_list[start: end]) # 获取列表碎片
return assignment_list
p = Pool(4) # 设置进程池
assignment_list = assignment_search_list(fin_info_pn_list, 3) # 分配任务,用于多进程
result = [] # 多进程结果列表
for i in range(len(assignment_list)):
result.append(p.apply_async(get_info, args=(assignment_list[i], i)))
p.close()
p.join()
for result_i in range(len(result)):
fin_info_result_list = result[result_i].get()
fin_save_list.extend(fin_info_result_list) # 将各个进程获得的列表合并
通过设置进程池并行抓取,时间缩短为单进程抓取时间的3/1,总计时间3h。
电脑为4核,经过测试,任务数为3时,在当前电脑运行效率最高。
五、将抓取结果存储到excel中,等待可视化数据化处理
# 存储抓取结果
def save_excel(fin_info_list, file_name):
tag_name = ['区域', '板块', '地铁', '标题', '位置', '平米', '户型', '楼层', '总价', '单位平米价格']
book = xlsxwriter.Workbook(r'C:\Users\Administrator\Desktop\%s.xls' % file_name) # 默认存储在桌面上
tmp = book.add_worksheet()
row_num = len(fin_info_list)
for i in range(1, row_num):
if i == 1:
tag_pos = 'A%s' % i
tmp.write_row(tag_pos, tag_name)
else:
con_pos = 'A%s' % i
content = fin_info_list[i-1] # -1是因为被表格的表头所占
tmp.write_row(con_pos, content)
book.close()
附上源码
#! -*-coding:utf-8-*-
# Function: 房价调查
# Author:蘭兹
from urllib import parse, request
from bs4 import BeautifulSoup as BS
from multiprocessing import Pool
import re
import lxml
import datetime
import cProfile
import socket
import copy
import xlsxwriter
starttime = datetime.datetime.now()
base_url = r'http://bj.fangjia.com/ershoufang/'
test_search_dict = {'昌平': {'霍营': {'13号线': 'http://bj.fangjia.com/ershoufang/--r-%E6%98%8C%E5%B9%B3|w-13%E5%8F%B7%E7%BA%BF|b-%E9%9C%8D%E8%90%A5'}}}
search_list = [] # 房源信息url列表
tmp_list = [] # 房源信息url缓存列表
layer = -1
# 获取列表页面
def get_page(url):
headers = {
'User-Agent': r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
r'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
'Referer': r'http://bj.fangjia.com/ershoufang/',
'Host': r'bj.fangjia.com',
'Connection': 'keep-alive'
}
timeout = 60
socket.setdefaulttimeout(timeout) # 设置超时
req = request.Request(url, headers=headers)
response = request.urlopen(req).read()
page = response.decode('utf-8')
return page
# 获取查询关键词dict
def get_search(page, key):
soup = BS(page, 'lxml')
search_list = soup.find_all(href=re.compile(key), target='')
search_dict = {}
for i in range(len(search_list)):
soup = BS(str(search_list[i]), 'lxml')
key = soup.select('a')[0].get_text()
value = soup.a.attrs['href']
search_dict[key] = value
return search_dict
# 获取房源信息列表(嵌套字典遍历)
def get_info_list(search_dict, layer, tmp_list, search_list):
layer += 1 # 设置字典层级
for i in range(len(search_dict)):
tmp_key = list(search_dict.keys())[i] # 提取当前字典层级key
tmp_list.append(tmp_key) # 将当前key值作为索引添加至tmp_list
tmp_value = search_dict[tmp_key]
if isinstance(tmp_value, str): # 当键值为url时
tmp_list.append(tmp_value) # 将url添加至tmp_list
search_list.append(copy.deepcopy(tmp_list)) # 将tmp_list索引url添加至search_list
tmp_list = tmp_list[:layer] # 根据层级保留索引
elif tmp_value == '': # 键值为空时跳过
layer -= 2 # 跳出键值层级
tmp_list = tmp_list[:layer] # 根据层级保留索引
else:
get_info_list(tmp_value, layer, tmp_list, search_list) # 当键值为列表时,迭代遍历
tmp_list = tmp_list[:layer]
return search_list
# 获取房源信息详情
def get_info_pn_list(search_list):
fin_search_list = []
for i in range(len(search_list)):
print('>>>正在抓取%s' % search_list[i][:3])
search_url = search_list[i][3]
try:
page = get_page(search_url)
except:
print('获取页面超时')
continue
soup = BS(page, 'lxml')
# 获取最大页数
pn_num = soup.select('span[class="mr5"]')[0].get_text()
rule = re.compile(r'\d+')
max_pn = int(rule.findall(pn_num)[1])
# 组装url
for pn in range(1, max_pn+1):
print('************************正在抓取%s页************************' % pn)
pn_rule = re.compile('[|]')
fin_url = pn_rule.sub(r'|e-%s|' % pn, search_url, 1)
tmp_url_list = copy.deepcopy(search_list[i][:3])
tmp_url_list.append(fin_url)
fin_search_list.append(tmp_url_list)
return fin_search_list
# 获取tag信息
def get_info(fin_search_list, process_i):
print('进程%s开始' % process_i)
fin_info_list = []
for i in range(len(fin_search_list)):
url = fin_search_list[i][3]
try:
page = get_page(url)
except:
print('获取tag超时')
continue
soup = BS(page, 'lxml')
title_list = soup.select('a[class="h_name"]')
address_list = soup.select('span[class="address]')
attr_list = soup.select('span[class="attribute"]')
price_list = soup.find_all(attrs={"class": "xq_aprice xq_esf_width"}) # select对于某些属性值(属性值中间包含空格)无法识别,可以用find_all(attrs={})代替
for num in range(20):
tag_tmp_list = []
try:
title = title_list[num].attrs["title"]
print(r'************************正在获取%s************************' % title)
address = re.sub('\n', '', address_list[num].get_text())
area = re.search('\d+[\u4E00-\u9FA5]{2}', attr_list[num].get_text()).group(0)
layout = re.search('\d[^0-9]\d.', attr_list[num].get_text()).group(0)
floor = re.search('\d/\d', attr_list[num].get_text()).group(0)
price = re.search('\d+[\u4E00-\u9FA5]', price_list[num].get_text()).group(0)
unit_price = re.search('\d+[\u4E00-\u9FA5]/.', price_list[num].get_text()).group(0)
tag_tmp_list = copy.deepcopy(fin_search_list[i][:3])
for tag in [title, address, area, layout, floor, price, unit_price]:
tag_tmp_list.append(tag)
fin_info_list.append(tag_tmp_list)
except:
print('【抓取失败】')
continue
print('进程%s结束' % process_i)
return fin_info_list
# 分配任务
def assignment_search_list(fin_search_list, project_num): # project_num每个进程包含的任务数,数值越小,进程数越多
assignment_list = []
fin_search_list_len = len(fin_search_list)
for i in range(0, fin_search_list_len, project_num):
start = i
end = i+project_num
assignment_list.append(fin_search_list[start: end]) # 获取列表碎片
return assignment_list
# 存储抓取结果
def save_excel(fin_info_list, file_name):
tag_name = ['区域', '板块', '地铁', '标题', '位置', '平米', '户型', '楼层', '总价', '单位平米价格']
book = xlsxwriter.Workbook(r'C:\Users\Administrator\Desktop\%s.xls' % file_name) # 默认存储在桌面上
tmp = book.add_worksheet()
row_num = len(fin_info_list)
for i in range(1, row_num):
if i == 1:
tag_pos = 'A%s' % i
tmp.write_row(tag_pos, tag_name)
else:
con_pos = 'A%s' % i
content = fin_info_list[i-1] # -1是因为被表格的表头所占
tmp.write_row(con_pos, content)
book.close()
if __name__ == '__main__':
file_name = input(r'抓取完成,输入文件名保存:')
fin_save_list = [] # 抓取信息存储列表
# 一级筛选
page = get_page(base_url)
search_dict = get_search(page, 'r-')
# 二级筛选
for k in search_dict:
print(r'************************一级抓取:正在抓取【%s】************************' % k)
url = search_dict[k]
second_page = get_page(url)
second_search_dict = get_search(second_page, 'b-')
search_dict[k] = second_search_dict
# * 筛选
for k in search_dict:
second_dict = search_dict[k]
for s_k in second_dict:
print(r'************************二级抓取:正在抓取【%s】************************' % s_k)
url = second_dict[s_k]
third_page = get_page(url)
third_search_dict = get_search(third_page, 'w-')
print('%s>%s' % (k, s_k))
second_dict[s_k] = third_search_dict
fin_info_list = get_info_list(search_dict, layer, tmp_list, search_list)
fin_info_pn_list = get_info_pn_list(fin_info_list)
p = Pool(4) # 设置进程池
assignment_list = assignment_search_list(fin_info_pn_list, 2) # 分配任务,用于多进程
result = [] # 多进程结果列表
for i in range(len(assignment_list)):
result.append(p.apply_async(get_info, args=(assignment_list[i], i)))
p.close()
p.join()
for result_i in range(len(result)):
fin_info_result_list = result[result_i].get()
fin_save_list.extend(fin_info_result_list) # 将各个进程获得的列表合并
save_excel(fin_save_list, file_name)
endtime = datetime.datetime.now()
time = (endtime - starttime).seconds
print('总共用时:%s s' % time)
总结:
当抓取数据规模越大,对程序逻辑要求就愈严谨,对python语法要求就越熟练。如何写出更加pythonic的语法,也需要不断学习掌握的。
来源:http://www.cnblogs.com/Lands-ljk/p/5467236.html
猜你喜欢
- 在安装了IIS以后,缺省的服务器端脚本语言被设置成VBScript。许多Web 开发团队在他们的开发环境中保持了这些缺省设置,这是不幸的,因
- 其实发这篇博感觉并没有什么用,太简单了,会的人不屑看,不会的人自已动动脑子也想到了。但是看着自已的博客已经这么久没更,真心疼~。粗略算下一篇
- 1. 什么是 CSV 文件CSV(逗号分隔值)文件是使用逗号分隔信息的文本文件。该文件的每一行都是一条数据记录,也就意味着它可以用于以表格的
- 1、PHP中的抽象类PHP 5 支持抽象类和抽象方法。定义为抽象的类不能被实例化。任何一个类,如果它里面至少有一个方法是被声明为抽象的,那么
- python发起http请求,并解析返回的json字符串的小demo,方便以后用到。#! /usr/bin/env python  
- Python+matplotlib进行鼠标交互,实现动态标注,数据可视化显示,鼠标划过时画一条竖线并使用标签来显示当前值。Python3.6
- '====================================='功能:根据ip地址输出地区'参数:ip
- 本文实例讲解了php表单验证的实现方法,分享给大家供大家参考,具体内容如下1.PHP表单处理welcome.html<html>
- 有时我们有很多文件(如图片),我们需要对每一个文件进行操作。 我们还需要一份文件的名字来进行遍历,这时我们首先需要建立一份文件名单,有时还会
- SQL Server四类数据仓库建模的方法主要分为以下四类。第一类是关系数据库的三范式建模,通常我们将三范式建模方法用于建立各种操作型数据库
- 其实各大深度学习框架背后的原理都可以理解为拟合一个参数数量特别庞大的函数,所以各框架都能用来拟合任意函数,Pytorch也能。在这篇博客中,
- 下表列出 SQL Server 查询分析器提供的所有键盘快捷方式。活动 快捷方式 书签:清除所有书签。 CTRL-SHIFT-F2
- 一般情况下,网站的图片代码是这样的。<img src="./images/test.jpg"
- 本文主要是基于Python Opencv 实现的图像分割,其中使用到的opencv的函数有:使用 OpenCV 函数 cv::filter2
- 一.背景在现在的网站中,接入的渠道是越来越多了,技术也是越来越先进,WAP, SMS,EMAIL, 传统的Web, Socket等等,如果连
- 概述路由是自定义url地址执行指定的函数,良好的路由定义可以对seo起到很好的效果。1. 基本路由gin框架封装了http库,提供了 GET
- 我们之前要想在调度里面实现延时执行,我们可以使用管道阻塞,直到有人往管道里面写东西才变通畅,还可以使用sleep来睡觉,但是睡觉的过程,协程
- 最近真的喜欢上了用xheditor这个在线编辑器,但是美中不足的是我发现它暂时还不能取代FCKeditor,因为没有在线上传功能啊!当然,F
- 404页面对于站长来说应该并不陌生,其作用无碍乎二点:提高用户体验和增强对搜索引擎的友好性。去年在跟几个朋友在聊天的时候,跟我说404页面不
- 每次查询分析器寻找路径时,并不会每一次都去统计索引中包含的行数,值的范围等,而是根据一定条件创建和更新这些信息后保存到数据库中,这也就是所谓