安全人员的爬虫技巧
从selenium+undetected_chromedriver开始
最近有爬取暗网和公网泄露资源的爬虫需求,研究学习了一些selenium的实战技巧,当然我爬虫也不是专业的,有错误请指正
我写爬虫遇到的最大的问题反爬检测,大部分的ddos墙可以用undetected_chromedriver这个库来实现绕过 https://github.com/ultrafunkamsterdam/undetected-chromedriver 这个库确实也很好用,很多基础的爬虫检测和很多小厂的ddos墙都能绕过,就是需要配一个和浏览器版本相同的chromedriver麻烦了些
这里贴一个使用这个库爬取hydra的泄露信息的脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import undetected_chromedriver as uc
from selenium.common.exceptions import NoSuchElementException
from bs4 import BeautifulSoup
import time
import pandas as pd
import re
from datetime import datetime
from collections import Counter
options = webdriver.ChromeOptions()
options.add_argument('--ignore-certificate-errors')
options.add_argument('--log-level=3')
driver = uc.Chrome(executable_path='D:/anaconda/chromedriver.exe',options = options)
wait = WebDriverWait(driver, 25)
base_url = 'https://hydramarket.org/Forums-vip-database?page={}&sortby=started'
df = pd.DataFrame(columns=['序号', '数据名称', '数据简介', '数据大小', '泄露渠道', '获取方式', '数据泄漏时间'])
# login
driver.get('https://hydramarket.org/member.php?action=login')
username = wait.until(EC.presence_of_all_elements_located((By.ID, 'username')))
passwd = wait.until(EC.presence_of_all_elements_located((By.ID, 'password')))
username[1].send_keys('e4l4')
passwd[1].send_keys('xxx')
login = wait.until(EC.presence_of_all_elements_located((By.XPATH, '/html/body/div[3]/form/div/div/div[5]/div[1]/input')))
login[0].click()
input("登录后按 Enter 继续...")
# 登录完成后,开始抓取
counter = 1 # 用于记录序号
# 设定日期区间
start_date = datetime(2023, 7, 1)
end_date = datetime(2023, 11, 21)
# 提取Size的正则表达式
size_pattern = re.compile(r'\b\d+(?:[.,]\d+)?[kKmMgGbBkkKK]+\w*')
# # 提取Format的正则表达式,任何包含'format'的行都被视为有效
# format_pattern = re.compile(r'format\s*[:-]?\s*(.*)', re.IGNORECASE)
found = False
for page in range(1,89): # 这里从1开始,因为已经登录了
if found:
break
url = base_url.format(page)
try:
driver.get(url)
if 'googleseo_404_notfound' in driver.page_source:
print(f"404错误:{url},{page}跳过此链接")
continue # 跳过此次
except WebDriverException:
print(f"无法访问链接:{url},跳过此链接")
continue # 跳过此次循环,继续爬取下一个链接
title_elements = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, 'div.col a[style="font-size: 18px; font-weight: 500"]')))
titles = [element.text for element in title_elements]
link_elements = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, 'div.col a[style="font-size: 18px; font-weight: 500"]')))
links = [element.get_attribute("href") for element in link_elements]
links = list(Counter(links).keys())
print(links)
for title, link in zip(titles, links):
try:
driver.get(link)
if 'googleseo_404_notfound' in driver.page_source:
print(f"404错误:{link},跳过此链接")
continue
except WebDriverException:
print(f"无法访问链接:{link},跳过此链接")
continue
print(link)
time_element = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".col.align-self-center span.text-muted")))
time_string = time_element[0].text
if 'hour ago' in time_string:
continue
if 'hours ago' in time_string:
continue
if 'minutes ago' in time_string:
continue
if 'Yesterday' in time_string:
continue
print('ok1')
postfix = " (This post was last modified:"
if postfix in time_string:
time_string = time_string.split(postfix)[0]
time_info = datetime.strptime(time_string, '%m-%d-%Y, %I:%M %p') # 解析时间字符串
if time_info < start_date:
found = True
break
if start_date <= time_info <= end_date:
description_element = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '.card.shadow-sm.mb-4.border-0.border.rounded.bg-white.p-5.m-0 span.mt-4')))
description = description_element.text
print('ok2')
size_info_in_title = re.findall(size_pattern, title) # 寻找标题中的Size
size_info_in_description = re.findall(size_pattern, description) # 寻找描述中的Size
size_info = list(set(size_info_in_title + size_info_in_description)) # 合并两者并删除重复元素
size = ', '.join(size_info) if size_info else ''
# format_lines = [line for line in description.split('\n') if 'format' in line.lower()]
# if format_lines:
# format_info = re.findall(format_pattern, format_lines[0])
# format = format_info[0].strip() if format_info else ''
# else:
# format = ''
df_row = pd.DataFrame([{'序号': counter, '数据名称': title, '数据简介': description, '数据大小': size, '泄露渠道': 'hydra', '获取方式': link, '数据泄漏时间': time_info.strftime('%Y-%m-%d %H:%M:%S')}])
df = pd.concat([df, df_row], ignore_index=True)
# print(f"序号: {counter}, 数据名称: {title}, 数据简介: {description}, 数据大小: {size}, 数据格式: {format}, 获取方式: {link}, 数据泄漏时间: {time_info.strftime('%Y-%m-%d %H:%M:%S')}")
counter += 1
# time.sleep(2)
time.sleep(2)
try:
driver.quit()
except Exception:
pass
# df.replace(to_replace='[^ -~]+', value='', regex=True, inplace=True)
df.to_excel('test.xlsx', index=False)
这里是按创建时间顺序排序,它的url有规律性,主要的逻辑是爬取列表页的标题和URL,然后进去爬取时间,满足时间需求再爬取描述内容,后续在标题和描述内容中提取size字段,还有格式,然后存储一条。
其中获取元素的主要的语法
1
2
wait = WebDriverWait(driver, 25)
username = wait.until(EC.presence_of_all_elements_located((By.ID, 'username')))
逻辑是设置等待时间,等待直到元素出现,这里的By用的最多的还是以下这些方法
1
2
3
login = wait.until(EC.presence_of_all_elements_located((By.XPATH, '/html/body/div[3]/form/div/div/div[5]/div[1]/input')))
time_element = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".col.align-self-center span.text-muted")))
xpath用来指定一些固定点比较好用,比如登录页的登录按钮,css选择器用来搜索一些重复标签比较好用 xpath就是F12右键直接复制路径,css选择器的写法则一般以.
开头 比如这样的情况
1
2
3
<div class="post-block-text md1">
STS Aviation Group is a service provider for the aviation industry. The company was founded in 1986 and is headquartered in Jensen Beach, Florida.
</div>
就应该是
1
wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".post-block-text.md1")))
不同的class属性之间用.
间隔,不同的标签之间用空格键间隔,第一个标签不用写标签名而用.
代替 再者就是登录的逻辑,登录主要是用send_keys方法输入数据,click方法完成点击
一般来说最坏的情况也可以手动登录,但是这个库面对cloudflare就束手无策了,会出现一直校验的情况
暗网爬虫
暗网爬虫主要针对于一些需要通过tor浏览器访问的网站进行作业。由于tor和firefox基于同一内核架构,基于selenium对于firefox的支持,我们可以使用seleninum实现对暗网的爬虫作业。 这里主要是在windows下的操作,linux同理 首先是关于如何启动 tor的启动需要借助梯子,然后在tor网络设置里使用socks5代理,添加为梯子的socks5代理端口即可,然后你的tor就🛫了 然后是如何通过脚本去调用tor,配置firfox的驱动,这里需要用到Geckodriver,是一个firfox的webdiver 在windows上,tor连接网络后会启动在9150端口,我们编辑firefox配置,增加socks5代理为tor的端口 我们单独启动tor连接上网络,此时再运行脚本能正常拉起firfox,同时能访问暗网了
可以用以下代码在终端测试连接情况
1
2
# tor爬虫验证
curl --socks5 localhost:9150 --socks5-hostname localhost:9150 -s https://check.torproject.org/ | cat | grep -m 1 Congratulations | xargs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import TimeoutException
from bs4 import BeautifulSoup
import time
import pandas as pd
import re
from datetime import datetime
from collections import Counter
gecko_driver_path = r'C:\Users\e4l4\Desktop\geckodriver-v0.33.0-win32\geckodriver.exe'
options = Options()
options.set_preference('permissions.default.image', 2)# 无图模式
# options.add_argument('-headless')# 无头模式
options.set_preference('network.proxy.type', 1)
options.set_preference('network.proxy.socks', '127.0.0.1')
options.set_preference('network.proxy.socks_port', 9150)
options.set_preference("network.proxy.socks_remote_dns", True)
service = Service(gecko_driver_path)
driver = webdriver.Firefox(service=service, options=options)
由于暗网访问十分不稳定(可能有网络质量的因素),所以最好采用无头或无图的模式
这里贴一份爬取lockbit3的爬虫脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import TimeoutException
from bs4 import BeautifulSoup
import time
import pandas as pd
import re
from datetime import datetime
from collections import Counter
gecko_driver_path = r'C:\Users\e4l4\Desktop\geckodriver-v0.33.0-win32\geckodriver.exe'
options = Options()
options.set_preference('permissions.default.image', 2)
# options.add_argument('-headless')
options.set_preference('network.proxy.type', 1)
options.set_preference('network.proxy.socks', '127.0.0.1')
options.set_preference('network.proxy.socks_port', 9150)
options.set_preference("network.proxy.socks_remote_dns", True)
service = Service(gecko_driver_path)
driver = webdriver.Firefox(service=service, options=options)
wait = WebDriverWait(driver, 70)
driver.get("http://lockbitapt6vx57t3eeqjofwgcglmutr3a35nygvokja5uuccip4ykyd.onion/")
df = pd.DataFrame(columns=["序号", "数据名称", "数据简介", "数据大小", "泄露渠道", "获取方式", "数据泄漏时间"])
with pd.ExcelWriter('lockbit.xlsx', engine='openpyxl', mode='a', if_sheet_exists='replace') as writer:
df.to_excel(writer, index=False, sheet_name='Sheet1')
record_blocks = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.post-block.good')))
links = [element.get_attribute("href") for element in record_blocks]
record_blocks = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.post-block.good div.post-title')))
titles = [element.text for element in record_blocks]
start_date = datetime(2023, 7, 1)
end_date = datetime(2023, 11, 21)
size_pattern = re.compile(r'\b\d+(?:[.,]\d+)?[kKmMgGbBkkKK]+\w*')
counter = 1 # 用于记录序号
for title, link in zip(titles[169:], links[169:]):
if title == 'shakeys.com':
break
driver.get(link)
print(link)
# if 'hour ago' in time_string:
# continue
# if 'hours ago' in time_string:
# continue
# if 'minutes ago' in time_string:
# continue
# if 'Yesterday' in time_string:
# continue
time.sleep(1)
time_element = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".uploaded-date-utc")))
time_string = time_element[0].text
time_info = datetime.strptime(time_string, '%d %b, %Y %H:%M UTC')
print(time_info)
if start_date <= time_info <= end_date:
description_element = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '.desc')))
description = description_element.text
print('ok2')
try:
wait = WebDriverWait(driver, 2)
size_element = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '.post-download-btn.file-download-btn span span')))
size = size_element.text
print(size)
except TimeoutException:
size_info_in_title = re.findall(size_pattern, title)
size_info_in_description = re.findall(size_pattern, description)
size_info = list(set(size_info_in_title + size_info_in_description))
size = ', '.join(size_info) if size_info else ''
df_row = pd.DataFrame([{'序号': counter, '数据名称': title, '数据简介': description, '数据大小': size, '泄露渠道': 'Lockbit', '获取方式': link, '数据泄漏时间': time_info.strftime('%Y年%m月%d日')}])
df = pd.concat([df, df_row], ignore_index=True)
df.to_excel(writer, index=False, header=False, sheet_name='Sheet1') # 在每次获取到数据后即刻追加到 Excel 文件
# print(f"序号: {counter}, 数据名称: {title}, 数据简介: {description}, 数据大小: {size}, 数据格式: {format}, 获取方式: {link}, 数据泄漏时间: {time_info.strftime('%Y-%m-%d %H:%M:%S')}") # 在控制台输出
counter += 1
driver.quit()
# df.to_excel('lockbit.xlsx', index=False)
和前面略有不同的点在于,之前使用df进行存储,但是很容易出现断连的情况导致数据全部丢失,所以就采用了爬一条存一条的方式。其余逻辑都基本相同,用的原生的selenium。
DrissionPage唯一真神
由于遇到了cloudflare人工校验的问题,在看undetected_chromedriver的时候发现了一个爬虫项目DrissionPage,本身自带cloudflare的绕过,于是开始了这个框架的学习。 这个框架只能驱动Chrome和Edge,所以暂时不太能处理暗网爬虫的问题 具体的功能可以参考文档,写的很全面了,这里记录一些我的经验
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from DrissionPage import WebPage, ChromiumOptions, SessionOptions
from DrissionPage import ChromiumPage
import time
import pandas as pd
import re
from bs4 import BeautifulSoup
from datetime import datetime
co = ChromiumOptions()
so = SessionOptions()
page = WebPage(driver_or_options=co, session_or_options=so)
df = pd.DataFrame(columns=['序号', '数据名称', '数据简介', '数据大小', '泄露渠道', '获取方式', '数据泄漏时间'])
# login
page.get('https://breachforums.is/member?action=login')
# time.sleep(20)
page.ele("@name=username").input("e4l4")
page.ele("@name=password").input("xxx")
# page.ele("xpath:/html/body/div/div[2]/div/div[2]/div[2]/div[1]/form/div[4]/input").click(by_js=True)
input("请在浏览器中手动登录,然后按 Enter 继续...")
# ele.click
base_url = "https://breachforums.is/Forum-Databases?page={}&sortby=started"
# page.get("https://darkforums.me/Forum-Databases?page={}&sortby=started")
# page.get('https://breachforums.is/Thread-Victoria-University-Db-Psql-105Mb')
# description = page.ele("@class=post_body scaleimages").text
# print(description)
size_pattern = re.compile(r'\b\d+(?:[.,]\d+)?[kKmMgGbBkkKK]+\w*')
start_date = datetime(2023, 7, 1)
end_date = datetime(2023, 11, 21)
found = False
counter = 1
for i in range(2,89):
if found:
break
url = base_url.format(i)
page.get(url)
nodes = page.eles("@class= subject_new")
times = page.eles("@class=forum-display__thread-date")
titles = [node('tag:a').text for node in nodes]
links = [node('tag:a').link for node in nodes]
time_strings = [time.text for time in times]
for title, link,time_string in zip(titles, links,time_strings):
if 'hour ago' in time_string:
continue
if 'hours ago' in time_string:
continue
if 'minutes ago' in time_string:
continue
if 'Yesterday' in time_string:
continue
#
time_info = datetime.strptime(time_string, '%m-%d-%Y, %I:%M %p')
if time_info < start_date:
found = True
break
if start_date <= time_info <= end_date:
page.get(link)
print(link)
description = page.ele("@class=post_body scaleimages").text
size_info_in_title = re.findall(size_pattern, title) # 寻找标题中的Size
size_info_in_description = re.findall(size_pattern, description) # 寻找描述中的Size
size_info = list(set(size_info_in_title + size_info_in_description)) # 合并两者并删除重复元素
size = ', '.join(size_info) if size_info else ''
df_row = pd.DataFrame([{'序号': counter, '数据名称': title, '数据简介': description, '数据大小': size, '泄露渠道': 'breachforums', '获取方式': link, '数据泄漏时间': time_info.strftime('%Y-%m-%d %H:%M:%S')}])
df = pd.concat([df, df_row], ignore_index=True)
counter += 1
time.sleep(1)
time.sleep(1)
# df.replace(to_replace='[^ -~]+', value='', regex=True, inplace=True)
df.to_excel('test.xlsx', index=False)
这个框架主要简化了获取元素的环节,减少了代码量,由于比较小众,可以绕过cloudflare 同时还支持状态的切换可以同时使用浏览器模式和requests模式,为未来绕过有了更多的可能性 简单记录一下,具体的详细使用大家还是看文档吧
1
2
3
4
items = page.eles('t:h3')
for item in items[:-1]:
lnk = item('tag:a')
print(lnk.text, lnk.link)# 打印文本和href属性
1
2
3
4
5
6
7
8
9
10
11
# 获取 id 为 one 的元素
div1 = page.ele('#one')
# 获取 name 属性为 row1 的元素
p1 = page.ele('@name=row1')
# 获取包含“第二个div”文本的元素
div2 = page.ele('第二个div')
# 获取所有div元素
div_list = page.eles('tag:div')
#
意思是按id
属性查找元素 @
表示按属性名查找 .
表示class查找
1
2
3
4
5
6
7
8
9
10
# 查找class属性为p_cls的元素
ele2 = ele1.ele('.p_cls')
# 查找class属性'_cls'文本开头的元素
ele2 = ele1.ele('.^_cls')
ele2 = ele1.ele('@@name=row1@@class:cls')
# 查找文本以“第二”开头且class属性为p_cls的元素 []
ele2 = ele1.ele('@@text()^第二@@class=p_cls')
静态爬虫思路
静态爬虫思路主要是用于解决网络情况较差的情况,又不需要更深层次的链接访问,只需要主页的信息,这时候不妨 在浏览器里右键页面保存下来(tor也可以)
这里贴一个之前写的lockbit的爬虫,也是一个暗网的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from bs4 import BeautifulSoup
import pandas as pd
import re
from datetime import datetime
with open("LockBit BLOG.htm", "r", encoding='utf-8') as f:
contents = f.read()
soup = BeautifulSoup(contents, 'html.parser')
post_blocks = soup.find_all(class_=['post-block good', 'post-block bad'])
data = []
base_url = "http://lockbitapt6vx57t3eeqjofwgcglmutr3a35nygvokja5uuccip4ykyd.onion"
size_re = re.compile(r'\b\d+(\.\d+)?\s*[kKmMgGtTpP][bB]\b')
for i, block in enumerate(post_blocks):
row = {}
row['序号'] = i + 1
row['数据名称'] = block.find(class_='post-title').text.strip()
row['数据简介'] = block.find(class_='post-block-text').text.strip()
size_match = size_re.search(row['数据简介'])
row['数据大小'] = size_match.group() if size_match else ''
row['渠道'] = ''
row['获取方式'] = base_url + block.get('onclick').split("'")[1]
leak_time = datetime.strptime(block.find(class_='updated-post-date').text.strip(), 'Updated: %d %b, %Y, %H:%M UTC')
row['数据泄露时间'] = leak_time.strftime('%Y年%m月%d日')
data.append(row)
illegal_char_re = re.compile(r'[\000-\037\177]')
for row in data:
for key, value in row.items():
if isinstance(value, str):
row[key] = illegal_char_re.sub('', value)
df = pd.DataFrame(data)
df.to_excel("output.xlsx", index=False)
主要逻辑就是用BeautifulSoup去抓取元素,然后处理放进row里存储
存在的问题
- 对于一些网页卡死的情况,需要增加一些自动重复访问的代码
- 利用google浏览器自带的翻译,翻译后再进行爬虫操作,正则匹配会很麻烦。但是如果爬取完再利用翻译API进行翻译,量比较大的话更麻烦
- 爬取是一次保存,万一因为网络原因中断,全盘皆无。目前较好的解决办法是分段保存,不知道有没有别的代码写法。