一、整体架构

┌─────────────────────────────────────────────┐
                   main()                     
                                             
   while True:                               
        spider.crawl_incremental()            
        time.sleep(1800)                      
└─────────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────┐
               TataSpider                      
                                              
    - token_mgr: TokenManager (管理认证)       
    - session: requests.Session (复用连接)     
    - fetch_page(): 请求单页数据                
    - crawl_incremental(): 增量抓取主逻辑       
    - _load_existing(): 加载本地已有数据        
    - _save_posts(): 保存到 JSON 文件          
└─────────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────┐
              TokenManager                    
                                              
    - get_token(): 获取有效token(过期自动刷新)   
    - _load_token(): (从文件读取)              
    - _refresh_token(): lamda→code→token      
└─────────────────────────────────────────────┘

二、运行流程

启动
 │
 ├─ 1. 初始化 TokenManager
 │      └─ 尝试从 token.json 加载已有 token
 │
 ├─ 2. 检查 token 是否有效
 │      ├─ 有效 → 直接用
 │      └─ 无效 → 自动刷新(lamda → code → 后端换token)
 │
 ├─ 3. 进入 while True 循环
 │      │
 │      ├─ crawl_incremental()
 │      │    ├─ 加载本地 posts.json
 │      │    ├─ 逐页请求 API
 │      │    ├─ 跳过已有/重复帖子
 │      │    ├─ 无新增时停止翻页
 │      │    └─ 保存结果到 posts.json
 │      │
 │      └─ time.sleep(1800) 等待下一轮
 │
 └─ 循环往复...

三、核心知识点

3.1 requests.Session

self.session = requests.Session()

self.session.headers.update({...})
  • 保持 TCP 连接复用,比每次 requests.get() 新建连接更快

  • 设置一次 headers,后续所有请求自动带上

  • 自动管理 cookie

3.2 JWT Token 结构

eyJhbGci...(头部).eyJ1c2Vy...(payload).yaXRVikx...(签名)

三段用 . 分隔:

  • 头部:算法信息

  • payload:用户数据(user_id、过期时间等),base64 编码

  • 签名:防篡改,我们不验证
    关键字段:

  • iat(issued at):签发时间

  • exp(expiration):过期时间

3.3 threading.Lock

with self._lock:
    # 同一时刻只有一个线程能执行这里的代码

防止多线程同时刷新 token,避免重复请求浪费资源。

3.4 @staticmethod

@staticmethod
def _decode_jwt_payload(token: str) -> dict:

不需要 self,纯输入输出的工具函数,逻辑上归属于类所以放在里面。

3.5 增量抓取逻辑

# 核心判断
if latest_time and create_time < latest_time:
    stop = True   # 遇到比本地最新还旧的数据,停止
    break
if post_id in seen_ids:
    continue      # 跳过本次已见过的重复帖子
if page_new == 0:
    break         # 整页无新增,停止翻页

3.6 json.dump 参数

json.dump(data, f, ensure_ascii=False, indent=2)
  • ensure_ascii=False:允许中文直接写入,不转成 \uXXXX

  • indent=2:格式化输出,每层缩进2空格,方便人看

四、配置参数说明

# lamda 设备(模拟微信登录用)
LAMDA_URL = 'your_url'
LAMDA_DEVICE_SERIAL = 'your_serial'
MINI_APP_ID = 'your_app_id'
# 后端接口
LOGIN_URL = '...'   # 用 code 换 token 的接口
POSTS_URL = '...'   # 帖子列表 API
# 抓取参数
COLLEGE_ID = 25       # 学校ID
PAGE_SIZE = 15        # 每页条数(可尝试改大,取决于服务端限制)
MAX_PAGES = 50        # 最多翻页数(安全上限)
REQUEST_DELAY = 1     # 每页间隔秒数(太快可能被封)
CRAWL_INTERVAL = 1800 # 定时抓取间隔(30分钟)

五、Token 刷新流程

1. lamda 设备模拟微信小程序登录
   GET http://lamda/wechat_js_login?app_id=xxx&device_serial=xxx
   返回: {"code": 0, "data": {"payload": "{\"code\": \"0a1b2c...\"}"}}

