"""
閑魚多線程商品爬蟲程序(帶GUI界面)

功能特點:
- 多線程爬取提高效率
- 圖形界面操作更友好
- 自動保存和加載Cookie
- 支持下載商品圖片并插入到Excel
- 修復了.mpo等特殊圖片格式導致的保存失敗問題
- 實時日志顯示

注意事項:
- 本程序僅用于學習交流,請勿用于商業或非法用途
- 請遵守網站robots協議,合理控制請求頻率
"""
import requests
import time
import hashlib
import threading
import queue
import json
import os
from openpyxl import Workbook
from openpyxl.drawing.image import Image
from openpyxl.utils import get_column_letter
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
from datetime import datetime
# ---------------------------------------------------------------------------
# Module-level configuration
# ---------------------------------------------------------------------------

# Search endpoint and the application key sent with (and signed into) requests.
API_URL = "https://h5api.m.goofish.com/h5/mtop.taobao.idlemtopsearch.pc.search/1.0/"
APP_KEY = "34839810"

# Browser identity sent with every HTTP request.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0"
)

# Pause between two page requests of one worker, in seconds.
REQUEST_DELAY = 1.5

# JSON file in which the user's cookie is persisted.
COOKIE_FILE = "xianyu_cookie.json"

# Upper bound for the number of worker threads selectable in the GUI.
MAX_WORKERS = 5

# Directory that receives downloaded product pictures.
IMAGE_FOLDER = "xianyu_images"

