如何高效獲取咸魚商品:省錢秘籍與購物技巧

"""
閑魚多線程商品爬蟲程序(帶GUI界面)
功能特點:

  1. 多線程爬取提高效率
  2. 圖形界面操作更友好
  3. 自動保存和加載Cookie
  4. 支持下載商品圖片並插入到Excel
  5. 修復了.mpo等特殊圖片格式導致的保存失敗問題
  6. 實時日誌顯示

注意事項:

  1. 本程序僅用於學習交流,請勿用於商業或非法用途
  2. 請遵守網站robots協議,合理控制請求頻率
"""
import requests
import time
import hashlib
import threading
import queue
import json
import os
from openpyxl import Workbook
from openpyxl.drawing.image import Image
from openpyxl.utils import get_column_letter
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
from datetime import datetime
 
 
# Module-wide configuration constants.
API_URL = (
    "https://h5api.m.goofish.com/h5/"
    "mtop.taobao.idlemtopsearch.pc.search/1.0/"
)
APP_KEY = "34839810"
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0"
)
REQUEST_DELAY = 1.5  # pause between page requests, in seconds
COOKIE_FILE = "xianyu_cookie.json"  # where the cookie is persisted
MAX_WORKERS = 5  # upper bound for the number of worker threads
IMAGE_FOLDER = "xianyu_images"  # directory downloaded pictures are stored in
SUPPORTED_IMAGE_FORMATS = [  # image extensions accepted without conversion
    'jpg',
    'jpeg',
    'png',
    'gif',
    'bmp',
]
 
 
class XianyuSpiderGUI:
    def __init__(self, root):
        """Configure the main window, restore state and build the GUI.

        Args:
            root: the ``tk.Tk`` root window this application lives in.

        The statement order below matters: the log queue must exist before
        ``load_cookie`` runs (it logs through the queue), and the cookie must
        be loaded before ``create_widgets`` (the Cookie entry is pre-filled
        from ``self.cookie``).
        """
        self.root = root
        self.root.title("閑魚商品爬蟲 v2.1(修復圖片格式問題)")
        self.root.geometry("800x600")
        self.root.resizable(True, True)

        # Queue through which any thread hands log lines to the log pane.
        self.log_queue = queue.Queue()

        # Runtime state: crawl flag, raw cookie string and the token derived from it.
        self.is_running = False
        self.cookie = ""
        self.token = ""

        # Make sure the image download folder exists.
        if not os.path.exists(IMAGE_FOLDER):
            os.makedirs(IMAGE_FOLDER)

        # Restore a previously saved cookie, if any.
        self.load_cookie()

        # Build the widgets.
        self.create_widgets()

        # Start the background loop that copies queued messages into the log pane.
        threading.Thread(target=self.update_log, daemon=True).start()
 
    def create_widgets(self):
        """Build the window: settings form, button row, log pane and status bar.

        Creates the Tk variables (``cookie_var``, ``keyword_var``, ``page_var``,
        ``thread_var``, ``status_var``) and the widgets (``start_btn``,
        ``stop_btn``, ``log_text``, ...) that the other methods read and update.
        """
        # Main container frame.
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.pack(fill=tk.BOTH, expand=True)

        # Settings form.
        input_frame = ttk.LabelFrame(main_frame, text="爬取設置", padding="10")
        input_frame.pack(fill=tk.X, pady=(0, 10))

        # Cookie entry, pre-filled with the cookie loaded at start-up.
        ttk.Label(input_frame, text="Cookie:").grid(row=0, column=0, sticky=tk.W, pady=2)
        self.cookie_var = tk.StringVar(value=self.cookie)
        self.cookie_entry = ttk.Entry(input_frame, textvariable=self.cookie_var, width=80)
        self.cookie_entry.grid(row=0, column=1, sticky=tk.EW, padx=(5, 0), pady=2)

        # Search keyword entry.
        ttk.Label(input_frame, text="關鍵詞:").grid(row=1, column=0, sticky=tk.W, pady=2)
        self.keyword_var = tk.StringVar()
        self.keyword_entry = ttk.Entry(input_frame, textvariable=self.keyword_var, width=30)
        self.keyword_entry.grid(row=1, column=1, sticky=tk.W, padx=(5, 0), pady=2)

        # Number of result pages to crawl (defaults to 1).
        ttk.Label(input_frame, text="爬取頁數:").grid(row=2, column=0, sticky=tk.W, pady=2)
        self.page_var = tk.StringVar(value="1")
        self.page_entry = ttk.Entry(input_frame, textvariable=self.page_var, width=10)
        self.page_entry.grid(row=2, column=1, sticky=tk.W, padx=(5, 0), pady=2)

        # Worker thread count: read-only combobox offering 1..MAX_WORKERS.
        ttk.Label(input_frame, text="線程數:").grid(row=3, column=0, sticky=tk.W, pady=2)
        self.thread_var = tk.StringVar(value=str(MAX_WORKERS))
        self.thread_combo = ttk.Combobox(input_frame, textvariable=self.thread_var, width=5, state="readonly")
        self.thread_combo['values'] = tuple(str(i) for i in range(1, MAX_WORKERS + 1))
        self.thread_combo.grid(row=3, column=1, sticky=tk.W, padx=(5, 0), pady=2)

        # Button row.
        btn_frame = ttk.Frame(main_frame)
        btn_frame.pack(fill=tk.X, pady=(0, 10))

        self.start_btn = ttk.Button(btn_frame, text="開始爬取", command=self.start_crawling)
        self.start_btn.pack(side=tk.LEFT, padx=(0, 10))

        # Stop starts out disabled; start_crawling enables it.
        self.stop_btn = ttk.Button(btn_frame, text="停止", command=self.stop_crawling, state=tk.DISABLED)
        self.stop_btn.pack(side=tk.LEFT)

        ttk.Button(btn_frame, text="清除日志", command=self.clear_log).pack(side=tk.RIGHT)
        ttk.Button(btn_frame, text="保存Cookie", command=self.save_cookie).pack(side=tk.RIGHT, padx=(0, 10))

        # Log pane; kept DISABLED (read-only) except while text is being inserted.
        log_frame = ttk.LabelFrame(main_frame, text="日志信息", padding="10")
        log_frame.pack(fill=tk.BOTH, expand=True)

        self.log_text = scrolledtext.ScrolledText(log_frame, wrap=tk.WORD, state=tk.DISABLED)
        self.log_text.pack(fill=tk.BOTH, expand=True)

        # Status bar, attached to the root window rather than main_frame.
        self.status_var = tk.StringVar(value="就緒")
        status_bar = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W)
        status_bar.pack(side=tk.BOTTOM, fill=tk.X)
 
    def log_message(self, message):
        """將消息添加到日志隊列"""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.log_queue.put(f"[{timestamp}] {message}")
 
    def update_log(self):
        """Loop forever, moving queued log lines into the log pane.

        About every 0.1 s the queue is drained; any lines found are appended
        to ``self.log_text`` and the view is scrolled to the end.  If anything
        raises, the error is printed to stdout and the loop ends.
        """
        try:
            while True:
                # Take everything that is waiting right now.
                pending = []
                while not self.log_queue.empty():
                    pending.append(self.log_queue.get_nowait())

                if not pending:
                    time.sleep(0.1)
                    continue

                pane = self.log_text
                # The pane is read-only by default; unlock it for the insert.
                pane.config(state=tk.NORMAL)
                for line in pending:
                    pane.insert(tk.END, line + "\n")
                pane.config(state=tk.DISABLED)
                pane.yview(tk.END)

                time.sleep(0.1)
        except Exception as exc:
            print(f"日志更新線程錯誤: {exc}")
 
    def clear_log(self):
        """Erase all text currently shown in the log pane."""
        pane = self.log_text
        pane.config(state=tk.NORMAL)  # unlock the read-only widget
        pane.delete(1.0, tk.END)
        pane.config(state=tk.DISABLED)  # and lock it again
 
    def load_cookie(self):
        """從文件加載Cookie"""
        try:
            if os.path.exists(COOKIE_FILE):
                with open(COOKIE_FILE, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                    self.cookie = data.get('cookie', '')
                    self.log_message(f"已加載保存的Cookie")
        except Exception as e:
            self.log_message(f"⚠️ 加載Cookie失敗: {e}")
 
    def save_cookie(self):
        """保存Cookie到文件"""
        self.cookie = self.cookie_var.get().strip()
        if not self.cookie:
            messagebox.showwarning("警告", "Cookie不能為空")
            return
 
        try:
            with open(COOKIE_FILE, 'w', encoding='utf-8') as f:
                json.dump({'cookie': self.cookie}, f, ensure_ascii=False, indent=2)
            self.log_message("✅ Cookie保存成功")
        except Exception as e:
            self.log_message(f"❌ 保存Cookie失敗: {e}")
 
    def extract_token(self):
        """從cookie中提取token"""
        cookie = self.cookie_var.get().strip()
        if not cookie:
            self.log_message("❌ Cookie不能為空")
            return None
 
        try:
            # 查找_m_h5_tk在cookie中的位置
            if "_m_h5_tk=" not in cookie:
                self.log_message("❌ Cookie中缺少_m_h5_tk值")
                return None
 
            start_idx = cookie.find("_m_h5_tk=") + len("_m_h5_tk=")
            end_idx = cookie.find(";", start_idx)
            if end_idx == -1:
                end_idx = len(cookie)
 
            m_h5_tk_value = cookie[start_idx:end_idx]
            token = m_h5_tk_value.split('_')[0]
            return token
        except Exception as e:
            self.log_message(f"❌ 提取Token失敗: {e}")
            return None
 
    def validate_inputs(self):
        """驗證用戶輸入"""
        # 驗證Cookie
        self.cookie = self.cookie_var.get().strip()
        if not self.cookie:
            messagebox.showwarning("警告", "Cookie不能為空")
            return False
 
        # 提取token
        self.token = self.extract_token()
        if not self.token:
            return False
 
        # 驗證關鍵詞
        keyword = self.keyword_var.get().strip()
        if not keyword:
            messagebox.showwarning("警告", "關鍵詞不能為空")
            return False
 
        # 驗證頁數
        try:
            pages = int(self.page_var.get())
            if pages <= 0:
                messagebox.showwarning("警告", "頁數必須是正整數")
                return False
        except ValueError:
            messagebox.showwarning("警告", "頁數必須是數字")
            return False
 
        # 驗證線程數
        try:
            threads = int(self.thread_var.get())
            if threads <= 0 or threads > MAX_WORKERS:
                messagebox.showwarning("警告", f"線程數必須在1-{MAX_WORKERS}之間")
                return False
        except ValueError:
            messagebox.showwarning("警告", "線程數必須是數字")
            return False
 
        return True
 
    def start_crawling(self):
        """Validate the form and launch the crawl on background threads.

        Does nothing when a crawl is already running or ``validate_inputs``
        rejects the form.  Otherwise it switches the UI to "running", queues
        pages ``1..pages`` on ``self.task_queue``, resets ``self.results`` and
        ``self.failed_pages``, starts the selected number of worker threads
        (``worker_task``) and one monitor thread (``monitor_workers``).
        """
        if self.is_running:
            return

        if not self.validate_inputs():
            return

        # Switch the UI into "running" mode.
        self.is_running = True
        self.start_btn.config(state=tk.DISABLED)
        self.stop_btn.config(state=tk.NORMAL)
        self.status_var.set("運行中...")

        # Read the (already validated) parameters.
        keyword = self.keyword_var.get().strip()
        pages = int(self.page_var.get())
        threads = int(self.thread_var.get())

        # One task per page number.
        self.task_queue = queue.Queue()
        for page in range(1, pages + 1):
            self.task_queue.put(page)

        # Fresh result containers for this run.
        self.results = []
        self.failed_pages = []

        # Start the workers.
        self.workers = []
        for i in range(threads):
            worker = threading.Thread(target=self.worker_task, args=(keyword,), daemon=True)
            worker.start()
            self.workers.append(worker)
            self.log_message(f"啟動工作線程 #{i + 1}")

        # The monitor is a daemon like the workers.  A non-daemon monitor would
        # keep the process alive after the window is closed mid-crawl and would
        # then call root.after() on an already destroyed Tk instance.
        threading.Thread(target=self.monitor_workers, daemon=True).start()
 
    def worker_task(self, keyword):
        """工作線程任務"""
        while not self.task_queue.empty() and self.is_running:
            try:
                page = self.task_queue.get_nowait()
                self.log_message(f"線程 {threading.current_thread().name} 開始爬取第 {page} 頁")
 
                # 發送請求
                products = self.fetch_products(keyword, page)
 
                if products is None:
                    self.failed_pages.append(page)
                    self.log_message(f"⚠️ 第 {page} 頁爬取失敗")
                else:
                    # 解析商品
                    for product in products:
                        parsed = self.parse_product(product)
                        if parsed:
                            self.results.append(parsed)
 
                    self.log_message(f"✅ 第 {page} 頁完成, 獲取 {len(products)} 條商品")
 
                # 任務完成
                self.task_queue.task_done()
 
                # 請求間隔
                time.sleep(REQUEST_DELAY)
 
            except queue.Empty:
                break
            except Exception as e:
                self.log_message(f"⚠️ 線程錯誤: {str(e)}")
 
    def monitor_workers(self):
        """監視工作線程狀態"""
        while any(worker.is_alive() for worker in self.workers):
            time.sleep(0.5)
 
        # 所有線程完成后
        self.root.after(0, self.finish_crawling)
 
    def finish_crawling(self):
        """Wrap up a crawl: save the results, report failures, reset the UI."""
        self.is_running = False

        # Save whatever was collected, or say that nothing was.
        collected = self.results
        if not collected:
            self.log_message("⚠️ 未獲取到任何商品數據")
        else:
            self.save_results(self.keyword_var.get().strip())
            self.log_message(f"✅ 爬取完成! 共獲取 {len(collected)} 條商品數據")

        # List the pages that could not be fetched.
        if self.failed_pages:
            failed_text = ', '.join(str(page) for page in self.failed_pages)
            self.log_message(f"⚠️ 以下頁爬取失敗: {failed_text}")

        # Put the controls back into the idle state.
        self.start_btn.config(state=tk.NORMAL)
        self.stop_btn.config(state=tk.DISABLED)
        self.status_var.set("就緒")
 
    def stop_crawling(self):
        """停止爬取"""
        self.is_running = False
        self.log_message("⏹ 正在停止爬取...")
        self.status_var.set("正在停止...")
 
    def fetch_products(self, keyword, page):
        """Request one page of search results from the search API.

        Args:
            keyword: search keyword.
            page: 1-based page number.

        Returns:
            The ``data.resultList`` value of the JSON response, or ``None``
            when the request fails, the token has expired or the response
            lacks a result list.  Every failure is logged; nothing is raised.
        """
        try:
            # Signature, timestamp and JSON payload belong together.
            sign, timestamp, request_data = self.generate_sign(page, keyword)

            headers = {
                "cookie": self.cookie,
                "origin": "https://www.goofish.com",
                "referer": "https://www.goofish.com/",
                "user-agent": USER_AGENT
            }

            params = {
                "jsv": "2.7.2",
                "appKey": APP_KEY,
                "t": timestamp,
                "sign": sign,
                "v": "1.0",
                "type": "originaljson",
                "accountSite": "xianyu",
                "dataType": "json",
                "timeout": "20000",
                "api": "mtop.taobao.idlemtopsearch.pc.search",
                "sessionOption": "AutoLoginOnly",
                "spm_cnt": "a21ybx.search.0.0",
                "spm_pre": "a21ybx.home.searchSuggest.1.4c053da64Wswaf",
                "log_id": "4c053da64Wswaf"
            }

            response = requests.post(
                url=API_URL,
                headers=headers,
                params=params,
                data={"data": request_data},
                timeout=15
            )

            # Turn HTTP error statuses into exceptions (handled below).
            response.raise_for_status()

            result = response.json()
            if not isinstance(result, dict):
                self.log_message(f"❌ 第{page}頁數據格式異常")
                return None

            # "ret" may be missing, empty or hold several status strings, so
            # look at all of them rather than indexing the first one.
            ret_codes = result.get("ret") or []
            if isinstance(ret_codes, str):
                ret_codes = [ret_codes]

            # NOTE(review): the spelling "EXOIRED" is kept exactly as the code
            # has always matched it; it appears to be what the server sends.
            # Verify against a live response before "correcting" it.
            if any("FAIL_SYS_TOKEN_EXOIRED" in str(code) for code in ret_codes):
                self.log_message("❌ Token已過期,請更新Cookie")
                self.root.after(0, self.handle_token_expired)
                return None

            data = result.get("data")
            if isinstance(data, dict) and "resultList" in data:
                return data["resultList"]

            # Include the server's status text so the reason is visible in the log.
            detail = "; ".join(str(code) for code in ret_codes)
            suffix = f": {detail}" if detail else ""
            self.log_message(f"❌ 第{page}頁數據格式異常{suffix}")
            return None

        except requests.exceptions.RequestException as e:
            self.log_message(f"❌ 第{page}頁請求失敗: {str(e)}")
            return None
        except Exception as e:
            self.log_message(f"❌ 第{page}頁數據處理錯誤: {str(e)}")
            return None
 
    def handle_token_expired(self):
        """Stop the crawl and tell the user that the cookie must be renewed."""
        self.stop_crawling()
        title = "Cookie失效"
        text = "您的Cookie已過期,請更新Cookie后重試"
        messagebox.showwarning(title, text)
 
    def generate_sign(self, page, keyword):
        """生成簽名"""
        # 生成當前時間戳(毫秒級)
        timestamp = int(time.time() * 1000)
 
        # 構建請求數據
        request_data = (
            f'{{"pageNumber":{page},"keyword":"{keyword}","fromFilter":false,'
            f'"rowsPerPage":30,"sortValue":"","sortField":"","customDistance":"",'
            f'"gps":"","propValueStr":"","customGps":"","searchReqFromPage":"pcSearch",'
            f'"extraFilterValue":"","userPositionJson":""}}'
        )
 
        # 構建簽名原始字符串
        sign_str = f"{self.token}&{timestamp}&{APP_KEY}&{request_data}"
 
        # 計算MD5簽名
        md5 = hashlib.md5()
        md5.update(sign_str.encode("utf-8"))
        sign = md5.hexdigest()
 
        return sign, timestamp, request_data
 
    def parse_product(self, product):
        """解析商品數據(包含圖片URL提取)"""
        try:
            # 從原始數據中提取核心字段
            item_data = product["data"]["item"]["main"]["exContent"]
            click_params = product["data"]["item"]["main"]["clickParam"]["args"]
 
            # 提取圖片URL
            pic_url = item_data.get("picUrl", "")
            if not pic_url:
                pic_url = click_params.get("picUrl", "無圖片鏈接")
 
            # 提取用戶昵稱
            user_name = item_data.get("userNick", "未知用戶").strip()
 
            # 提取標題和包郵信息
            title = item_data.get("title", "").strip()
            post_fee = click_params.get("tagname", "不包郵")
            description = f"{post_fee} +++ {title}"
 
            # 提取商品鏈接
            item_id = item_data.get("itemId", "")
            product_url = f"https://www.goofish.com/item?id={item_id}"
 
            # 提取價格和地區
            price = click_params.get("price", "未知")
            area = item_data.get("area", "未知地區").strip()
 
            return {
                "user_name": user_name,
                "description": description,
                "url": product_url,
                "price": price,
                "area": area,
                "pic_url": pic_url,  # 新增圖片URL字段
                "item_id": item_id  # 新增商品ID用于圖片命名
            }
 
        except Exception as e:
            self.log_message(f"⚠️ 商品數據解析異常: {str(e)}")
            return None
 
    def download_image(self, pic_url, item_id):
        """下載圖片到本地,支持格式過濾和轉換"""
        try:
            # 1. 跳過無圖片鏈接的情況
            if pic_url == "無圖片鏈接":
                return None
 
            # 2. 處理URL中的特殊字符,補全協議頭
            if not pic_url.startswith(('http://', 'https://')):
                pic_url = f"http:{pic_url}" if pic_url.startswith('//') else f"https://{pic_url}"
 
            # 3. 提取并驗證文件后綴
            file_ext = pic_url.split(".")[-1].split("?")[0].lower()
 
            # 處理不支持的格式(如.mpo)
            if file_ext not in SUPPORTED_IMAGE_FORMATS:
                self.log_message(f"⚠️ 檢測到不支持的圖片格式: {file_ext},將自動轉換為jpg")
                file_ext = "jpg"  # 強制使用支持的格式
 
            # 4. 圖片文件名:用item_id避免重復
            file_name = f"{IMAGE_FOLDER}/{item_id}.{file_ext}"
 
            # 已下載則直接返回路徑
            if os.path.exists(file_name):
                return file_name
 
            # 5. 發送請求下載圖片
            headers = {"User-Agent": USER_AGENT}
            response = requests.get(pic_url, headers=headers, timeout=10)
            response.raise_for_status()
 
            # 6. 保存圖片到本地
            with open(file_name, "wb") as f:
                f.write(response.content)
 
            # 7. 嘗試轉換特殊格式圖片為jpg(如果是從mpo等格式轉換而來)
            if file_ext == "jpg" and pic_url.lower().endswith(('mpo', 'mpo?')):
                try:
                    from PIL import Image as PILImage
                    # 打開圖片并轉換為RGB模式(兼容jpg)
                    img = PILImage.open(file_name)
                    rgb_img = img.convert('RGB')
                    # 覆蓋保存為jpg
                    rgb_img.save(file_name)
                    self.log_message(f"✅ 特殊圖片格式已成功轉換為jpg: {item_id}.jpg")
                except Exception as e:
                    self.log_message(f"⚠️ 圖片格式轉換失敗: {str(e)},使用原始文件")
 
            return file_name
 
        except Exception as e:
            self.log_message(f"⚠️ 圖片下載失敗({pic_url}): {str(e)}")
            return None
 
    def save_results(self, keyword):
        """Write ``self.results`` to an Excel workbook, one row per product.

        Args:
            keyword: search keyword; used (after sanitising) as the file-name
                prefix, followed by a ``YYYYmmdd_HHMMSS`` timestamp.

        Each row holds user name, description, link, price and area; the
        product picture is downloaded with ``download_image`` and anchored in
        column F.  All errors are logged rather than raised.
        """
        try:
            wb = Workbook()
            ws = wb.active
            # Header row (including the picture column).
            ws.append(["用戶名字", "簡介", "鏈接", "價格", "地區", "圖片"])

            # Column widths.
            ws.column_dimensions["A"].width = 15  # user name
            ws.column_dimensions["B"].width = 40  # description
            ws.column_dimensions["C"].width = 30  # link
            ws.column_dimensions["F"].width = 20  # picture

            # Data rows start at row 2, below the header.
            for row_idx, data in enumerate(self.results, start=2):
                ws.cell(row=row_idx, column=1, value=data["user_name"])
                ws.cell(row=row_idx, column=2, value=data["description"])
                ws.cell(row=row_idx, column=3, value=data["url"])
                ws.cell(row=row_idx, column=4, value=data["price"])
                ws.cell(row=row_idx, column=5, value=data["area"])

                # Download and embed the picture; a failure only skips the picture.
                pic_path = self.download_image(data["pic_url"], data["item_id"])
                if pic_path and os.path.exists(pic_path):
                    try:
                        img = Image(pic_path)
                        img.width = 100
                        img.height = 100
                        ws.add_image(img, anchor=f"F{row_idx}")
                        # Make the row tall enough for the picture.
                        ws.row_dimensions[row_idx].height = 80
                    except Exception as e:
                        self.log_message(f"⚠️ 圖片插入失敗({pic_path}): {str(e)}")

            # The keyword is free text: replace characters that are illegal in
            # file names (or act as path separators) so the save cannot fail
            # after all the pictures have been downloaded.
            safe_keyword = "".join(
                "_" if ch in '\\/:*?"<>|' or ord(ch) < 32 else ch
                for ch in keyword
            ).strip() or "xianyu"

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{safe_keyword}_{timestamp}.xlsx"
            wb.save(filename)
            self.log_message(f"✅ 數據已保存到 {filename}(含圖片)")

        except Exception as e:
            self.log_message(f"❌ 保存Excel文件失敗: {str(e)}")
 
 
def main():
    """Create the Tk root window, build the GUI on it and run the event loop."""
    window = tk.Tk()
    XianyuSpiderGUI(window)
    window.mainloop()


if __name__ == "__main__":
    main()
圖片[1]-如何高效獲取咸魚商品:省錢秘籍與購物技巧
圖片[2]-如何高效獲取咸魚商品:省錢秘籍與購物技巧
------本頁內容已結束,喜歡請分享------
溫馨提示:由於項目或工具都有時效性,如遇到不能做的項目或不能使用的工具,可以根據關鍵詞在站點搜索相關內容,查看最近更新的或者在網頁底部給我們留言反饋。
© 版權聲明
THE END
喜歡就支持一下吧
點贊1807 分享