Dynamic Crawler

Spider

import time

import scrapy
from scrapy import signals
from scrapy.spiders import Spider
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from pydispatch import dispatcher

from ..items import PicItem  # adjust the import path to your project layout


class MirrowSpider(Spider):
    name = "moving_pic"
    allowed_domains = ["dimtown.com"]
    start_urls = ["https://dimtown.com/jxmt"]
    title_count = 0    # counter of processed detail pages
    title_limit = 200  # stop after this many detail pages

    def __init__(self, *args, **kwargs):
        super(MirrowSpider, self).__init__(*args, **kwargs)
        self.driver = webdriver.Chrome()
        dispatcher.connect(self.spider_closed, signals.spider_closed)

    def spider_closed(self, spider):
        self.driver.quit()

    def parse(self, response):
        # Log the User-Agent actually used for this request
        user_agent = response.request.headers.get('User-Agent', b'').decode('utf-8')
        self.logger.info(f"Current User-Agent: {user_agent}")

        self.driver.get(response.url)
        wait = WebDriverWait(self.driver, 20)  # generous timeout

        try:
            # Find and click the "most commented" sort link
            comments_link = wait.until(
                EC.element_to_be_clickable((By.XPATH, '//a[@data-orderby="comment_count"]'))
            )
            self.logger.info("Found the 'most commented' link, clicking it")
            comments_link.click()

            # Record the initial page height
            last_height = self.driver.execute_script("return document.body.scrollHeight")

            while True:
                # Scroll to the bottom to trigger lazy loading
                self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                self.logger.info("Scrolled down")
                time.sleep(5)  # wait for new content to load
                new_height = self.driver.execute_script("return document.body.scrollHeight")
                if new_height == last_height:
                    break
                last_height = new_height

            # Collect all detail-page links
            links = self.driver.find_elements(By.XPATH, '//a[contains(@href, ".html")]')
            detail_urls = [link.get_attribute('href') for link in links]
            self.logger.info("Detail page URLs: %s", detail_urls)

            for url in detail_urls:
                yield scrapy.Request(url, callback=self.parse_detail)

        except Exception as e:
            self.logger.error("Error while parsing the index page: %s", e)

    def parse_detail(self, response):
        # Log the User-Agent actually used for this request
        user_agent = response.request.headers.get('User-Agent', b'').decode('utf-8')
        self.logger.info(f"Current User-Agent: {user_agent}")

        self.driver.get(response.url)
        wait = WebDriverWait(self.driver, 20)  # generous timeout

        try:
            title = wait.until(EC.visibility_of_element_located((By.XPATH, '//h1'))).text.strip()
            self.logger.info("Title: %s", title)

            img_urls = [
                img.get_attribute('src')
                for img in self.driver.find_elements(By.XPATH, '//img[@decoding="async"]')
            ]
            self.logger.info("Image URLs: %s", img_urls)

            if not img_urls:
                self.logger.warning("No image URLs found.")

            self.title_count += 1  # one more detail page processed
            if self.title_count >= self.title_limit:
                self.crawler.engine.close_spider(self, 'Title limit reached, stopping the spider')

            item = PicItem(image_urls=img_urls, title=title)
            yield item

        except Exception as e:
            self.logger.error("Error while parsing the detail page: %s", e)

Pipeline

import os
from urllib.parse import urlparse

import scrapy
from scrapy.pipelines.images import ImagesPipeline


class CustomImagePipeline(ImagesPipeline):

    def get_media_requests(self, item, info):
        # Issue one download request per image URL, carrying the item along in meta
        for image_url in item.get('image_urls', []):
            yield scrapy.Request(image_url, meta={'item': item})

    def file_path(self, request, response=None, info=None, *, item=None):
        # Store each image under a directory named after the post title
        item = request.meta['item']
        title = item.get('title', 'default_title').replace(' ', '_')
        parsed_url = urlparse(request.url)
        image_name = os.path.basename(parsed_url.path)
        return f'{title}/{image_name}'

    def item_completed(self, results, item, info):
        if not results:
            return item

        # Keep only the paths of images that downloaded successfully
        image_paths = [x['path'] for ok, x in results if ok]
        item['images'] = image_paths

        return item
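
For reference, a tiny standalone sketch of the relative path that file_path() above returns; the title and image URL here are made up, and ImagesPipeline prefixes the result with IMAGES_STORE when it saves the file.

# Standalone sketch of the path logic in file_path(); title and URL are hypothetical.
import os
from urllib.parse import urlparse

title = 'sample title'.replace(' ', '_')
url = 'https://dimtown.com/wp-content/uploads/001.jpg'   # hypothetical image URL
image_name = os.path.basename(urlparse(url).path)

print(f'{title}/{image_name}')
# -> sample_title/001.jpg, saved as <IMAGES_STORE>/sample_title/001.jpg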

Middleware

from scrapy.downloadermiddlewares.redirect import RedirectMiddleware
from fake_useragent import UserAgent


class CustomRedirectMiddleware(RedirectMiddleware):
    def _redirect(self, redirected, request, spider, reason):
        # Keep the original URL instead of following the redirect target
        redirected = redirected.replace(url=request.url)
        return super()._redirect(redirected, request, spider, reason)


class RandomUserAgentMiddleware(object):
    def __init__(self):
        self.ua = UserAgent()

    def process_request(self, request, spider):
        # Rotate the User-Agent on every request and send browser-like headers
        user_agent = self.ua.random
        request.headers['User-Agent'] = user_agent
        request.headers['Accept'] = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
        request.headers['Accept-Language'] = "en"
        request.headers['Referer'] = 'https://dimtown.com/cosplay/page/1'  # adjust to the section being crawled
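
Note that the settings below only register RandomUserAgentMiddleware, so CustomRedirectMiddleware is defined but not active. If the redirect override is wanted as well, one plausible wiring is sketched below; the 600 slot mirrors the built-in redirect middleware's position and is an assumption, not taken from the original project.

# Sketch: enabling CustomRedirectMiddleware in settings.py (priorities are assumptions).
DOWNLOADER_MIDDLEWARES = {
    'scrapy_selenium.SeleniumMiddleware': 800,
    'moving.middlewares.RandomUserAgentMiddleware': 800,
    # disable the stock redirect middleware and slot the custom subclass in at 600
    'scrapy.downloadermiddlewares.redirect.RedirectMiddleware': None,
    'moving.middlewares.CustomRedirectMiddleware': 600,
}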

Settings

# Scrapy settings for the moving project

BOT_NAME = "moving"

SPIDER_MODULES = ["moving.spiders"]
NEWSPIDER_MODULE = "moving.spiders"

from shutil import which
from selenium.webdriver.chrome.service import Service

# Selenium configuration
SELENIUM_DRIVER_NAME = 'chrome'
SELENIUM_DRIVER_SERVICE_ARGS = ['--log-path=/path/to/chromedriver.log']
SELENIUM_DRIVER_ARGUMENTS = ['--headless']  # enable headless mode if desired

SELENIUM_SERVICE = Service(which('chromedriver'))  # wrap the chromedriver path in a Service object

ROBOTSTXT_OBEY = False

DOWNLOAD_DELAY = 3  # delay between requests (seconds)
RANDOMIZE_DOWNLOAD_DELAY = True  # randomize the download delay

DOWNLOADER_MIDDLEWARES = {
    'scrapy_selenium.SeleniumMiddleware': 800,
    'moving.middlewares.RandomUserAgentMiddleware': 800,
}

ITEM_PIPELINES = {
    'moving.pipelines.CustomImagePipeline': 1,  # must match the class name defined in pipelines.py
}

IMAGES_STORE = 'moving'

REQUEST_FINGERPRINTER_IMPLEMENTATION = "2.7"
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
FEED_EXPORT_ENCODING = "utf-8"

AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1  # initial download delay (seconds)
AUTOTHROTTLE_MAX_DELAY = 5  # maximum download delay under high latency (seconds)
AUTOTHROTTLE_TARGET_CONCURRENCY = 3.0  # average number of requests sent in parallel
AUTOTHROTTLE_DEBUG = False  # show AutoThrottle debugging stats

RETRY_ENABLED = True
RETRY_TIMES = 5  # number of retries
RETRY_HTTP_CODES = [429, 500, 502, 503, 504, 522, 524, 408]  # HTTP codes that trigger a retry
RETRY_DELAY = 5  # note: not a built-in Scrapy setting; only used if a custom retry middleware reads it
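
One thing worth noting: the settings register scrapy_selenium's SeleniumMiddleware, yet the spider in this post drives its own webdriver.Chrome() and never yields a SeleniumRequest, so that middleware sits idle (and SELENIUM_SERVICE is, as far as I know, not a setting scrapy_selenium itself reads). If rendering were delegated to scrapy_selenium instead, the spider would issue requests roughly as follows; the class name and wait time are assumptions.

# Sketch: letting scrapy_selenium render pages instead of the spider's own webdriver.
# Illustrative only; the spider above does not use SeleniumRequest.
from scrapy.spiders import Spider
from scrapy_selenium import SeleniumRequest


class RenderedSpider(Spider):               # hypothetical name
    name = "moving_pic_rendered"
    start_urls = ["https://dimtown.com/jxmt"]

    def start_requests(self):
        for url in self.start_urls:
            # the SeleniumMiddleware registered in settings picks these up
            yield SeleniumRequest(url=url, callback=self.parse, wait_time=10)

    def parse(self, response):
        # response.text now contains the rendered HTML
        self.logger.info("Rendered page length: %d", len(response.text))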

Item

import scrapy


class PicItem(scrapy.Item):
    title = scrapy.Field()
    image_urls = scrapy.Field()
    images = scrapy.Field()
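
image_urls and images follow the field names ImagesPipeline conventionally uses (and the custom pipeline above reads them explicitly). Purely as an illustration, with made-up values, an item moves through the pipeline roughly like this:

# Illustrative only: made-up values showing the item before and after the image pipeline.
item = PicItem(
    title='some post title',
    image_urls=['https://dimtown.com/wp-content/uploads/001.jpg'],  # filled in by the spider
)
# After CustomImagePipeline.item_completed() the downloaded paths are attached:
# item['images'] == ['some_post_title/001.jpg']   (relative to IMAGES_STORE)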

Start script

from scrapy import cmdline
cmdline.execute('scrapy crawl moving_pic'.split(' '))
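
An equivalent launcher that avoids shelling out through cmdline could use CrawlerProcess; this is a sketch that assumes it is run from the project root so the project settings resolve.

# Sketch: launching the same spider programmatically instead of via cmdline.execute.
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())
process.crawl('moving_pic')   # spider name defined in MirrowSpider.name
process.start()               # blocks until the crawl finishes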

Run results

Example run output (screenshot omitted)