# Picture formats that are kept as-is; anything else is stored as jpg.
SUPPORTED_IMAGE_FORMATS = [
    "jpg",
    "jpeg",
    "png",
    "gif",
    "bmp",
]
class XianyuSpiderGUI:
    """Tkinter front end for a multi-threaded Xianyu (goofish.com) search crawler.

    Worker threads fetch result pages and talk to the GUI only through
    thread-safe primitives (``log_queue``, plain flags and a
    ``threading.Event``).  Every widget access happens on the Tk main thread,
    driven by ``root.after`` polling, because Tk widgets must not be touched
    from background threads.
    """

    # Interval (ms) at which the main thread drains the log queue.
    LOG_POLL_MS = 100
    # Interval (ms) at which the main thread checks whether workers finished.
    WORKER_POLL_MS = 500
    # Characters replaced by "_" when the keyword becomes part of a file name.
    _INVALID_FILENAME_CHARS = '\\/:*?"<>|'

    def __init__(self, root):
        """Build the window, restore the saved cookie and start log polling.

        Args:
            root: The ``tk.Tk`` root window that hosts the application.
        """
        self.root = root
        self.root.title("閑魚商品爬蟲 v2.1(修復圖片格式問題)")
        self.root.geometry("800x600")
        self.root.resizable(True, True)

        # Messages produced by any thread; consumed by update_log on the
        # main thread.
        self.log_queue = queue.Queue()

        # Crawl state.
        self.is_running = False
        self.cookie = ""
        self.token = ""
        self.task_queue = queue.Queue()
        self.results = []
        self.failed_pages = []
        self.workers = []

        # Set by a worker when the server reports an expired token; the
        # main-thread poll turns it into exactly one warning dialog.
        self._token_expired = threading.Event()
        self._token_alerted = False

        os.makedirs(IMAGE_FOLDER, exist_ok=True)

        self.load_cookie()
        self.create_widgets()

        # Poll the log queue from the Tk event loop instead of a thread.
        self.root.after(self.LOG_POLL_MS, self.update_log)

    def create_widgets(self):
        """Create and lay out all widgets of the main window."""
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.pack(fill=tk.BOTH, expand=True)

        # --- crawl settings -------------------------------------------------
        input_frame = ttk.LabelFrame(main_frame, text="爬取設置", padding="10")
        input_frame.pack(fill=tk.X, pady=(0, 10))

        # Cookie
        ttk.Label(input_frame, text="Cookie:").grid(row=0, column=0, sticky=tk.W, pady=2)
        self.cookie_var = tk.StringVar(value=self.cookie)
        self.cookie_entry = ttk.Entry(input_frame, textvariable=self.cookie_var, width=80)
        self.cookie_entry.grid(row=0, column=1, sticky=tk.EW, padx=(5, 0), pady=2)

        # Search keyword
        ttk.Label(input_frame, text="關鍵詞:").grid(row=1, column=0, sticky=tk.W, pady=2)
        self.keyword_var = tk.StringVar()
        self.keyword_entry = ttk.Entry(input_frame, textvariable=self.keyword_var, width=30)
        self.keyword_entry.grid(row=1, column=1, sticky=tk.W, padx=(5, 0), pady=2)

        # Number of pages
        ttk.Label(input_frame, text="爬取頁數:").grid(row=2, column=0, sticky=tk.W, pady=2)
        self.page_var = tk.StringVar(value="1")
        self.page_entry = ttk.Entry(input_frame, textvariable=self.page_var, width=10)
        self.page_entry.grid(row=2, column=1, sticky=tk.W, padx=(5, 0), pady=2)

        # Number of worker threads
        ttk.Label(input_frame, text="線程數:").grid(row=3, column=0, sticky=tk.W, pady=2)
        self.thread_var = tk.StringVar(value=str(MAX_WORKERS))
        self.thread_combo = ttk.Combobox(input_frame, textvariable=self.thread_var, width=5, state="readonly")
        self.thread_combo['values'] = tuple(str(i) for i in range(1, MAX_WORKERS + 1))
        self.thread_combo.grid(row=3, column=1, sticky=tk.W, padx=(5, 0), pady=2)

        # --- buttons --------------------------------------------------------
        btn_frame = ttk.Frame(main_frame)
        btn_frame.pack(fill=tk.X, pady=(0, 10))
        self.start_btn = ttk.Button(btn_frame, text="開始爬取", command=self.start_crawling)
        self.start_btn.pack(side=tk.LEFT, padx=(0, 10))
        self.stop_btn = ttk.Button(btn_frame, text="停止", command=self.stop_crawling, state=tk.DISABLED)
        self.stop_btn.pack(side=tk.LEFT)
        ttk.Button(btn_frame, text="清除日志", command=self.clear_log).pack(side=tk.RIGHT)
        ttk.Button(btn_frame, text="保存Cookie", command=self.save_cookie).pack(side=tk.RIGHT, padx=(0, 10))

        # --- log view -------------------------------------------------------
        log_frame = ttk.LabelFrame(main_frame, text="日志信息", padding="10")
        log_frame.pack(fill=tk.BOTH, expand=True)
        self.log_text = scrolledtext.ScrolledText(log_frame, wrap=tk.WORD, state=tk.DISABLED)
        self.log_text.pack(fill=tk.BOTH, expand=True)

        # --- status bar -----------------------------------------------------
        self.status_var = tk.StringVar(value="就緒")
        status_bar = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W)
        status_bar.pack(side=tk.BOTTOM, fill=tk.X)

    def log_message(self, message):
        """Queue a timestamped message for display; safe to call from any thread."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.log_queue.put(f"[{timestamp}] {message}")

    def update_log(self):
        """Move all queued messages into the log widget, then reschedule itself.

        Runs on the Tk main thread via ``root.after`` so the widget is never
        touched from a background thread.
        """
        pending = []
        try:
            while True:
                pending.append(self.log_queue.get_nowait())
        except queue.Empty:
            pass

        try:
            if pending:
                self.log_text.config(state=tk.NORMAL)
                self.log_text.insert(tk.END, "\n".join(pending) + "\n")
                self.log_text.config(state=tk.DISABLED)
                self.log_text.yview(tk.END)
            self.root.after(self.LOG_POLL_MS, self.update_log)
        except tk.TclError as e:
            # Raised when the widgets are gone (window closed); stop polling.
            print(f"日志更新線程錯誤: {e}")

    def clear_log(self):
        """Remove all text from the log widget."""
        self.log_text.config(state=tk.NORMAL)
        self.log_text.delete("1.0", tk.END)
        self.log_text.config(state=tk.DISABLED)

    def load_cookie(self):
        """Load the cookie from ``COOKIE_FILE`` into ``self.cookie`` if the file exists."""
        try:
            if os.path.exists(COOKIE_FILE):
                with open(COOKIE_FILE, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                self.cookie = data.get('cookie', '')
                self.log_message("已加載保存的Cookie")
        except (OSError, ValueError, AttributeError) as e:
            # Unreadable file, invalid JSON/encoding, or JSON that is not an object.
            self.log_message(f"⚠️ 加載Cookie失敗: {e}")

    def save_cookie(self):
        """Persist the cookie currently typed into the entry field to ``COOKIE_FILE``."""
        self.cookie = self.cookie_var.get().strip()
        if not self.cookie:
            messagebox.showwarning("警告", "Cookie不能為空")
            return
        try:
            with open(COOKIE_FILE, 'w', encoding='utf-8') as f:
                json.dump({'cookie': self.cookie}, f, ensure_ascii=False, indent=2)
            self.log_message("✅ Cookie保存成功")
        except OSError as e:
            self.log_message(f"❌ 保存Cookie失敗: {e}")

    def extract_token(self):
        """Return the signing token taken from the ``_m_h5_tk`` cookie.

        The token is the part of the cookie value before the first ``_``.
        The cookie name is matched exactly, so a different cookie whose name
        merely ends in ``_m_h5_tk`` is not picked up by accident.

        Returns:
            The token string, or ``None`` (after logging the reason) when the
            cookie field is empty or holds no usable ``_m_h5_tk`` value.
        """
        cookie = self.cookie_var.get().strip()
        if not cookie:
            self.log_message("❌ Cookie不能為空")
            return None

        for part in cookie.split(";"):
            name, sep, value = part.strip().partition("=")
            if sep and name == "_m_h5_tk":
                token = value.split('_')[0]
                if token:
                    return token
                break

        self.log_message("❌ Cookie中缺少_m_h5_tk值")
        return None

    def validate_inputs(self):
        """Validate cookie, keyword, page count and thread count.

        Side effects: stores the stripped cookie in ``self.cookie`` and the
        extracted token in ``self.token``.

        Returns:
            ``True`` when every input is usable, otherwise ``False``.
        """
        # Cookie
        self.cookie = self.cookie_var.get().strip()
        if not self.cookie:
            messagebox.showwarning("警告", "Cookie不能為空")
            return False

        # Token (extract_token logs the reason on failure)
        self.token = self.extract_token()
        if not self.token:
            return False

        # Keyword
        keyword = self.keyword_var.get().strip()
        if not keyword:
            messagebox.showwarning("警告", "關鍵詞不能為空")
            return False

        # Page count
        try:
            pages = int(self.page_var.get())
        except ValueError:
            messagebox.showwarning("警告", "頁數必須是數字")
            return False
        if pages <= 0:
            messagebox.showwarning("警告", "頁數必須是正整數")
            return False

        # Thread count
        try:
            threads = int(self.thread_var.get())
        except ValueError:
            messagebox.showwarning("警告", "線程數必須是數字")
            return False
        if threads <= 0 or threads > MAX_WORKERS:
            messagebox.showwarning("警告", f"線程數必須在1-{MAX_WORKERS}之間")
            return False

        return True

    def start_crawling(self):
        """Validate the inputs, queue the pages and start the worker threads."""
        if self.is_running:
            return
        if not self.validate_inputs():
            return

        # Switch the UI into "running" state.
        self.is_running = True
        self._token_expired.clear()
        self._token_alerted = False
        self.start_btn.config(state=tk.DISABLED)
        self.stop_btn.config(state=tk.NORMAL)
        self.status_var.set("運行中...")

        keyword = self.keyword_var.get().strip()
        pages = int(self.page_var.get())
        threads = int(self.thread_var.get())

        # One task per page number.
        self.task_queue = queue.Queue()
        for page in range(1, pages + 1):
            self.task_queue.put(page)

        self.results = []
        self.failed_pages = []

        self.workers = []
        for i in range(threads):
            worker = threading.Thread(target=self.worker_task, args=(keyword,))
            worker.daemon = True
            worker.start()
            self.workers.append(worker)
            self.log_message(f"啟動工作線程 #{i + 1}")

        # Watch the workers from the Tk event loop (no extra thread needed).
        self.root.after(self.WORKER_POLL_MS, self.monitor_workers)

    def worker_task(self, keyword):
        """Worker-thread loop: take page numbers from the queue and crawl them.

        Runs until the queue is empty or ``self.is_running`` becomes false.
        Only thread-safe operations are used here (queue access, list append,
        ``log_message``); no widget is touched.

        Args:
            keyword: The search keyword for every page request.
        """
        while self.is_running:
            try:
                page = self.task_queue.get_nowait()
            except queue.Empty:
                break

            try:
                self.log_message(f"線程 {threading.current_thread().name} 開始爬取第 {page} 頁")
                products = self.fetch_products(keyword, page)
                if products is None:
                    self.failed_pages.append(page)
                    self.log_message(f"⚠️ 第 {page} 頁爬取失敗")
                else:
                    for product in products:
                        parsed = self.parse_product(product)
                        if parsed:
                            self.results.append(parsed)
                    self.log_message(f"✅ 第 {page} 頁完成, 獲取 {len(products)} 條商品")
            except Exception as e:
                # Boundary of the worker thread: report and carry on with the next page.
                self.log_message(f"⚠️ 線程錯誤: {str(e)}")
            finally:
                self.task_queue.task_done()

            # Throttle only when another request is going to follow.
            if self.is_running and not self.task_queue.empty():
                time.sleep(REQUEST_DELAY)

    def monitor_workers(self):
        """Main-thread poll: surface token expiry and finish when all workers ended."""
        if self._token_expired.is_set() and not self._token_alerted:
            # Show the warning once, no matter how many workers saw the error.
            self._token_alerted = True
            self.handle_token_expired()

        if any(worker.is_alive() for worker in self.workers):
            self.root.after(self.WORKER_POLL_MS, self.monitor_workers)
            return

        self.finish_crawling()

    def finish_crawling(self):
        """Save the collected results, report failed pages and reset the UI."""
        self.is_running = False

        if self.results:
            keyword = self.keyword_var.get().strip()
            self.save_results(keyword)
            self.log_message(f"✅ 爬取完成! 共獲取 {len(self.results)} 條商品數據")
        else:
            self.log_message("⚠️ 未獲取到任何商品數據")

        if self.failed_pages:
            self.log_message(f"⚠️ 以下頁爬取失敗: {', '.join(map(str, self.failed_pages))}")

        self.start_btn.config(state=tk.NORMAL)
        self.stop_btn.config(state=tk.DISABLED)
        self.status_var.set("就緒")

    def stop_crawling(self):
        """Ask the workers to stop after their current page."""
        self.is_running = False
        self.log_message("⏹ 正在停止爬取...")
        self.status_var.set("正在停止...")

    def fetch_products(self, keyword, page):
        """Request one search-result page and return its raw product list.

        Called from worker threads.

        Args:
            keyword: Search keyword.
            page: 1-based page number.

        Returns:
            The ``data.resultList`` value of the JSON response, or ``None`` on
            any failure (HTTP error, expired token, unexpected payload).
        """
        try:
            sign, timestamp, request_data = self.generate_sign(page, keyword)

            headers = {
                "cookie": self.cookie,
                "origin": "https://www.goofish.com",
                "referer": "https://www.goofish.com/",
                "user-agent": USER_AGENT
            }

            params = {
                "jsv": "2.7.2",
                "appKey": APP_KEY,
                "t": timestamp,
                "sign": sign,
                "v": "1.0",
                "type": "originaljson",
                "accountSite": "xianyu",
                "dataType": "json",
                "timeout": "20000",
                "api": "mtop.taobao.idlemtopsearch.pc.search",
                "sessionOption": "AutoLoginOnly",
                "spm_cnt": "a21ybx.search.0.0",
                "spm_pre": "a21ybx.home.searchSuggest.1.4c053da64Wswaf",
                "log_id": "4c053da64Wswaf"
            }

            response = requests.post(
                url=API_URL,
                headers=headers,
                params=params,
                data={"data": request_data},
                timeout=15
            )
            response.raise_for_status()
            result = response.json()

            # NOTE(review): the spelling "EXOIRED" is kept exactly as in the
            # original check; it appears to be the literal code sent by the
            # server - confirm before "correcting" it.
            ret_codes = result.get("ret") or []
            if any("FAIL_SYS_TOKEN_EXOIRED" in str(code) for code in ret_codes):
                self.log_message("❌ Token已過期,請更新Cookie")
                # Stop the other workers and let the main-thread poll show
                # the warning dialog; no Tk call is made from this thread.
                self.is_running = False
                self._token_expired.set()
                return None

            if "data" in result and "resultList" in result["data"]:
                return result["data"]["resultList"]

            self.log_message(f"❌ 第{page}頁數據格式異常")
            return None
        except requests.exceptions.RequestException as e:
            self.log_message(f"❌ 第{page}頁請求失敗: {str(e)}")
            return None
        except Exception as e:
            self.log_message(f"❌ 第{page}頁數據處理錯誤: {str(e)}")
            return None

    def handle_token_expired(self):
        """Stop crawling and tell the user that the cookie must be refreshed."""
        self.stop_crawling()
        messagebox.showwarning("Cookie失效", "您的Cookie已過期,請更新Cookie后重試")

    @staticmethod
    def _build_request_data(page, keyword):
        """Serialise the search request as compact JSON.

        ``json.dumps`` escapes quotes and backslashes in ``keyword``, so the
        payload stays valid JSON for any input.  Key order and separators
        match the hand-written template that was used before.
        """
        payload = {
            "pageNumber": page,
            "keyword": keyword,
            "fromFilter": False,
            "rowsPerPage": 30,
            "sortValue": "",
            "sortField": "",
            "customDistance": "",
            "gps": "",
            "propValueStr": "",
            "customGps": "",
            "searchReqFromPage": "pcSearch",
            "extraFilterValue": "",
            "userPositionJson": "",
        }
        return json.dumps(payload, ensure_ascii=False, separators=(",", ":"))

    def generate_sign(self, page, keyword):
        """Build the request payload and its MD5 signature.

        The signature is the MD5 hex digest of
        ``"<token>&<timestamp>&<APP_KEY>&<request_data>"``.

        Args:
            page: 1-based page number.
            keyword: Search keyword.

        Returns:
            A tuple ``(sign, timestamp, request_data)`` where ``timestamp`` is
            the current time in milliseconds and ``request_data`` is the JSON
            string that was signed.
        """
        timestamp = int(time.time() * 1000)
        request_data = self._build_request_data(page, keyword)
        sign_str = f"{self.token}&{timestamp}&{APP_KEY}&{request_data}"
        sign = hashlib.md5(sign_str.encode("utf-8")).hexdigest()
        return sign, timestamp, request_data

    def parse_product(self, product):
        """Extract the displayed fields from one raw search-result entry.

        Args:
            product: One element of the ``resultList`` returned by the API.

        Returns:
            A dict with the keys ``user_name``, ``description``, ``url``,
            ``price``, ``area``, ``pic_url`` and ``item_id``; or ``None``
            (after logging) when the entry lacks the expected structure.
        """
        try:
            item_data = product["data"]["item"]["main"]["exContent"]
            click_params = product["data"]["item"]["main"]["clickParam"]["args"]

            # Picture URL: prefer exContent, fall back to the click parameters.
            pic_url = item_data.get("picUrl", "")
            if not pic_url:
                pic_url = click_params.get("picUrl", "無圖片鏈接")

            user_name = item_data.get("userNick", "未知用戶").strip()

            # Description = shipping tag + title.
            title = item_data.get("title", "").strip()
            post_fee = click_params.get("tagname", "不包郵")
            description = f"{post_fee} +++ {title}"

            item_id = item_data.get("itemId", "")
            product_url = f"https://www.goofish.com/item?id={item_id}"

            price = click_params.get("price", "未知")
            area = item_data.get("area", "未知地區").strip()

            return {
                "user_name": user_name,
                "description": description,
                "url": product_url,
                "price": price,
                "area": area,
                "pic_url": pic_url,  # used to download the picture
                "item_id": item_id   # used to name the picture file
            }
        except Exception as e:
            self.log_message(f"⚠️ 商品數據解析異常: {str(e)}")
            return None

    @staticmethod
    def _url_extension(pic_url):
        """Return the lower-cased file extension of a URL's path ('' if none).

        Query string and fragment are removed first, so dots inside them
        cannot be mistaken for the extension.
        """
        path = pic_url.split("?", 1)[0].split("#", 1)[0]
        return os.path.splitext(path)[1].lstrip(".").lower()

    def _normalize_image_file(self, file_name, item_id):
        """Re-encode a downloaded picture as JPEG when its real format is unsupported.

        The decision is based on the format Pillow detects in the file
        content, not on the URL, so e.g. MPO data served under a ``.jpg``
        name is converted as well.  Failures are logged and the original
        file is left in place.
        """
        try:
            from PIL import Image as PILImage
        except ImportError:
            # Pillow is optional here; without it the file is used unchanged.
            return

        try:
            with PILImage.open(file_name) as img:
                actual_format = (img.format or "").lower()
                if actual_format in SUPPORTED_IMAGE_FORMATS:
                    return
                # convert() loads the pixel data, so the file can be closed
                # before it is overwritten below.
                rgb_img = img.convert('RGB')
            rgb_img.save(file_name, format="JPEG")
            self.log_message(f"✅ 特殊圖片格式已成功轉換為jpg: {item_id}.jpg")
        except Exception as e:
            self.log_message(f"⚠️ 圖片格式轉換失敗: {str(e)},使用原始文件")

    def download_image(self, pic_url, item_id):
        """Download a product picture into ``IMAGE_FOLDER``.

        Args:
            pic_url: Picture URL (may lack the scheme) or the placeholder
                ``"無圖片鏈接"``.
            item_id: Product id, used as the file name.

        Returns:
            The local file path, or ``None`` when there is no picture or the
            download failed.  An already existing file is reused.
        """
        try:
            # 1. Nothing to download.
            if not pic_url or pic_url == "無圖片鏈接":
                return None

            # 2. Add the scheme when the URL comes without one.
            if not pic_url.startswith(('http://', 'https://')):
                pic_url = f"http:{pic_url}" if pic_url.startswith('//') else f"https://{pic_url}"

            # 3. Choose the file extension; unsupported ones are stored as jpg.
            file_ext = self._url_extension(pic_url)
            if file_ext not in SUPPORTED_IMAGE_FORMATS:
                self.log_message(f"⚠️ 檢測到不支持的圖片格式: {file_ext},將自動轉換為jpg")
                file_ext = "jpg"

            # 4. One file per item id; reuse it when present.
            file_name = os.path.join(IMAGE_FOLDER, f"{item_id}.{file_ext}")
            if os.path.exists(file_name):
                return file_name

            # 5. Download and store.
            headers = {"User-Agent": USER_AGENT}
            response = requests.get(pic_url, headers=headers, timeout=10)
            response.raise_for_status()
            with open(file_name, "wb") as f:
                f.write(response.content)

            # 6. Make sure the stored content is in a supported format.
            self._normalize_image_file(file_name, item_id)
            return file_name
        except Exception as e:
            self.log_message(f"⚠️ 圖片下載失敗({pic_url}): {str(e)}")
            return None

    @classmethod
    def _safe_filename(cls, name):
        """Replace characters that are invalid in file names with ``_``."""
        return "".join("_" if ch in cls._INVALID_FILENAME_CHARS else ch for ch in name)

    def save_results(self, keyword):
        """Write ``self.results`` to an Excel file, one row per product with its picture.

        The file is named ``<keyword>_<YYYYmmdd_HHMMSS>.xlsx``; characters of
        the keyword that are not allowed in file names are replaced by ``_``.

        Args:
            keyword: The search keyword, used as file-name prefix.
        """
        try:
            wb = Workbook()
            ws = wb.active

            # Header row (last column holds the picture).
            ws.append(["用戶名字", "簡介", "鏈接", "價格", "地區", "圖片"])

            ws.column_dimensions["A"].width = 15  # user name
            ws.column_dimensions["B"].width = 40  # description
            ws.column_dimensions["C"].width = 30  # link
            ws.column_dimensions["F"].width = 20  # picture

            # Data starts in row 2, below the header.
            for row_idx, data in enumerate(self.results, start=2):
                ws.cell(row=row_idx, column=1, value=data["user_name"])
                ws.cell(row=row_idx, column=2, value=data["description"])
                ws.cell(row=row_idx, column=3, value=data["url"])
                ws.cell(row=row_idx, column=4, value=data["price"])
                ws.cell(row=row_idx, column=5, value=data["area"])

                pic_path = self.download_image(data["pic_url"], data["item_id"])
                if pic_path and os.path.exists(pic_path):
                    try:
                        img = Image(pic_path)
                        img.width = 100
                        img.height = 100
                        ws.add_image(img, anchor=f"F{row_idx}")
                        # Make the row tall enough for the picture.
                        ws.row_dimensions[row_idx].height = 80
                    except Exception as e:
                        self.log_message(f"⚠️ 圖片插入失敗({pic_path}): {str(e)}")

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{self._safe_filename(keyword)}_{timestamp}.xlsx"
            wb.save(filename)
            self.log_message(f"✅ 數據已保存到 {filename}(含圖片)")
        except Exception as e:
            self.log_message(f"❌ 保存Excel文件失敗: {str(e)}")
def main():
    """Create the Tk root window, attach the crawler GUI and run the event loop."""
    window = tk.Tk()
    # Keep a reference to the controller for as long as the event loop runs.
    gui = XianyuSpiderGUI(window)
    window.mainloop()


if __name__ == "__main__":
    main()
# ![圖片[1]-如何高效獲取咸魚商品:省錢秘籍與購物技巧](http://www.oilmaxhydraulic.com.cn/wp-content/uploads/2025/07/d2b5ca33bd20250725131123-1024x602.png)
# ![圖片[2]-如何高效獲取咸魚商品:省錢秘籍與購物技巧](http://www.oilmaxhydraulic.com.cn/wp-content/uploads/2025/07/d2b5ca33bd20250725131142-1024x506.png)
# © 版權聲明
# THE END