2. 用微信 code 调后端登录接口
   POST https://xxx/getMiniTokenByOauth2Code
   Body: {"code": "0a1b2c..."}
   返回: {"code": 10000, "data": {"token": "eyJ..."}
3. 解码 JWT 获取过期时间,保存到 token.json

刷新时机:token 剩余有效期不足 1 小时时自动触发。

六、数据存储

存储位置:./data/posts.json
格式:按 create_time 降序排列的数组

[
  {
    "id": 12345,
    "title": "帖子标题",
    "content": "帖子内容...",
    "create_time": "2026-05-20 15:30:00",
    "category": "闲聊吃瓜",
    "user_id": 89198,
    "nickname": "用户昵称",
    "is_anonymous": 0
  },
  ...
]

局限性:

  • 每次全量读写,数据量大了会慢

  • 几千到一两万条没问题

  • 更大量级考虑换 SQLite 或数据库

七、可改进的点

  1. PAGE_SIZE 调大:减少请求次数,前提是服务端支持

  2. 换 SQLite 存储:避免全量读写 JSON

  3. 异常重试:网络请求失败时自动重试几次

  4. 日志文件:把 print 换成 logging,输出到文件方便排查

  5. 多学校支持:COLLEGE_ID 改成列表,循环抓取多个学校

八、与框架版的区别

对比项

独立版

框架版

定时调度

while True + sleep

框架 run.py 自动调度

去重

本地 JSON 的 id 判断

Redis dedup

入库

写 JSON 文件

Pipeline(Redis/MongoDB/MySQL)

代理

框架自动管理代理池

日志

print

框架日志系统 + Redis

监控

心跳上报 + 配置监测

部署

直接 python 运行

守护进程 + 多进程

完整代码

# -*- coding:utf-8 -*-
"""
小程序爬虫(独立版,不依赖框架)

功能:
1. 自动通过 lamda 获取微信 code → 换取 JWT token
2. token 过期自动刷新,有效期内复用本地缓存
3. 增量抓取帖子列表,自动去重,按时间判断停止
4. 数据存本地 JSON 文件
5. 定时循环抓取

使用方式:
    python standalone_spider.py
"""

import os
import json
import time
import base64
import datetime
import requests
import threading


# ==================== 配置区 ====================

# lamda 设备(用于自动获取微信登录 code)
LAMDA_URL = 'your_url'
LAMDA_DEVICE_SERIAL = 'your_serial'
MINI_APP_ID = 'your_app_id'

# 后端接口
LOGIN_URL = 'your_url'
POSTS_URL = 'your_url'

# 抓取参数
COLLEGE_ID = 25
PAGE_SIZE = 15
MAX_PAGES = 50
REQUEST_DELAY = 1       # 每页请求间隔(秒)
CRAWL_INTERVAL = 1800   # 定时抓取间隔(秒),30分钟

# 文件路径
TOKEN_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'token.json')
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data')
DATA_FILE = os.path.join(DATA_DIR, 'posts.json')


# ==================== Token 管理 ====================

