Skip to content

Commit

Permalink
Built-in 多线程抓取支持.
Browse files Browse the repository at this point in the history
  • Loading branch information
CaoZ committed Oct 26, 2017
1 parent 3a3d726 commit 1005e72
Showing 1 changed file with 55 additions and 30 deletions.
85 changes: 55 additions & 30 deletions app/page_crawler.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
from datetime import datetime
from pathlib import Path
from queue import Queue
from threading import Thread

import requests
from sqlalchemy import func

from config import config
from lian_jia.community import Community
Expand All @@ -12,6 +13,12 @@
DATA_DIR = Path(__file__).parent.joinpath('../data/').resolve()
DATA_DIR.mkdir(exist_ok=True)

_counts = {
'total': 0,
'completed': 0,
'failed': 0
}


def fetch_page(community_id):
"""
Expand All @@ -27,51 +34,69 @@ def fetch_page(community_id):
save_file.write_bytes(r.content)


def fetch_all_pages(city_id):
def do_fetch(communities_queue: Queue):
# 抓取, 直到没东西可抓

# http://docs.sqlalchemy.org/en/latest/orm/session_basics.html#is-the-session-thread-safe
db_session = Session()

total_count = db_session.query(func.count('*')).filter(
Community.city_id == city_id,
Community.page_fetched_at == None
).scalar()
while not communities_queue.empty():

total_got = 0
a_community = communities_queue.get()

logging.info(f'city_id={city_id}, 待抓取={total_count}')
try:
fetch_page(a_community.id)
a_community.page_fetched_at = datetime.now()
except Exception as e:
_counts['failed'] += 1
logging.error(f'# 抓取失败, community_id={a_community.id}, message="{e}"')

while True:
per_size = 5
else:
db_session.add(a_community)
db_session.commit()

# 每次随机取若干个未抓取过的小区, 以便可以开启多个进程同时抓取互不干扰
communities = db_session.query(Community).filter(
Community.city_id == city_id,
Community.page_fetched_at == None
).order_by(func.random())[:per_size]
_counts['completed'] += 1

if not communities:
break
if _counts['completed'] % 10 == 0:
count_remaining = _counts["total"] - _counts["completed"]
logging.info(f'进度={_counts["completed"]}/{_counts["total"]}, 剩余={count_remaining}')

logging.info('抓取中...')
communities_queue.task_done()

for a_community in communities:
try:
fetch_page(a_community.id)
a_community.page_fetched_at = datetime.now()
db_session.commit()
except Exception as e:
logging.error(f'# 抓取失败, community_id={a_community.id}, message="{e}"')
db_session.close()

total_got += len(communities)

# 若启动多个进程进行抓取, 显示的剩余数量就不准确了, 不过是小问题, 不慌...
logging.info(f'进度={total_got}/{total_count}, 剩余={total_count - total_got}')
def fetch_all_pages(city_id, threads_num=10):
db_session = Session()

all_communities = db_session.query(Community).filter(
Community.city_id == city_id,
Community.page_fetched_at == None
).all()

logging.info('已全部抓取完成.')
db_session.close()

communities_queue = Queue()

for a_community in all_communities:
communities_queue.put(a_community)

_counts['total'] = len(all_communities)

logging.info(f'city_id={city_id}, 待抓取={_counts["total"]}')
logging.info('抓取中...')

for _ in range(threads_num):
worker = Thread(target=do_fetch, args=[communities_queue])
worker.start()

communities_queue.join()

logging.info('已全部抓取完成.')


def main():
fetch_all_pages(config.city_id)
fetch_all_pages(config.city_id, threads_num=10)


if __name__ == '__main__':
Expand Down

0 comments on commit 1005e72

Please sign in to comment.