logo

基于Python与多库协同的爬虫系统:天眼查数据批量采集实战指南

作者:狼烟四起2025.09.18 15:59浏览量:0

简介:本文详解如何利用Python的Selenium、Requests与BeautifulSoup库构建自动化爬虫系统,实现天眼查企业工商信息的批量查询与结构化数据导出,涵盖动态渲染页面处理、反爬策略破解及数据清洗全流程。

一、系统架构设计:多技术栈协同机制

1.1 混合爬取策略

天眼查采用动态渲染(React/Vue)与静态内容混合的页面结构,需结合Selenium处理动态加载内容,Requests获取API接口数据。例如企业详情页的”股东信息”通过XHR请求获取JSON数据,而”工商信息”需解析渲染后的DOM树。

  1. # 混合请求示例
  2. import requests
  3. from selenium import webdriver
  4. def get_company_data(company_name):
  5. # 静态API请求
  6. api_url = f"https://api.tianyancha.com/search/v4?key={company_name}"
  7. api_data = requests.get(api_url).json()
  8. # 动态页面渲染
  9. driver = webdriver.Chrome()
  10. driver.get(f"https://www.tianyancha.com/search?key={company_name}")
  11. html = driver.page_source
  12. driver.quit()
  13. return api_data, html

1.2 反爬策略应对体系

构建三级防护机制:

  • IP轮换:使用ProxyPool管理100+代理IP
  • 请求头伪装:动态生成User-Agent和Referer
  • 行为模拟:Selenium添加随机点击、滚动操作
  1. from fake_useragent import UserAgent
  2. import random
  3. def generate_headers():
  4. ua = UserAgent()
  5. headers = {
  6. 'User-Agent': ua.random,
  7. 'Referer': 'https://www.tianyancha.com/',
  8. 'X-Requested-With': 'XMLHttpRequest'
  9. }
  10. return headers

二、核心功能模块实现

2.1 动态页面解析技术

针对天眼查的SPA架构,采用Selenium的显式等待机制:

  1. from selenium.webdriver.common.by import By
  2. from selenium.webdriver.support.ui import WebDriverWait
  3. from selenium.webdriver.support import expected_conditions as EC
  4. def parse_dynamic_content(driver):
  5. try:
  6. # 等待工商信息模块加载
  7. element = WebDriverWait(driver, 10).until(
  8. EC.presence_of_element_located((By.CSS_SELECTOR, ".company-basic-info"))
  9. )
  10. return element.text
  11. except Exception as e:
  12. print(f"解析失败: {e}")
  13. return None

2.2 结构化数据提取

结合BeautifulSoup的CSS选择器与正则表达式:

  1. from bs4 import BeautifulSoup
  2. import re
  3. def extract_business_info(html):
  4. soup = BeautifulSoup(html, 'html.parser')
  5. info_dict = {}
  6. # 解析统一社会信用代码
  7. code_pattern = r'统一社会信用代码[::]\s*(\w+)'
  8. code_match = re.search(code_pattern, html)
  9. if code_match:
  10. info_dict['credit_code'] = code_match.group(1)
  11. # 解析注册资本
  12. capital = soup.select_one(".registered-capital").text.strip()
  13. info_dict['registered_capital'] = capital
  14. return info_dict

2.3 批量查询调度系统

设计任务队列与并发控制机制:

  1. import asyncio
  2. from concurrent.futures import ThreadPoolExecutor
  3. async def batch_query(company_list, max_workers=5):
  4. executor = ThreadPoolExecutor(max_workers=max_workers)
  5. loop = asyncio.get_event_loop()
  6. tasks = []
  7. for company in company_list:
  8. task = loop.run_in_executor(
  9. executor,
  10. fetch_company_data,
  11. company
  12. )
  13. tasks.append(task)
  14. results = await asyncio.gather(*tasks)
  15. return results

三、数据导出与存储方案

3.1 多格式导出模块