class TokenManager:
    """
    JWT Token 管理器
    - 从本地文件加载 token,有效则直接用
    - 过期或不存在时,自动通过 lamda → code → token 刷新
    - 线程安全(多线程场景下不会重复刷新)
    """

    def __init__(self, token_file=TOKEN_FILE):
        self.token_file = token_file
        self.token = None
        self.exp_timestamp = 0
        self._lock = threading.Lock()  # 防止多线程同时刷新
        self._load_token()

    def _load_token(self):
        """从本地文件加载已保存的 token"""
        if not os.path.exists(self.token_file):
            return
        try:
            with open(self.token_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.token = data.get('token')
            self.exp_timestamp = data.get('exp_timestamp', 0)
        except (json.JSONDecodeError, IOError):
            pass

    def _save_token(self, token: str, payload: dict):
        """保存 token 到本地文件"""
        data = {
            'token': token,
            'user_id': payload.get('user_id'),
            'main_user_id': payload.get('main_user_id'),
            'client_type': payload.get('client_type'),
            'iat_timestamp': payload.get('iat'),
            'exp_timestamp': payload.get('exp'),
            'iat_readable': datetime.datetime.fromtimestamp(payload.get('iat', 0)).strftime('%Y-%m-%d %H:%M:%S'),
            'exp_readable': datetime.datetime.fromtimestamp(payload.get('exp', 0)).strftime('%Y-%m-%d %H:%M:%S'),
            'extracted_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        }
        with open(self.token_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"[Token] 已保存,有效期至: {data['exp_readable']}")

    @staticmethod
    def _decode_jwt_payload(token: str) -> dict:
        """解码 JWT payload,提取里面的用户信息和过期时间(通用方法,不验证签名)"""
        try:
            parts = token.split('.')
            if len(parts) != 3:
                return {}
            payload = parts[1]
            payload += '=' * (4 - len(payload) % 4)  # base64 补齐
            decoded = base64.urlsafe_b64decode(payload)
            return json.loads(decoded)
        except Exception:
            return {}

    def is_valid(self) -> bool:
        """判断 token 是否有效(剩余超过 1 小时视为有效)"""
        if not self.token:
            return False
        remaining = self.exp_timestamp - time.time()
        return remaining > 3600

    def get_token(self) -> str:
        """
        获取有效 token
        - 有效直接返回
        - 无效则自动刷新
        - 线程安全:多线程调用时只会刷新一次
        """
        with self._lock:
            if self.is_valid():
                return self.token
            print("[Token] 需要刷新...")
            new_token = self._refresh_token()
            if new_token:
                self.token = new_token
                payload = self._decode_jwt_payload(new_token)
                self.exp_timestamp = payload.get('exp', 0)
                self._save_token(new_token, payload)
            return self.token

    def _refresh_token(self) -> str:
        """刷新流程:lamda 获取 code → 后端换 token"""
        code = self._get_code_from_lamda()
        if not code:
            print("[Token] 获取 code 失败")
            return None
        token = self._exchange_token(code)
        if not token:
            print("[Token] 换取 token 失败")
            return None
        return token

    def _get_code_from_lamda(self) -> str:
        """通过 lamda 设备触发微信小程序登录,获取 code"""
        try:
            url = f"{LAMDA_URL}?app_id={MINI_APP_ID}&device_serial={LAMDA_DEVICE_SERIAL}"
            resp = requests.get(url, timeout=30)
            result = resp.json()

            if result.get('code') != 0:
                print(f"[Token] lamda 返回错误: {result.get('message')}")
                return None

            payload = result.get('data', {}).get('payload', '{}')
            if isinstance(payload, str):
                payload = json.loads(payload)
            return payload.get('code')

        except Exception as e:
            print(f"[Token] 请求 lamda 异常: {e}")
            return None

    def _exchange_token(self, code: str) -> str:
        """用微信 code 调用后端登录接口换取 JWT token"""
        try:
            resp = requests.post(
                LOGIN_URL,
                json={'code': code},
                headers={'Content-Type': 'application/json'},
                timeout=10
            )
            result = resp.json()

            if result.get('code') != 10000:
                print(f"[Token] 登录接口返回: {result.get('msg')}")
                return None
            return result.get('data', {}).get('token')

        except Exception as e:
            print(f"[Token] 请求登录接口异常: {e}")
            return None


# ==================== 爬虫主体 ====================

class TataSpider:
    """塔塔校园圈帖子爬虫"""

    def __init__(self):
        self.token_mgr = TokenManager()
        # Session 保持连接复用,设置一次 headers 后续请求自动带上
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'User-Agent': 'Mozilla/5.0 (Linux; Android 12) AppleWebKit/537.36',
        })

    def _get_headers(self) -> dict:
        """获取带 token 的请求头"""
        token = self.token_mgr.get_token()
        if not token:
            raise RuntimeError("无法获取有效 token")
        return {'Authorization': f'Bearer {token}'}

    def fetch_page(self, page: int) -> list:
        """请求单页帖子列表,返回帖子数组"""
        params = {
            'page': page,
            'list_rows': PAGE_SIZE,
            'posts_category_id': '',
            'title': '',
            'content': '',
            'college_id': COLLEGE_ID,
        }
        try:
            resp = self.session.get(
                POSTS_URL,
                headers=self._get_headers(),
                params=params,
                timeout=15,
            )
            result = resp.json()

            if result.get('code') == 10000:
                return result.get('data', {}).get('data', [])
            else:
                print(f"  [错误] 第{page}页: {result.get('msg')}")
                if 'token' in str(result.get('msg', '')).lower():
                    self.token_mgr.token = None  # token 失效,下次自动刷新
                return []

        except Exception as e:
            print(f"  [异常] 第{page}页: {e}")
            return []

    @staticmethod
    def extract_fields(post: dict) -> dict:
        """从原始帖子数据中提取需要的字段"""
        user = post.get('user', {}) or {}
        category = post.get('posts_category', {}) or {}
        return {
            'id': post.get('id'),
            'title': post.get('title', ''),
            'content': post.get('content', ''),
            'create_time': post.get('create_time', ''),
            'category': category.get('category_name', ''),
            'user_id': post.get('user_id'),
            'nickname': user.get('nickname', ''),
            'is_anonymous': post.get('is_anonymous', 0),
        }

    def crawl_incremental(self) -> list:
        """
        增量抓取:
        - 加载本地已有数据
        - 逐页请求,跳过已有的帖子
        - 遇到旧数据或无新增时停止
        - 保存结果
        """
        # 加载已有数据
        existing = self._load_existing()
        latest_time = ''
        if existing:
            latest_time = max(p.get('create_time', '') for p in existing.values())
            print(f"[本地] 已有 {len(existing)} 条,最新: {latest_time}")
        else:
            print("[本地] 无历史数据,全量抓取")

        # 记录本次已见过的 id(处理重复出现的帖子)
        seen_ids = set()
        new_count = 0
        stop = False

        for page in range(1, MAX_PAGES + 1):
            if stop:
                break

            print(f"  第 {page} 页...", end=' ')
            posts = self.fetch_page(page)

            if not posts:
                print("无数据")
                break

            page_new = 0
            for post in posts:
                post_id = post.get('id')

                # 跳过本次已见过的(同一帖子可能在多页出现)
                if post_id in seen_ids:
                    continue
                seen_ids.add(post_id)

                create_time = post.get('create_time', '')

                # 增量判断:遇到比本地最新还旧的数据就停止
                if latest_time and create_time <= latest_time:
                    if post_id in existing:
                        continue
                    if create_time < latest_time:
                        stop = True
                        break

                # 新数据,保存
                existing[post_id] = self.extract_fields(post)
                page_new += 1
                new_count += 1

            print(f"新增 {page_new} 条" + (" [到达边界]" if stop else ""))

            # 本页无新增就停止,避免空翻页
            if page_new == 0:
                break

            time.sleep(REQUEST_DELAY)

        # 保存到文件
        self._save_posts(existing)
        print(f"\n[完成] 新增 {new_count} 条,总计 {len(existing)} 条")
        return list(existing.values())

    def _load_existing(self) -> dict:
        """加载本地已有数据,返回 {id: post} 字典"""
        if not os.path.exists(DATA_FILE):
            return {}
        try:
            with open(DATA_FILE, 'r', encoding='utf-8') as f:
                posts_list = json.load(f)
            return {p['id']: p for p in posts_list}
        except (json.JSONDecodeError, IOError):
            return {}

    def _save_posts(self, posts_dict: dict):
        """保存数据到 JSON 文件,按时间降序排列"""
        posts_list = sorted(
            posts_dict.values(),
            key=lambda x: x.get('create_time', ''),
            reverse=True
        )
        os.makedirs(DATA_DIR, exist_ok=True)
        with open(DATA_FILE, 'w', encoding='utf-8') as f:
            json.dump(posts_list, f, ensure_ascii=False, indent=2)
        print(f"[保存] {DATA_FILE}")


# ==================== 入口 ====================

def main():
    print("=" * 60)
    print("塔塔校园圈爬虫(独立版)")
    print(f"目标接口: {POSTS_URL}")
    print(f"学校ID: {COLLEGE_ID}")
    print(f"抓取间隔: {CRAWL_INTERVAL} 秒")
    print("=" * 60)

    spider = TataSpider()

    # 检查 token 是否可用
    token = spider.token_mgr.get_token()
    if not token:
        print("\n[错误] 无法获取有效 token")
        print("请确保:")
        print("  1. lamda 设备在线且可访问")
        print("  2. 或手动将 token 写入 token.json")
        return

    remaining = (spider.token_mgr.exp_timestamp - time.time()) / 3600
    print(f"[Token] 有效,剩余 {remaining:.1f} 小时\n")

    # 定时循环抓取
    while True:
        print(f"\n[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始抓取")
        print("-" * 40)

        try:
            spider.crawl_incremental()
        except Exception as e:
            print(f"[异常] 抓取出错: {e}")

        print(f"\n下次抓取在 {CRAWL_INTERVAL // 60} 分钟后...")
        time.sleep(CRAWL_INTERVAL)


if __name__ == '__main__':
    main()