request请求和json解析
任务描述
序列号接口:your_url
查看代理接口:your_url
代理地区接口:your_ur;
把序列号接口里的serial 参数遍历一遍;如果serial是172开头的,就通过查看代理接口访问一下,如果存在 addr值,就获取下来,到 代理地区接口里看下是哪儿个地区的;然后都记录下来,放到表里 =。
最终结果就是 表里有三个字段,serial,是否有代理,代理地区。
知识补充
Request
1. 基本 GET 请求
resp = requests.get(url, timeout=30)
requests.get() 发起一个 HTTP GET 请求,返回一个 Response 对象.
2. timeout 参数
requests.get(url, timeout=3)
requests.get(url, timeout=10)
requests.get(url, timeout=30)
设置请求超时时间(秒)。超过这个时间服务器没有响应,就会抛出 requests.exceptions.Timeout 异常。传单个数字表示连接+读取的总超时,也可以传元组 (connect_timeout, read_timeout) 分别控制。
3. raise_for_status()
resp.raise_for_status()
检查 HTTP 状态码,如果不是 2xx(比如 404、500),会抛出 requests.exceptions.HTTPError 异常。不调用的话,即使返回 404 也不会报错,需要自己判断 resp.status_code。
解决的是超时以外的异常状态,如果是404等等的话就不再往下走了,即HTTPError。
4. resp.json()
data = resp.json()
将响应体按 JSON 解析,返回 Python 字典/列表。如果响应内容不是合法 JSON,会抛出 ValueError(Python 3.5+实际是 json.JSONDecodeError,它是 ValueError 的子类)。
5. 异常体系
except requests.exceptions.Timeout:
...
except requests.exceptions.RequestException as e:
...
requests 的异常继承关系:
RequestException # 所有 requests 异常的基类
├── ConnectionError # 网络连接失败
├── Timeout # 超时
├── HTTPError # raise_for_status() 抛出
├── TooManyRedirects # 重定向过多
└── ...
文件中先捕获具体的 Timeout,再用基类 RequestException 兜底其他网络错误,这是推荐的写法——从具体到宽泛。
CSV
1. 打开文件的参数
with open("proxy_results.csv", "w", newline="", encoding="utf-8-sig") as f:
"w":写模式,会覆盖已有文件。newline="":必须加。csv 模块自己控制换行符,如果不设为空字符串,Windows 上会出现多余空行。open()的newline参数默认是None,不是""。newline=None(默认):Python 会做"通用换行符转换"。写入时把\n自动转成系统对应的换行符(Windows 上是\r\n)newline="":关闭这个转换,写什么就是什么。
问题在于 csv 模块写入时自己会加\r\n,如果你不设newline="",Python 又会把其中的\n再转一次成\r\n,最终变成\r\r\n,表现就是 CSV 每行之间多一个空行。
所以用 csv 模块写文件时,newline=""是固定搭配。encoding="utf-8-sig":UTF-8 带 BOM 头。这样 Excel 打开 CSV 时能正确识别中文,不会乱码。如果用普通utf-8,Excel 可能显示乱码。
2. DictWriter
writer = csv.DictWriter(f, fieldnames=["serial", "是否有代理", "代理地区"])
用字典写入 CSV。fieldnames 定义列的顺序和表头名称。写入时每行数据是一个字典,key 对应 fieldnames 中的列名。
3. writeheader()
writer.writeheader()
写入表头行(就是 fieldnames 列表里的内容)。不调用的话 CSV 文件没有标题行。
4. writerows()
writer.writerows(rows)
一次性写入多行数据。rows 是一个字典列表,每个字典的 key 要和 fieldnames 对应。也可以用 writerow() 一次写一行。
字典
1. dict.get(key, default)
device.get("serial", "")
proxy_data.get("data", {})
port2area.get(port)
安全取值。key 不存在时返回 default(不指定 default 则返回 None),不会抛 KeyError。对比直接用 dict[key],key 不存在会报错。
2. 链式 .get()
proxy_data.get("data", {}).get("result", {}).get("data", {}).get("addr")
逐层取嵌套字典的值。每层给默认值 {},这样即使中间某层不存在,下一个 .get() 也不会报 AttributeError(因为是对空字典调用 get,而不是对 None 调用)。最后一层不给默认值,取不到就是 None。
3. 字典字面量构造
rows.append({
"serial": serial,
"是否有代理": has_proxy,
"代理地区": area
})
用 {} 直接创建字典,key 是字符串,value 是变量。
4. 空字典初始化
port2area = {}
创建一个空字典,后续通过赋值往里填数据。
5. 字典赋值
port2area[port] = area
直接用 dict[key] = value 添加或覆盖键值对。key 存在就覆盖,不存在就新增。
6. item.get() 配合 if 判断
port = item.get("socks5_port")
area = item.get("area")
if port and area:
port2area[port] = area
先取值,再判断是否有效(非 None、非空字符串、非 0 等 falsy 值),有效才写入。
多线程工作
以下是文件中用到的多线程知识点:
1. ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=20) as executor:
线程池。max_workers=20 表示最多同时跑 20 个线程。用 with 语句管理,退出时自动等待所有任务完成并释放资源。
2. executor.submit()
futures = {
executor.submit(process_device, device, port2area): device
for device in devices
}
提交任务到线程池。第一个参数是要执行的函数,后面是传给该函数的参数。返回一个 Future 对象,代表一个"将来会有结果"的任务。这里用字典推导式一次性提交所有任务,key 是 future,value 是对应的 device(方便出错时定位是哪个设备)。
3. as_completed()
for future in as_completed(futures):
result = future.result()
哪个任务先完成就先返回哪个,不按提交顺序。适合你不关心结果顺序、只想尽快处理的场景。如果需要按提交顺序拿结果,可以直接遍历 futures 列表或用 executor.map()。
4. future.result()
result = future.result()
获取任务的返回值。如果任务内部抛了异常,调用 .result() 时会重新抛出那个异常。当前代码里 process_device 内部已经自己 try-except 了,所以不会抛到外面。
5. 线程安全注意点
rows.append(result)
list.append() 在 CPython 中是线程安全的(GIL 保护),所以这里直接 append 没问题。但如果是更复杂的操作(比如先读再写),就需要加锁(threading.Lock)。
完整代码
import requests
import csv
from concurrent.futures import ThreadPoolExecutor, as_completed
def get_devices():
"""获取所有设备序列号"""
url = "your_url"
resp = requests.get(url, timeout=30)
resp.raise_for_status()
return resp.json()["data"]["devices"]
def query_proxy(serial: str):
"""查询代理接口,带超时和异常处理"""
url = (
"your_url" + serial
)
try:
# 设置超时 10 秒
resp = requests.get(url, timeout=3)
resp.raise_for_status() # HTTP 状态码不是 200 会抛异常
data = resp.json()
return data
except requests.exceptions.Timeout:
print(f"[TIMEOUT] 查询代理超时 serial={serial}")
return {}
except requests.exceptions.RequestException as e:
print(f"[ERROR] 查询代理失败 serial={serial}: {e}")
return {}
except ValueError as e:
# JSON decode 错误
print(f"[ERROR] 返回解析失败 serial={serial}: {e}")
return {}
def query_area():
"""查询代理地区"""
url = (
"your_url"
)
try:
resp = requests.get(url, timeout=10)
return resp.json()
except Exception as e:
print(f"[ERROR] 查询地区失败")
return {}
def process_device(device, port2area):
"""处理单个设备,返回结果字典或 None(跳过的设备)"""
serial = device.get("serial", "")
if not serial.startswith("172"):
return None
proxy_data = query_proxy(serial)
addr = (
proxy_data
.get("data", {})
.get("result", {})
.get("data", {})
.get("addr")
)
has_proxy = "是" if addr else "否"
port = None
if addr:
try:
port_str = addr.split(":")[-1]
port = int(port_str)
except Exception as e:
print(f"解析端口失败: {e}")
area = ""
if port:
area = port2area.get(port)
return {
"serial": serial,
"是否有代理": has_proxy,
"代理地区": area
}
def main():
data_list = query_area()
port2area = {}
for item in data_list:
port = item.get("socks5_port")
area = item.get("area")
if port and area:
port2area[port] = area
print("开始遍历...")
devices = get_devices()
rows = []
count = 0
with ThreadPoolExecutor(max_workers=20) as executor:
futures = {
executor.submit(process_device, device, port2area): device
for device in devices
}
for future in as_completed(futures):
count += 1
if count % 20 == 0:
print(f"已处理 {count}/{len(devices)} 条")
result = future.result()
if result is not None:
rows.append(result)
# 写入CSV
with open(
"proxy_results.csv",
"w",
newline="",
encoding="utf-8-sig"
) as f:
writer = csv.DictWriter(
f,
fieldnames=[
"serial",
"是否有代理",
"代理地区"
]
)
writer.writeheader()
writer.writerows(rows)
print("完成,已输出 proxy_result.csv")
if __name__ == "__main__":
main()