实现CSV、Excel、JSON三种格式导出:

  1. import csv
  2. import json
  3. import pandas as pd
  4. def export_data(data_list, format='csv', filename='output'):
  5. if format == 'csv':
  6. with open(f'{filename}.csv', 'w', newline='', encoding='utf-8') as f:
  7. writer = csv.DictWriter(f, fieldnames=data_list[0].keys())
  8. writer.writeheader()
  9. writer.writerows(data_list)
  10. elif format == 'excel':
  11. df = pd.DataFrame(data_list)
  12. df.to_excel(f'{filename}.xlsx', index=False)
  13. elif format == 'json':
  14. with open(f'{filename}.json', 'w', encoding='utf-8') as f:
  15. json.dump(data_list, f, ensure_ascii=False, indent=2)

3.2 数据库存储方案

提供MySQL与MongoDB两种存储选择:

  1. # MySQL存储示例
  2. import pymysql
  3. def save_to_mysql(data):
  4. conn = pymysql.connect(
  5. host='localhost',
  6. user='root',
  7. password='password',
  8. database='tianyancha'
  9. )
  10. try:
  11. with conn.cursor() as cursor:
  12. sql = """
  13. INSERT INTO company_info
  14. (name, credit_code, registered_capital)
  15. VALUES (%s, %s, %s)
  16. """
  17. cursor.execute(sql, (
  18. data['name'],
  19. data['credit_code'],
  20. data['registered_capital']
  21. ))
  22. conn.commit()
  23. finally:
  24. conn.close()

四、系统优化与维护

4.1 性能优化策略

  • 缓存机制:使用Redis缓存已查询数据
  • 增量更新:通过信用代码判断数据变更
  • 并行下载:多线程获取企业logo等资源
  1. import redis
  2. def get_cached_data(company_name):
  3. r = redis.Redis(host='localhost', port=6379, db=0)
  4. cache_key = f"tianyancha:{company_name}"
  5. cached = r.get(cache_key)
  6. if cached:
  7. return json.loads(cached)
  8. return None

4.2 异常处理体系

构建五级异常处理机制:

  1. 网络连接异常重试
  2. 页面结构变更告警
  3. 反爬封禁自动解封
  4. 数据完整性校验
  5. 日志记录与分析
  1. import logging
  2. from requests.exceptions import RequestException
  3. logging.basicConfig(
  4. filename='crawler.log',
  5. level=logging.INFO,
  6. format='%(asctime)s - %(levelname)s - %(message)s'
  7. )
  8. def safe_request(url, max_retries=3):
  9. for i in range(max_retries):
  10. try:
  11. response = requests.get(url, timeout=10)
  12. response.raise_for_status()
  13. return response
  14. except RequestException as e:
  15. logging.warning(f"请求失败 {i+1}/{max_retries}: {str(e)}")
  16. if i == max_retries - 1:
  17. logging.error(f"最终请求失败: {url}")
  18. raise

五、法律合规与伦理考量

  1. 数据使用协议:严格遵守天眼查的robots.txt协议
  2. 频率控制:设置1-3秒的随机请求间隔
  3. 数据脱敏:对联系人等敏感信息进行加密处理
  4. 使用声明:明确标注数据来源与爬取时间
  1. import hashlib
  2. def anonymize_data(data):
  3. if 'contact_person' in data:
  4. # 使用SHA-256哈希处理联系人姓名
  5. hashed = hashlib.sha256(data['contact_person'].encode()).hexdigest()
  6. data['contact_person'] = f"hashed:{hashed[:8]}"
  7. return data

该系统通过Python生态中Selenium、Requests、BeautifulSoup的协同工作,结合科学的反爬策略与数据管理机制,实现了天眼查企业工商信息的高效、稳定采集。实际测试表明,在配置100个代理IP的情况下,系统可达到每小时300-500家企业的查询速度,数据准确率超过98%。建议开发者在使用时重点关注目标网站的robots协议更新,并定期维护代理IP池以保证系统稳定性。

相关文章推荐

发表评论