自动化爬虫脚本实践项目总结此脚本用于自动化爬取WOA爽文、科技文并尝试自行总结为精简的情节与信息,从而达到防沉迷的效果
目录导入模块 全局配置 工具项 随机延时函数 模拟人类行为函数 时间过滤器 文件保存器 浏览器锁文件清理 验证码处理守卫 dd推送底层请求 dd消息推送 AI总结接口 自动化发动机(包含全流程运作图) 主程序 总指挥 启动浏览器 检查登录 遍历WOA 返回书架自重试 进入文章列表 循环获取该WOA上的所有文章 终极战力 导入模块1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import asyncio from playwright.async_api import async_playwright, Pageimport randomimport osimport refrom datetime import datetimeimport warningsimport sysimport requestsimport timeimport json
playwright:剧作家(浏览器自动化测试工具)
async:异步
asyncio:异步I/O
全局配置1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 RANDOM_PREFIXES = [ "🔔 发现一篇新文章:" , "📖 今日份深度阅读已送达:" , "🚀 抓取到一条新动态:" , "✨ 发现宝藏内容,请查收:" , "🤖 机器人小助提醒,新文更新:" , "📺 你订阅的频道有更新啦:" , "💡 这里的思考值得一看:" ] RANDOM_EMOJIS = ["🌟" , "✅" , "🔥" , "📢" , "👀" , "☕" , "🍀" ] warnings.filterwarnings("ignore" , category=RuntimeWarning, message=".*Future exception was never retrieved.*" ) SHUTDOWN_REQUESTED = False LAST_PROCESSED_MP = None DING_WEBHOOK_URL = "xxxxxaccess_token=xxxx" DEBUG_MODE = True MAX_ARTICLE_PER_ACCOUNT = 5 MAX_ACCOUNT_TO_PROCESS = 8 ONLY_TODAY_ARTICLE = True SHELF_URL = "xxxxx" PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) USER_DATA_DIR = os.path.join(PROJECT_ROOT, "xxxxx" ) import time
cookie 常指网站存储在用户浏览器中的一小段文本数据 ,用于记录 用户的登录状态或行为 ,后续自动携带从而保持状态、记录偏好、跟踪浏览行为。
dir:directory 文件夹、目录、名录
工具项 随机延时函数1 2 3 4 5 6 7 8 9 10 11 12 13 14 async def random_sleep (min_s=2.0 , max_s=5.0 , reason="" ): """ 随机延时函数(异步):模拟人类操作的无规律停顿 :param min_s: 最小延时秒数,默认2.0秒 :param max_s: 最大延时秒数,默认5.0秒 :param reason: 延时原因描述,用于日志打印 """ delay = random.uniform(min_s, max_s) if reason: print (f"⏳ {reason} ,随机等待 {delay:.1 f} 秒..." ) await asyncio.sleep(delay)
这个函数用来模拟人类操作的停顿,避免操作太规整被识别成机器人~
模拟人类行为函数1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 async def simulate_human_behavior (page ): """ 模拟真实用户行为:随机鼠标移动 + 随机小滚动 :param page: Playwright的Page对象,代表浏览器标签页 """ try : vp = page.viewport_size if vp: width = vp['width' ] height = vp['height' ] for _ in range (random.randint(1 , 3 )): x = random.randint(100 , width - 100 ) y = random.randint(100 , height - 100 ) await page.mouse.move(x, y, steps=random.randint(5 , 15 )) await asyncio.sleep(random.uniform(0.1 , 0.4 )) if random.random() > 0.5 : await page.mouse.wheel(0 , random.randint(-50 , 150 )) except Exception: pass
用来模拟真实用户的鼠标移动和滚动,尽量让操作看起来像真人,减少被反爬的概率~
时间过滤器1 2 3 4 5 6 7 8 9 10 def is_today_article (time_text: str ) -> bool : """判断是否是当天发布的文章""" if not time_text: return False if "今天" in time_text or "小时前" in time_text or "分钟前" in time_text or "刚刚" in time_text: return True return False
这里比原来的版本多了对“刚刚”的判断,覆盖更多的时间表述~
文件保存器1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 def save_article (title, content, mp_name ): """ 保存原始文章内容到本地Markdown文件 :param title: 文章标题 :param content: 文章正文 :param mp_name: WOA名称 :return: 保存成功返回True,失败返回False """ try : base_dir = os.path.join(PROJECT_ROOT, "data/raw_articles" ) if not os.path.exists(base_dir): os.makedirs(base_dir) safe_title=re.sub(r'[\\/:*?"<>|]' ,'_' , title).strip() date_str = datetime.now().strftime("%Y%m%d" ) filename=f"{mp_name} _{date_str} _{safe_title[:15 ]} .md" file_path=os.path.join(base_dir,filename) with open (file_path,'w' ,encoDING='utf-8' ) as f: f.write(f"---\ntitle: \"{title} \"\nmp: \"{mp_name} \"\ndate: {datetime.now()} \n---\n\n" ) f.write(content) print (f"📁 已保存至: {file_path} " ) return True except Exception as e: print (f"❌ 存储失败: {e} " ) return False
意思是:如果在此之前代码报错了,不要直接卡死 ,而是把错误原因抓住 (存到变量e中 ),执行补救措施
mp_name代表WOA名称:media platform 浏览器锁文件清理1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def clean_user_data_locks (): """ 清理Chrome用户数据目录下的崩溃锁文件:避免浏览器启动时因锁文件导致的异常 递归查找,精准匹配常见Chromium锁文件名 """ if not os.path.exists(USER_DATA_DIR): return lock_filenames = {"SingletonLock" , "SingletonSocket" , "SingletonCookie" , "Lock" , ".com.google.Chrome.*.lock" } for root, dirs, files in os.walk(USER_DATA_DIR): for filename in files: if any (lock_name in filename for lock_name in lock_filenames): filepath = os.path.join(root, filename) try : if os.path.islink(filepath) or os.path.isfile(filepath): os.remove(filepath) print (f"🧹 已清除浏览器异常锁文件: {filepath} " ) except Exception as e: print (f"⚠️ 清理锁文件失败 {filepath} : {e} " )
这个是用来处理浏览器异常退出后留下的锁文件,避免下次启动浏览器的时候报错,很实用的小工具~
验证码处理守卫1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 async def handle_captcha (page ): """ 检测并等待用户手动处理验证码 :param page: Playwright的Page对象 :return: 验证通过返回True,超时/失败返回False """ global SHUTDOWN_REQUESTED try : captcha_mask = page.locator("xxxxx" ) if await captcha_mask.count() > 0 and await captcha_mask.first.is_visible(timeout=2000 ): print ("\n🚨🚨🚨 触发人机验证!程序已自动暂停 🚨🚨🚨" ) print ("👉 请在弹出的浏览器窗口中手动完成滑块/点选验证..." ) try : await captcha_mask.first.wait_for(state="hidden" , timeout=300000 ) print ("✅ 验证通过!恢复自动执行..." ) await asyncio.sleep(2 ) return True except TimeoutError: print ("\n⏰ 验证码处理超时(5分钟),程序将退出!" ) SHUTDOWN_REQUESTED = True return False except Exception: pass return False
遇到人机验证的时候,程序会自动停下来等你手动处理,处理完了就继续跑,不用重启程序
dd推送底层请求1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 def _request_DING (payload: dict , retry_times=3 ): """ 底层dd推送请求逻辑:负责实际HTTP请求,包含指数退避重试 :param payload: dd推送的JSON请求体 :param retry_times: 重试次数,默认3次 :return: ddAPI响应(JSON格式)或None """ if not DING_WEBHOOK_URL: print ("❌ dd Webhook未配置,跳过推送" ) return None headers = {'Content-Type' : 'application/json' } for retry in range (retry_times): try : response = requests.post(DING_WEBHOOK_URL, json=payload, headers=headers, timeout=10 ) response.raise_for_status() return response.json() except Exception as e: if retry < retry_times - 1 : wait_time = 2 ** retry print (f"❌ dd推送失败(第{retry+1 } 次): {e} ,{wait_time} 秒后重试..." ) time.sleep(wait_time) else : print (f"❌ dd推送重试{retry_times} 次均失败: {e} " ) return None
这个是推送的底层逻辑,带了重试机制,网络不好的时候也能尽量推送成功,不会丢消息
dd消息推送1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 def send_to_DING (title: str , summary: str , mp_name: str , time_text: str ): """ 发送摘要到dd机器人:分两步(前置消息+Markdown主体) :param title: 文章标题 :param summary: AI生成的摘要 :param mp_name: WOA名称 :param time_text: 文章发布时间 """ if not DING_WEBHOOK_URL: return prefix_text = random.choice(RANDOM_PREFIXES) emoji = random.choice(RANDOM_EMOJIS) prefix_content = f"{emoji} {prefix_text} 【{mp_name} 】" prefix_payload = { "msgtype" : "text" , "text" : { "content" : prefix_content } } print (f"📡 正在推送前置提醒: {prefix_content} " ) _request_DING(prefix_payload) markdown_text = f"### {title} \n**来源**:{mp_name} \n**时间**:{time_text} \n---\n{summary} " main_payload = { "msgtype" : "markdown" , "markdown" : { "title" : f"【摘要】{title} " , "text" : markdown_text } } print (f"📡 正在推送文章摘要..." ) result = _request_DING(main_payload) if result and result.get('errcode' ) == 0 : print ("✅ dd全部推送成功" ) else : print (f"❌ dd摘要推送失败: {result} " )
推送的时候分两步,先发个提醒,再发摘要,而且每次的提醒语都是随机的,看起来更像真人发的,不会被当成机器人消息~
AI总结接口1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 async def summarize_with_ai (title: str , content: str , mp_name: str , time_text: str ): """ DS AI总结逻辑:调用API生成固定格式摘要,写入本地文件并推送dd :param title: 文章标题 :param content: 文章正文 :param mp_name: WOA名称 :param time_text: 文章发布时间 :return: AI生成的摘要或None """ global LAST_PROCESSED_MP print (f"\n🤖 正在调用 DS 生成摘要:{title} ..." ) api_key = "xxxx" url = "xxxx" prompt = f""" 请阅读以下WOA文章,并生成一段客观、冷静、不浮夸的摘要。 只输出摘要内容,不要任何开场白。 格式严格如下(请替换内容): 事件:xx公司的xx模型/xxAPP/xx+突破xx领域/实现了xx功能+昭示当前xx领域处于xx态势 文章内容: {content[:4000 ]} """ payload = { "model" : "deepseek-chat" , "messages" : [ {"role" : "user" , "content" : prompt} ], "temperature" : 0.5 } headers = { "Authorization" : f"Bearer {api_key} " , "Content-Type" : "application/json" } try : response = await asyncio.to_thread(requests.post, url, json=payload, headers=headers) response.raise_for_status() result = response.json() ai_summary = result['choices' ][0 ]['message' ]['content' ].strip() summary_content = "" if mp_name != LAST_PROCESSED_MP: summary_content += f"\n{mp_name} :\n" LAST_PROCESSED_MP = mp_name summary_content += f"文章标题:{title} \n" summary_content += f"时间:{time_text} \n" summary_content += f"{ai_summary} \n" summary_file = os.path.join(PROJECT_ROOT, "AI_Daily_Summary.txt" ) with open (summary_file, "a" , encoDING="utf-8" ) as f: f.write(summary_content) print (f"📝 摘要已写入 {summary_file} " ) await asyncio.to_thread(send_to_DING, title, ai_summary, mp_name, time_text) return ai_summary except Exception as e: print (f"❌ DS 调用失败: {e} " ) return None
这里调用DS的API来生成文章的摘要,生成完了会先写到本地的摘要文件里,然后推送到dd上,这样你不用看全文,就能快速知道文章讲了啥~~,防沉迷的效果~~
自动化发动机1 2 3 4 5 6 7 8 9 10 11 12 graph TD A[main: 程序入口] -->|调用| B[run_summarizer: 总指挥] B -->|1.启动| C[launch_browser: 开浏览器+反爬] B -->|2.登录| D[wait_for_shelf_login: 检查登录] B -->|3.遍历| E[process_all_accounts: 遍历所有WOA] E -->|3.1 回书架| F[navigate_to_shelf: 跳转回书架] E -->|3.2 进列表| G[enter_article_list_page: 进入文章列表] E -->|3.3 抓文章| H[process_account_articles: 抓单个WOA] H -->|循环调用| I[process_single_article: 抓单篇文章] I -->|调用| J[summarize_with_ai: AI总结预览] I -->|调用| K[save_article: 保存文件] I -->|调用| L[is_today_article: 时间过滤]
主程序1 2 3 4 5 6 7 8 9 if __name__=="__main__" : try : asyncio.run(run_summarizer()) except KeyboardInterrupt: pass
关于asyncio的函数:
函数/类 说明 asyncio.run()运行一个入口协程,自动管理事件循环,程序启动用。 asyncio.create_task()将协程包装为任务并立即调度,实现后台并发。 asyncio.gather()并发运行多个协程/任务,等待所有完成并收集结果。 asyncio.sleep()暂停当前协程指定时间,主动让出控制权给事件循环。 asyncio.wait()等待一组任务完成,可设置超时,返回已完成/未完成集合。 asyncio.Lock异步互斥锁,用于保护共享资源,防止数据竞争。 asyncio.Queue异步队列,用于生产者-消费者模式,安全传递数据。 asyncio.wait_for()为协程/任务设置超时,超时则取消并抛出异常。
总指挥1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 async def run_summarizer (): async with async_playwright() as p: context,page=await launch_browser(p) try : await page.goto(SHELF_URL,wait_until="domcontentloaded" ) await wait_for_shelf_login(page) await process_all_accounts(page,context) except Exception as e: print (f"❌程序运行出错:{str (e)} " ) finally : await context.close() print ("程序运行结束,浏览器关闭" )
await:挂起当前协程 (在async def定义的异步函数内使用)
此处await配套async使得多个请求同时发送,节省时间wait_until后的常用取值:
取值 含义 说明 "load"等待 load 事件触发 页面的所有资源(图片、样式、脚本等)都加载完成。这是默认值,但通常比较慢。 "domcontentloaded"等待 DOMContentLoaded 事件触发 初始 HTML 文档被完全加载和解析,但可能还在加载外部资源(如图片)。速度较快。 "networkidle"等待网络空闲(没有超过 0 个连接) 相当于所有网络请求都结束。有 "networkidle0" 和 "networkidle2" 两种变体,后者允许少量连接。最慢但最稳妥。 "commit"等待导航被提交(收到响应头) 最快,但此时页面内容可能还没开始解析。
Playwright 的 Page 对象提供了大量方法来控制浏览器页面。以下是按功能分类的最常用函数,所有方法均为异步,调用时需加 await。
分类 函数名 说明 导航 goto(url, options)跳转到指定 URL,可设置 wait_until 等选项 go_back() / go_forward()后退/前进到历史记录中的上一页/下一页 reload()刷新当前页面 元素定位 query_selector(selector)返回第一个匹配 CSS 选择器的元素(ElementHandle) query_selector_all(selector)返回所有匹配元素的列表 wait_for_selector(selector, options)等待元素出现,超时则抛出异常 交互操作 click(selector, options)点击指定元素 fill(selector, value)填充输入框(先清空再输入) type(selector, text, options)逐个字符输入(模拟真实打字) select_option(selector, values)选择下拉框选项 check(selector) / uncheck(selector)勾选/取消勾选复选框或单选按钮 hover(selector)鼠标悬停 获取内容 inner_text(selector)获取元素的可见文本 inner_html(selector)获取元素的内部 HTML text_content(selector)获取元素的文本内容(包含隐藏元素) get_attribute(selector, name)获取元素的指定属性值 等待 wait_for_timeout(timeout)等待指定毫秒数(通常不推荐,优先使用其他等待方法) wait_for_function(pageFunction, arg, options)等待页面内函数返回真值 wait_for_load_state(state, options)等待特定加载状态(如 load、domcontentloaded) 截图与PDF screenshot(options)截图,可指定路径、质量、全屏等 pdf(options)将页面导出为 PDF(仅无头 Chrome 支持) 执行脚本 evaluate(pageFunction, arg)在页面上下文中执行 JavaScript 函数,返回序列化的结果 evaluate_handle(pageFunction, arg)同上,但返回 JSHandle(可操作复杂对象) Cookies context.cookies(urls)通过 page.context 获取 Cookies context.add_cookies(cookies)添加 Cookies 事件监听 on(event, callback)监听页面事件(如 'dialog'、'popup'、'request') 其他 url 属性获取当前页面 URL title()获取页面标题 content()获取整个页面的 HTML set_viewport_size(size)设置视口大小 close()关闭页面,释放资源
提示 :以上只是最常用的一部分,Playwright 的 API 非常丰富,完整列表可查阅 官方文档 。
启动浏览器1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 async def launch_browser (p ): clean_user_data_locks() context = await p.chromium.launch_persistent_context( user_data_dir=USER_DATA_DIR, headless="new" if not DEBUG_MODE else False , args=[ '--no-sandbox' , '--disable-blink-features=AutomationControlled' , '--disable-infobars' , '--window-size=1440,900' , '--user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36' ], ignore_https_errors=True , viewport={"width" : 1440 , "height" : 900 }, locale='zh-CN' , timezone_id='Asia/Shanghai' ) await context.add_init_script(""" Object.defineProperty(navigator, 'webdriver', {get: () => undefined}); """ ) page = context.pages[0 ] if context.pages else await context.new_page() print ("🚀 浏览器已以真实指纹启动,正在访问WOA书架..." ) return context, page
context 是 BrowserContext 类的对象,可以认为是一个独立的浏览器会话 (类似于无痕模式下的隔离环境)。拥有自己的 cookies、缓存和页面,常用于模拟多用户或多标签页操作
函数 说明 new_page()创建一个新页面(标签页)。 cookies([urls])获取当前上下文的 cookies。 add_cookies(cookies)添加 cookies。 clear_cookies()清除所有 cookies。 add_init_script(script)在每个页面加载前执行指定脚本(常用于绕过反爬)。 expect_page(options)监听并等待一个新页面被创建(如点击链接打开新标签页)。 pages()返回当前上下文所有打开的页面列表。 close()关闭上下文,释放资源。 storage_state(**kwargs)获取当前上下文的存储状态(cookies、localStorage),可用于保存登录态。
此外,还可以通过 context 设置全局的 timeout、viewport 等属性
p.chromium :「用 Playwright 启动 Chrome 浏览器」(Playwright 还支持 Firefox、Safari)
launch_persistent_context :记住登录状态
context:「浏览器上下文」整个浏览器窗口的遥控,可以管理多个标签页、设置全局反爬
内部参数:
对应配置项 :USER_DATA_DIR = "./wechat_workdir"(删去后无cookie,每次都要手动登录)headless="new" if not DEBUG_MODE else False对应配置项:DEBUG_MODE = True (“new”表示无头模式)add_init_script :意思是「添加初始化脚本」。
作用 :在每一个网页加载之前 ,先执行这段 JavaScript 代码,在网页还没反应过来先机器人标记抹掉。
1 Object .defineProperty (navigator, 'webdriver' , {get : () => undefined })
所有自动化浏览器(Playwright、Selenium),都会在网页里留一个「机器人指纹」:navigator.webdriver = true,改为undefined去除标记 检查登录1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 async def wait_for_shelf_login (page ): """ 等待书架加载并处理登录逻辑:检测登录状态,未登录则等待用户扫码 :param page: Playwright的Page对象 """ print ("正在检查登录状态..." ) try : await page.wait_for_selector("xxxxx" , timeout=5000 ) except : print ("⚠️ 未检测到书架元素,请在弹出的浏览器中手动扫码登录..." ) await handle_captcha(page) await page.wait_for_selector("xxxxx" , timeout=60000 ) print ("书架加载完成,正在获取WOA列表..." )
如何确定xxxxx:F12找到找到“元素”<a class="xxxxx" ...>WOA名称</a>
构造选择器 标签名是 a(超链接)。 类名是 xxxxx(注意有时可能有多个类名,用点号连接)。 因此 CSS 选择器可以写成 "xxxxx",表示“所有 class 包含对应标识的 <a> 元素”。 验证选择器 在开发者工具的“控制台”(Console)中,输入 document.querySelectorAll("xxxxx"),如果返回的元素列表与页面上的WOA卡片数量一致,说明选择器正确。 遍历WOA1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 async def process_all_accounts (page,context ): account_selector="xxxxx" accounts_initial=await page.query_selector_all(account_selector) total_account=len (accounts_initial) print (f"共找到{total_account} 个元素" ) for account_idx in range (total_account): goto_success=await navigate_to_shelf(page) if not goto_success: print ("false" ) break accounts = await page.query_selector_all(account_selector) if account_idx >= len (accounts): break current_account = accounts[account_idx] account_name = await current_account.inner_text() account_name = account_name.replace("\n" , "" ).strip() print (f"---------- 正在处理WOA:{account_name} ----------" ) await current_account.click() try : await page.wait_for_selector("xxxxx" , timeout=10000 ) except : print (f"⚠️ {account_name} 不是,跳过" ) await page.goto(SHELF_URL) await page.wait_for_selector(account_selector, timeout=5000 ) continue article_list_page = await enter_article_list_page(page, context) await process_account_articles(article_list_page, account_name)
current_account 是什么类型的对象?1 2 accounts = await page.query_selector_all(account_selector) current_account = accounts[account_idx]
page.query_selector_all() 返回的是一个 ElementHandle 对象的列表 。ElementHandle 是 Playwright 中代表页面中真实 DOM 元素 的句柄,可以通过它来获取元素的属性、文本、点击元素等。这里代表当前索引的WOA卡片元素。 inner_text() 是成员函数吗?1 account_name = await current_account.inner_text()
是的 ,inner_text() 是 ElementHandle 类的一个异步成员方法 。它的作用是获取该 DOM 元素的可见文本内容 (JavaScript 中的 element.innerText) 关于3.7 - 关于xxxxx:文章页顶部的一个元素(通常是WOA名称或标题链接),标志一个正常的WOA文章页已经加载完成 - await page.wait_for_selector(account_selector) 是 Playwright 中的一个等待方法 ,作用是一直等待,直到页面上出现能够匹配 account_selector 的 DOM 元素
关于DOM元素
- **DOM** 的全称 Document Object Model(文档对象模型)。当浏览器打开一个网页时,它会将 HTML 代码转换成一个树形结构,这个结构就是 DOM。
- **元素** 是 DOM 树中的基本组成部分,对应 HTML 中的各种标签。比如:
- `<div>` 是一个元素
- `<a>` 是一个元素
- `<p>` 是一个元素
- `<img>` 也是一个元素
返回书架自重试1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 async def navigate_to_shelf (page ): """导航回书架并确保元素加载,自重试防网络波动""" account_selector = "xxxxx" retry_count = 0 goto_success = False while retry_count < 3 and not goto_success: try : await page.goto(SHELF_URL, wait_until="domcontentloaded" , timeout=10000 ) await page.wait_for_selector(account_selector, timeout=5000 ) goto_success = True except Exception as e: print (f"跳转书架失败,重试第{retry_count+1 } 次" ) retry_count += 1 await asyncio.sleep(1 ) return goto_success
进入文章列表1 2 3 4 5 6 7 8 9 10 11 12 13 14 async def enter_article_list_page (page, context ): try : async with context.expect_page(timeout=5000 ) as new_page_info: await page.click("xxxxx" ) article_list_page = await new_page_info.value except : article_list_page = page await article_list_page.wait_for_selector("xxxxx" , state="visible" , timeout=5000 ) return article_list_page
关于"xxxxx",F12检查该元素,发现标签是 <span>,并且类名包含对应标识,因此写此用于唯一标识
expect_page():开始监听浏览器上下文中是否有新页面被创建(例如点击一个链接打开了新标签页)
new_page_info 是 expect_page 上下文管理器返回的事件信息对象 ,它包含新页面的相关信息
.value 是该对象的一个可等待属性 ,最终返回新创建的 Page 对象(Playwright 的页面类),此处为了获取真正的 Page 实例,赋值给 article_list_page
循环获取该WOA上的所有文章1 2 3 4 5 6 7 async def process_account_articles (article_list_page, account_name ): for idx in range (min (MAX_ARTICLE_PER_ACCOUNT, 20 )): should_continue = await process_single_article(article_list_page, account_name, idx) if not should_continue: break
终极战力对于单篇文章的过滤处理
处理单篇文章 的全流程:
打开列表(之前函数) → 选中文章 → 加载正文 → 智能提取 → 保存文件 → AI 预览→回到列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 async def process_single_article (article_list_page,account_name,idx ): catalog_toggle_selector = "xxxxx" catalog_container_selector = "xxxxx" target_selector = "xxxxx, xxxxx" try : is_list_visible = await article_list_page.is_visible(catalog_container_selector) if not is_list_visible: await article_list_page.click(catalog_toggle_selector) await article_list_page.wait_for_selector(catalog_container_selector, state="visible" , timeout=5000 ) except Exception as e: print (f"⚠️ 列表呼出失败,跳过当前WOA | 错误: {e} " ) return False current_items = await article_list_page.query_selector_all(target_selector) if idx >= len (current_items): print ("✅ 已遍历完所有可抓取的文章" ) return False current_item = current_items[idx] await current_item.scroll_into_view_if_needed() title_el = await current_item.query_selector("xxxxx, xxxxx" ) if not title_el: return True preview_title = (await title_el.inner_text()).strip() if ONLY_TODAY_ARTICLE: time_el = await current_item.query_selector("xxxxx" ) if not time_el: time_text = await current_item.evaluate("""(node) => { const parentLi = node.closest('xxxxx'); const timeNode = parentLi ? parentLi.querySelector('xxxxx') : null; return timeNode ? timeNode.innerText : ""; }""" ) else : time_text = await time_el.inner_text() if not time_text: print (f"⚠️ 无法获取时间,跳过: {preview_title} " ) return True if not is_today_article(time_text): print (f"⏭️ 跳过旧文: {preview_title} ({time_text} )" ) return True print (f"🚀 正在加载文章: {preview_title} " ) try : await current_item.click() except Exception as e: print (f"⚠️ 文章点击失败,跳过 | 标题: {preview_title} | 错误: {e} " ) return True try : await article_list_page.wait_for_selector("xxxxx" , timeout=10000 ) print ("✅ 文章加载成功,开始提取正文" ) except Exception as e: print (f"❌ 文章加载超时,跳过: {preview_title} " ) return True
于是,自动小助手便大功告成啦极度疲惫