微信小程序自动爬取
一、整体架构
┌─────────────────────────────────────────────┐
main()
while True:
spider.crawl_incremental()
time.sleep(1800)
└─────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
TataSpider
- token_mgr: TokenManager (管理认证)
- session: requests.Session (复用连接)
- fetch_page(): 请求单页数据
- crawl_incremental(): 增量抓取主逻辑
- _load_existing(): 加载本地已有数据
- _save_posts(): 保存到 JSON 文件
└─────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
TokenManager
- get_token(): 获取有效token(过期自动刷新)
- _load_token(): (从文件读取)
- _refresh_token(): lamda→code→token
└─────────────────────────────────────────────┘
二、运行流程
启动
│
├─ 1. 初始化 TokenManager
│ └─ 尝试从 token.json 加载已有 token
│
├─ 2. 检查 token 是否有效
│ ├─ 有效 → 直接用
│ └─ 无效 → 自动刷新(lamda → code → 后端换token)
│
├─ 3. 进入 while True 循环
│ │
│ ├─ crawl_incremental()
│ │ ├─ 加载本地 posts.json
│ │ ├─ 逐页请求 API
│ │ ├─ 跳过已有/重复帖子
│ │ ├─ 无新增时停止翻页
│ │ └─ 保存结果到 posts.json
│ │
│ └─ time.sleep(1800) 等待下一轮
│
└─ 循环往复...
三、核心知识点
3.1 requests.Session
self.session = requests.Session()
self.session.headers.update({...})
保持 TCP 连接复用,比每次
requests.get()新建连接更快设置一次 headers,后续所有请求自动带上
自动管理 cookie
3.2 JWT Token 结构
eyJhbGci...(头部).eyJ1c2Vy...(payload).yaXRVikx...(签名)
三段用 . 分隔:
头部:算法信息
payload:用户数据(user_id、过期时间等),base64 编码
签名:防篡改,我们不验证
关键字段:iat(issued at):签发时间exp(expiration):过期时间
3.3 threading.Lock
with self._lock:
# 同一时刻只有一个线程能执行这里的代码
防止多线程同时刷新 token,避免重复请求浪费资源。
3.4 @staticmethod
@staticmethod
def _decode_jwt_payload(token: str) -> dict:
不需要 self,纯输入输出的工具函数,逻辑上归属于类所以放在里面。
3.5 增量抓取逻辑
# 核心判断
if latest_time and create_time < latest_time:
stop = True # 遇到比本地最新还旧的数据,停止
break
if post_id in seen_ids:
continue # 跳过本次已见过的重复帖子
if page_new == 0:
break # 整页无新增,停止翻页
3.6 json.dump 参数
json.dump(data, f, ensure_ascii=False, indent=2)
ensure_ascii=False:允许中文直接写入,不转成\uXXXXindent=2:格式化输出,每层缩进2空格,方便人看
四、配置参数说明
# lamda 设备(模拟微信登录用)
LAMDA_URL = 'your_url'
LAMDA_DEVICE_SERIAL = 'your_serial'
MINI_APP_ID = 'your_app_id'
# 后端接口
LOGIN_URL = '...' # 用 code 换 token 的接口
POSTS_URL = '...' # 帖子列表 API
# 抓取参数
COLLEGE_ID = 25 # 学校ID
PAGE_SIZE = 15 # 每页条数(可尝试改大,取决于服务端限制)
MAX_PAGES = 50 # 最多翻页数(安全上限)
REQUEST_DELAY = 1 # 每页间隔秒数(太快可能被封)
CRAWL_INTERVAL = 1800 # 定时抓取间隔(30分钟)
五、Token 刷新流程
1. lamda 设备模拟微信小程序登录
GET http://lamda/wechat_js_login?app_id=xxx&device_serial=xxx
返回: {"code": 0, "data": {"payload": "{\"code\": \"0a1b2c...\"}"}}
2. 用微信 code 调后端登录接口
POST https://xxx/getMiniTokenByOauth2Code
Body: {"code": "0a1b2c..."}
返回: {"code": 10000, "data": {"token": "eyJ..."}
3. 解码 JWT 获取过期时间,保存到 token.json
刷新时机:token 剩余有效期不足 1 小时时自动触发。
六、数据存储
存储位置:./data/posts.json
格式:按 create_time 降序排列的数组
[
{
"id": 12345,
"title": "帖子标题",
"content": "帖子内容...",
"create_time": "2026-05-20 15:30:00",
"category": "闲聊吃瓜",
"user_id": 89198,
"nickname": "用户昵称",
"is_anonymous": 0
},
...
]
局限性:
每次全量读写,数据量大了会慢
几千到一两万条没问题
更大量级考虑换 SQLite 或数据库
七、可改进的点
PAGE_SIZE 调大:减少请求次数,前提是服务端支持
换 SQLite 存储:避免全量读写 JSON
异常重试:网络请求失败时自动重试几次
日志文件:把 print 换成 logging,输出到文件方便排查
多学校支持:COLLEGE_ID 改成列表,循环抓取多个学校
八、与框架版的区别
完整代码
# -*- coding:utf-8 -*-
"""
小程序爬虫(独立版,不依赖框架)
功能:
1. 自动通过 lamda 获取微信 code → 换取 JWT token
2. token 过期自动刷新,有效期内复用本地缓存
3. 增量抓取帖子列表,自动去重,按时间判断停止
4. 数据存本地 JSON 文件
5. 定时循环抓取
使用方式:
python standalone_spider.py
"""
import os
import json
import time
import base64
import datetime
import requests
import threading
# ==================== 配置区 ====================
# lamda 设备(用于自动获取微信登录 code)
LAMDA_URL = 'your_url'
LAMDA_DEVICE_SERIAL = 'your_serial'
MINI_APP_ID = 'your_app_id'
# 后端接口
LOGIN_URL = 'your_url'
POSTS_URL = 'your_url'
# 抓取参数
COLLEGE_ID = 25
PAGE_SIZE = 15
MAX_PAGES = 50
REQUEST_DELAY = 1 # 每页请求间隔(秒)
CRAWL_INTERVAL = 1800 # 定时抓取间隔(秒),30分钟
# 文件路径
TOKEN_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'token.json')
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data')
DATA_FILE = os.path.join(DATA_DIR, 'posts.json')
# ==================== Token 管理 ====================
class TokenManager:
"""
JWT Token 管理器
- 从本地文件加载 token,有效则直接用
- 过期或不存在时,自动通过 lamda → code → token 刷新
- 线程安全(多线程场景下不会重复刷新)
"""
def __init__(self, token_file=TOKEN_FILE):
self.token_file = token_file
self.token = None
self.exp_timestamp = 0
self._lock = threading.Lock() # 防止多线程同时刷新
self._load_token()
def _load_token(self):
"""从本地文件加载已保存的 token"""
if not os.path.exists(self.token_file):
return
try:
with open(self.token_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self.token = data.get('token')
self.exp_timestamp = data.get('exp_timestamp', 0)
except (json.JSONDecodeError, IOError):
pass
def _save_token(self, token: str, payload: dict):
"""保存 token 到本地文件"""
data = {
'token': token,
'user_id': payload.get('user_id'),
'main_user_id': payload.get('main_user_id'),
'client_type': payload.get('client_type'),
'iat_timestamp': payload.get('iat'),
'exp_timestamp': payload.get('exp'),
'iat_readable': datetime.datetime.fromtimestamp(payload.get('iat', 0)).strftime('%Y-%m-%d %H:%M:%S'),
'exp_readable': datetime.datetime.fromtimestamp(payload.get('exp', 0)).strftime('%Y-%m-%d %H:%M:%S'),
'extracted_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
}
with open(self.token_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"[Token] 已保存,有效期至: {data['exp_readable']}")
@staticmethod
def _decode_jwt_payload(token: str) -> dict:
"""解码 JWT payload,提取里面的用户信息和过期时间(通用方法,不验证签名)"""
try:
parts = token.split('.')
if len(parts) != 3:
return {}
payload = parts[1]
payload += '=' * (4 - len(payload) % 4) # base64 补齐
decoded = base64.urlsafe_b64decode(payload)
return json.loads(decoded)
except Exception:
return {}
def is_valid(self) -> bool:
"""判断 token 是否有效(剩余超过 1 小时视为有效)"""
if not self.token:
return False
remaining = self.exp_timestamp - time.time()
return remaining > 3600
def get_token(self) -> str:
"""
获取有效 token
- 有效直接返回
- 无效则自动刷新
- 线程安全:多线程调用时只会刷新一次
"""
with self._lock:
if self.is_valid():
return self.token
print("[Token] 需要刷新...")
new_token = self._refresh_token()
if new_token:
self.token = new_token
payload = self._decode_jwt_payload(new_token)
self.exp_timestamp = payload.get('exp', 0)
self._save_token(new_token, payload)
return self.token
def _refresh_token(self) -> str:
"""刷新流程:lamda 获取 code → 后端换 token"""
code = self._get_code_from_lamda()
if not code:
print("[Token] 获取 code 失败")
return None
token = self._exchange_token(code)
if not token:
print("[Token] 换取 token 失败")
return None
return token
def _get_code_from_lamda(self) -> str:
"""通过 lamda 设备触发微信小程序登录,获取 code"""
try:
url = f"{LAMDA_URL}?app_id={MINI_APP_ID}&device_serial={LAMDA_DEVICE_SERIAL}"
resp = requests.get(url, timeout=30)
result = resp.json()
if result.get('code') != 0:
print(f"[Token] lamda 返回错误: {result.get('message')}")
return None
payload = result.get('data', {}).get('payload', '{}')
if isinstance(payload, str):
payload = json.loads(payload)
return payload.get('code')
except Exception as e:
print(f"[Token] 请求 lamda 异常: {e}")
return None
def _exchange_token(self, code: str) -> str:
"""用微信 code 调用后端登录接口换取 JWT token"""
try:
resp = requests.post(
LOGIN_URL,
json={'code': code},
headers={'Content-Type': 'application/json'},
timeout=10
)
result = resp.json()
if result.get('code') != 10000:
print(f"[Token] 登录接口返回: {result.get('msg')}")
return None
return result.get('data', {}).get('token')
except Exception as e:
print(f"[Token] 请求登录接口异常: {e}")
return None
# ==================== 爬虫主体 ====================
class TataSpider:
"""塔塔校园圈帖子爬虫"""
def __init__(self):
self.token_mgr = TokenManager()
# Session 保持连接复用,设置一次 headers 后续请求自动带上
self.session = requests.Session()
self.session.headers.update({
'Content-Type': 'application/json',
'User-Agent': 'Mozilla/5.0 (Linux; Android 12) AppleWebKit/537.36',
})
def _get_headers(self) -> dict:
"""获取带 token 的请求头"""
token = self.token_mgr.get_token()
if not token:
raise RuntimeError("无法获取有效 token")
return {'Authorization': f'Bearer {token}'}
def fetch_page(self, page: int) -> list:
"""请求单页帖子列表,返回帖子数组"""
params = {
'page': page,
'list_rows': PAGE_SIZE,
'posts_category_id': '',
'title': '',
'content': '',
'college_id': COLLEGE_ID,
}
try:
resp = self.session.get(
POSTS_URL,
headers=self._get_headers(),
params=params,
timeout=15,
)
result = resp.json()
if result.get('code') == 10000:
return result.get('data', {}).get('data', [])
else:
print(f" [错误] 第{page}页: {result.get('msg')}")
if 'token' in str(result.get('msg', '')).lower():
self.token_mgr.token = None # token 失效,下次自动刷新
return []
except Exception as e:
print(f" [异常] 第{page}页: {e}")
return []
@staticmethod
def extract_fields(post: dict) -> dict:
"""从原始帖子数据中提取需要的字段"""
user = post.get('user', {}) or {}
category = post.get('posts_category', {}) or {}
return {
'id': post.get('id'),
'title': post.get('title', ''),
'content': post.get('content', ''),
'create_time': post.get('create_time', ''),
'category': category.get('category_name', ''),
'user_id': post.get('user_id'),
'nickname': user.get('nickname', ''),
'is_anonymous': post.get('is_anonymous', 0),
}
def crawl_incremental(self) -> list:
"""
增量抓取:
- 加载本地已有数据
- 逐页请求,跳过已有的帖子
- 遇到旧数据或无新增时停止
- 保存结果
"""
# 加载已有数据
existing = self._load_existing()
latest_time = ''
if existing:
latest_time = max(p.get('create_time', '') for p in existing.values())
print(f"[本地] 已有 {len(existing)} 条,最新: {latest_time}")
else:
print("[本地] 无历史数据,全量抓取")
# 记录本次已见过的 id(处理重复出现的帖子)
seen_ids = set()
new_count = 0
stop = False
for page in range(1, MAX_PAGES + 1):
if stop:
break
print(f" 第 {page} 页...", end=' ')
posts = self.fetch_page(page)
if not posts:
print("无数据")
break
page_new = 0
for post in posts:
post_id = post.get('id')
# 跳过本次已见过的(同一帖子可能在多页出现)
if post_id in seen_ids:
continue
seen_ids.add(post_id)
create_time = post.get('create_time', '')
# 增量判断:遇到比本地最新还旧的数据就停止
if latest_time and create_time <= latest_time:
if post_id in existing:
continue
if create_time < latest_time:
stop = True
break
# 新数据,保存
existing[post_id] = self.extract_fields(post)
page_new += 1
new_count += 1
print(f"新增 {page_new} 条" + (" [到达边界]" if stop else ""))
# 本页无新增就停止,避免空翻页
if page_new == 0:
break
time.sleep(REQUEST_DELAY)
# 保存到文件
self._save_posts(existing)
print(f"\n[完成] 新增 {new_count} 条,总计 {len(existing)} 条")
return list(existing.values())
def _load_existing(self) -> dict:
"""加载本地已有数据,返回 {id: post} 字典"""
if not os.path.exists(DATA_FILE):
return {}
try:
with open(DATA_FILE, 'r', encoding='utf-8') as f:
posts_list = json.load(f)
return {p['id']: p for p in posts_list}
except (json.JSONDecodeError, IOError):
return {}
def _save_posts(self, posts_dict: dict):
"""保存数据到 JSON 文件,按时间降序排列"""
posts_list = sorted(
posts_dict.values(),
key=lambda x: x.get('create_time', ''),
reverse=True
)
os.makedirs(DATA_DIR, exist_ok=True)
with open(DATA_FILE, 'w', encoding='utf-8') as f:
json.dump(posts_list, f, ensure_ascii=False, indent=2)
print(f"[保存] {DATA_FILE}")
# ==================== 入口 ====================
def main():
print("=" * 60)
print("塔塔校园圈爬虫(独立版)")
print(f"目标接口: {POSTS_URL}")
print(f"学校ID: {COLLEGE_ID}")
print(f"抓取间隔: {CRAWL_INTERVAL} 秒")
print("=" * 60)
spider = TataSpider()
# 检查 token 是否可用
token = spider.token_mgr.get_token()
if not token:
print("\n[错误] 无法获取有效 token")
print("请确保:")
print(" 1. lamda 设备在线且可访问")
print(" 2. 或手动将 token 写入 token.json")
return
remaining = (spider.token_mgr.exp_timestamp - time.time()) / 3600
print(f"[Token] 有效,剩余 {remaining:.1f} 小时\n")
# 定时循环抓取
while True:
print(f"\n[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始抓取")
print("-" * 40)
try:
spider.crawl_incremental()
except Exception as e:
print(f"[异常] 抓取出错: {e}")
print(f"\n下次抓取在 {CRAWL_INTERVAL // 60} 分钟后...")
time.sleep(CRAWL_INTERVAL)
if __name__ == '__main__':
main()
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员Orion
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果