
Commit

remove multiprocess and update code, version 1.6.0
xianhu committed Nov 26, 2016
1 parent 1dd40a8 commit 01e62d8
Showing 17 changed files with 228 additions and 328 deletions.
13 changes: 9 additions & 4 deletions README.md
@@ -1,12 +1,12 @@
# PSpider

-An extremely concise spider framework for Python3 (a simple spider framework written in Python)
+A brief introduction is [here](https://zhuanlan.zhihu.com/p/23017812), and a worked example is [here](https://zhuanlan.zhihu.com/p/23250032)
+An extremely concise spider framework for Python3 (a simple spider framework written in Python)

-#### Contains the following three major modules
+#### Contains the following modules
1. utilities module: defines the utility classes and functions the spider needs
2. instances module: defines the fetcher/parser/saver classes used during crawling
-3. concurrent module: defines the multi-threaded/multi-process crawling strategy and keeps the data synchronized
+3. abcbase module: defines the base classes and functions for the multi-threaded, distributed, and other strategies
+4. concurrent module: defines the multi-threaded crawling strategy and keeps the data synchronized

#### Other files
1. setup.py is the installation script, which installs this framework into the system environment
@@ -24,11 +24,16 @@ An extremely concise spider framework for Python3 (a simple spider framework written in Python)
6. 2016-11-08, removed some unused functions/classes to simplify the framework
7. 2016-11-13, updated pybloom to pybloom-live, which can be installed directly with pip
8. 2016-11-17, added the Dockerfile and requirements.txt files
+9. 2016-11-27, removed the multi-process strategy; it worked on Mac but had problems on Windows

#### Next plan
1. use Redis to turn this into a distributed spider

+#### Frequently asked questions
+1. Runtime error: Broken pipe
+1.1 This occurred in older versions, mainly because multi-threading and multi-processing were used together; updating to the latest version fixes it
+2. Runtime error: No module named 'spider', or 'spider' has no attribute 'WebSpider'
+2.1 The framework cannot currently be installed directly with pip; download the source and install it with 'python setup.py install'

### Questions and suggestions are welcome under "Issues", and you can also fork the repository and submit "Pull requests"
### If you have any questions or advice, you can open "Issues" or "Pull requests"
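
The FAQ above shows that the framework is driven through the WebSpider class (exported by the concurrent module, see the spider/concurrent/__init__.py change below). The following is only a hedged usage sketch: it assumes the instances module exposes ready-to-use Fetcher, Parser and Saver classes and that the new ThreadPool keeps the (fetcher, parser, saver) constructor and the set_start_url / start_work_and_wait_done signatures visible in the removed BasePool code further down. Treat every name here as illustrative rather than confirmed API.

```python
# Hedged usage sketch, not verified library code: Fetcher/Parser/Saver and the
# WebSpider constructor arguments are assumptions based on the README wording
# and the removed BasePool code in this commit.
import spider

fetcher = spider.Fetcher()   # assumed default fetcher from the instances module
parser = spider.Parser()     # assumed default parser
saver = spider.Saver()       # assumed default saver

web_spider = spider.WebSpider(fetcher, parser, saver)

# these two signatures do appear in this commit's diff of abc_base.py:
#   set_start_url(url, keys, priority=0, deep=0, critical=False)
#   start_work_and_wait_done(fetcher_num=10, parser_num=1, is_over=True)
web_spider.set_start_url("https://zhuanlan.zhihu.com/p/23017812", keys=None, priority=0, deep=0)
web_spider.start_work_and_wait_done(fetcher_num=10, parser_num=1)
```
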
2 changes: 1 addition & 1 deletion pylint.conf
@@ -193,7 +193,7 @@ max-nested-blocks=5
[FORMAT]

# Maximum number of characters on a single line.
-max-line-length=150
+max-line-length=200

# Regexp for a line that is allowed to be longer than the limit.
ignore-long-lines=^\s*(# )?<?https?://\S+>?$
2 changes: 1 addition & 1 deletion setup.py
@@ -8,7 +8,7 @@

setup(
    name="spider",
-    version="1.5.3",
+    version="1.6.0",
    author="xianhu",
    keywords=["spider", "crawler"],
    packages=find_packages(exclude=("test", "test.*", "demos_*")),
3 changes: 1 addition & 2 deletions spider/abcbase/__init__.py
@@ -4,6 +4,5 @@
define base classes which used in concurrent module and distributed module
"""

-from .abc_base import TPEnum, BaseThread, BaseProcess, BasePool
+from .abc_base import TPEnum
from .abc_insts import FetchThread, ParseThread, SaveThread, MonitorThread
-from .abc_insts import FetchProcess, ParseProcess, SaveProcess
180 changes: 11 additions & 169 deletions spider/abcbase/abc_base.py
@@ -8,7 +8,6 @@
import queue
import logging
import threading
-import multiprocessing


class TPEnum(enum.Enum):
@@ -26,25 +25,26 @@ class TPEnum(enum.Enum):
    ITEM_NOT_SAVE = "item_not_save"    # flag of item_not_save


-class BaseConcur(object):
+class BaseThread(threading.Thread):
    """
-    class of BaseConcur, as base class of BaseThread and BaseProcess
+    class of BaseThread, as base class of each thread
    """

    def __init__(self, name, worker, pool):
        """
        constructor
        """
-        self.name = name        # the name of each thread or process
-        self.worker = worker    # the worker of each thread or process
-        self.pool = pool        # the thread_pool or process_pool
+        threading.Thread.__init__(self, name=name)
+
+        self.worker = worker    # the worker of each thread
+        self.pool = pool        # the thread_pool of each thread
        return

    def run(self):
        """
-        rewrite run function of Thread or Process, auto running, and must call self.work()
+        rewrite run function, auto running and must call self.work()
        """
-        logging.warning("%s[%s] start", self.__class__.__name__, self.name)
+        logging.warning("%s[%s] start", self.__class__.__name__, self.getName())

        while True:
            try:
@@ -54,169 +54,11 @@ def run(self):
                if self.pool.is_all_tasks_done():
                    break

-        logging.warning("%s[%s] end", self.__class__.__name__, self.name)
+        logging.warning("%s[%s] end", self.__class__.__name__, self.getName())
        return

    def work(self):
        """
-        procedure of each thread or process, return True to continue, False to stop
+        procedure of each thread, return True to continue, False to stop
        """
-        assert False, "you must rewrite work function in subclass of %s" % self.__class__.__name__
+        assert False, "you must rewrite work function in %s" % self.__class__.__name__


-class BaseThread(BaseConcur, threading.Thread):
-    """
-    class of BaseThread, as base class of each thread
-    """

-    def __init__(self, name, worker, pool):
-        """
-        constructor
-        """
-        threading.Thread.__init__(self, name=name)
-        BaseConcur.__init__(self, name, worker, pool)
-        return


-class BaseProcess(BaseConcur, multiprocessing.Process):
-    """
-    class of BaseProcess, as base class of each process
-    """

-    def __init__(self, name, worker, pool):
-        """
-        constructor
-        """
-        multiprocessing.Process.__init__(self, name=name)
-        BaseConcur.__init__(self, name, worker, pool)
-        return


-class BasePool(object):
-    """
-    class of BasePool, as base class of thread_pool or process_pool
-    """

-    def __init__(self, fetcher, parser, saver, pool_type="thread"):
-        """
-        constructor
-        """
-        assert pool_type in ("thread", "process"), "parameter pool_type must be 'thread' or 'process'"
-        self.pool_name = "ThreadPool" if pool_type == "thread" else "ProcessPool"
-        self.pool_type = pool_type      # default: "thread", must be "thread" or "process", to identify the pool type

-        self.inst_fetcher = fetcher     # fetcher instance, for fetch thread or process
-        self.inst_parser = parser       # parser instance, for parse thread or process
-        self.inst_saver = saver         # saver instance, for save thread or process
-        self.url_filter = None          # default: None, also can be UrlFilter()

-        self.number_dict = {
-            TPEnum.TASKS_RUNNING: 0,    # the count of tasks which are running

-            TPEnum.URL_FETCH: 0,        # the count of urls which have been fetched successfully
-            TPEnum.HTM_PARSE: 0,        # the count of urls which have been parsed successfully
-            TPEnum.ITEM_SAVE: 0,        # the count of urls which have been saved successfully

-            TPEnum.URL_NOT_FETCH: 0,    # the count of urls which haven't been fetched
-            TPEnum.HTM_NOT_PARSE: 0,    # the count of urls which haven't been parsed
-            TPEnum.ITEM_NOT_SAVE: 0,    # the count of urls which haven't been saved
-        }
-        self.lock = None                # the lock which self.number_dict needs

-        self.fetch_queue = None         # (priority, url, keys, deep, critical, fetch_repeat, parse_repeat)
-        self.parse_queue = None         # (priority, url, keys, deep, critical, fetch_repeat, parse_repeat, content)
-        self.save_queue = None          # (url, keys, item), item can be any type object
-        return

-    def set_start_url(self, url, keys, priority=0, deep=0, critical=False):
-        """
-        set start url based on "keys", "priority", "deep" and "critical", fetch_repeat and parse_repeat must be 0
-        :param url: the url, which needs to be fetched in this spider
-        :param keys: some information of this url, and will be passed to fetcher, parser and saver
-        :param priority: the priority of this url, spider fetches url according to url's priority
-        :param deep: the deep of this url, when deep > max_deep, stop fetching, default 0
-        :param critical: the critical flag of this url, default False to identity that this url is normal, else is critical
-        """
-        logging.warning("%s set_start_url: keys=%s, priority=%s, deep=%s, critical=%s, url=%s", self.pool_name, keys, priority, deep, critical, url)
-        self.add_a_task(TPEnum.URL_FETCH, (priority, url, keys, deep, critical, 0, 0))
-        return

-    def start_work_and_wait_done(self, fetcher_num=10, parser_num=1, is_over=True):
-        """
-        start this pool, and wait for finishing
-        :param fetcher_num: the number of fetching thread
-        :param parser_num: the number of parsing thread or parsing process
-        :param is_over: whether to stop monitor when this pool stop, default True
-        """
-        logging.debug("%s start, fetcher_num=%s, parser_num=%s, is_over=%s", self.pool_name, fetcher_num, parser_num, is_over)
-        assert False, "you must rewrite work function in subclass of %s" % self.__class__.__name__

-    def is_all_tasks_done(self):
-        """
-        check if all tasks are done, according to self.number_dict
-        """
-        return False if self.number_dict[TPEnum.TASKS_RUNNING] or self.number_dict[TPEnum.URL_NOT_FETCH] or \
-            self.number_dict[TPEnum.HTM_NOT_PARSE] or self.number_dict[TPEnum.ITEM_NOT_SAVE] else True

-    def update_number_dict(self, key, value):
-        """
-        update number_dict of this pool
-        """
-        self.lock.acquire()
-        self.number_dict[key] += value
-        self.lock.release()
-        return

-    # ================================================================================================================================
-    def add_a_task(self, task_name, task_content):
-        """
-        add a task based on task_name, if queue is full, blocking the queue
-        """
-        if task_name == TPEnum.URL_FETCH:
-            if (task_content[-1] > 0) or (task_content[-2] > 0) or (not self.url_filter) or self.url_filter.check(task_content[1]):
-                self.fetch_queue.put(task_content, block=True)
-                self.update_number_dict(TPEnum.URL_NOT_FETCH, +1)
-        elif task_name == TPEnum.HTM_PARSE:
-            self.parse_queue.put(task_content, block=True)
-            self.update_number_dict(TPEnum.HTM_NOT_PARSE, +1)
-        elif task_name == TPEnum.ITEM_SAVE:
-            self.save_queue.put(task_content, block=True)
-            self.update_number_dict(TPEnum.ITEM_NOT_SAVE, +1)
-        else:
-            logging.error("%s add_a_task error: parameter[%s] is invalid", self.pool_name, task_name)
-        return

-    def get_a_task(self, task_name):
-        """
-        get a task based on task_name, if queue is empty, raise queue.Empty
-        """
-        task_content = None
-        if task_name == TPEnum.URL_FETCH:
-            task_content = self.fetch_queue.get(block=True, timeout=5)
-            self.update_number_dict(TPEnum.URL_NOT_FETCH, -1)
-        elif task_name == TPEnum.HTM_PARSE:
-            task_content = self.parse_queue.get(block=True, timeout=5)
-            self.update_number_dict(TPEnum.HTM_NOT_PARSE, -1)
-        elif task_name == TPEnum.ITEM_SAVE:
-            task_content = self.save_queue.get(block=True, timeout=5)
-            self.update_number_dict(TPEnum.ITEM_NOT_SAVE, -1)
-        else:
-            logging.error("%s get_a_task error: parameter[%s] is invalid", self.pool_name, task_name)
-        self.update_number_dict(TPEnum.TASKS_RUNNING, +1)
-        return task_content

-    def finish_a_task(self, task_name):
-        """
-        finish a task based on task_name, call queue.task_done()
-        """
-        if task_name == TPEnum.URL_FETCH:
-            self.fetch_queue.task_done()
-        elif task_name == TPEnum.HTM_PARSE:
-            self.parse_queue.task_done()
-        elif task_name == TPEnum.ITEM_SAVE:
-            self.save_queue.task_done()
-        else:
-            logging.error("%s finish_a_task error: parameter[%s] is invalid", self.pool_name, task_name)
-        self.update_number_dict(TPEnum.TASKS_RUNNING, -1)
-        return
-    # ================================================================================================================================
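
The removed BasePool above (whose role is presumably taken over by the new ThreadPool in concur_threads.py) relies on a simple accounting protocol: putting an item on a queue increments the matching *_NOT_* counter, taking a task increments TASKS_RUNNING, finishing it decrements TASKS_RUNNING, and the pool is considered done only when all of these counters are back to zero. Below is a minimal, self-contained sketch of that idea; the names merely mirror TPEnum and are not library code.

```python
# Standalone illustration of the counter-based termination check used by the
# removed BasePool; counter names mirror TPEnum but nothing here is library code.
import queue
import threading

lock = threading.Lock()
number_dict = {"tasks_running": 0, "url_not_fetch": 0}
fetch_queue = queue.Queue()

def update_number_dict(key, value):
    with lock:                      # same role as BasePool.update_number_dict
        number_dict[key] += value

def add_a_task(url):
    fetch_queue.put(url, block=True)
    update_number_dict("url_not_fetch", +1)

def get_a_task():
    url = fetch_queue.get(block=True, timeout=5)
    update_number_dict("url_not_fetch", -1)
    update_number_dict("tasks_running", +1)
    return url

def finish_a_task():
    fetch_queue.task_done()
    update_number_dict("tasks_running", -1)

def is_all_tasks_done():
    return not (number_dict["tasks_running"] or number_dict["url_not_fetch"])

add_a_task("http://example.com")
url = get_a_task()            # running=1, url_not_fetch=0 -> not done yet
finish_a_task()               # running=0, url_not_fetch=0 -> done
print(is_all_tasks_done())    # True
```
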
23 changes: 12 additions & 11 deletions spider/abcbase/abc_insts.py
@@ -6,7 +6,7 @@

import time
import logging
-from .abc_base import BaseThread, BaseProcess, TPEnum
+from .abc_base import TPEnum, BaseThread


# ===============================================================================================================================
@@ -35,7 +35,6 @@ def work_fetch(self):
    return True

FetchThread = type("FetchThread", (BaseThread,), dict(work=work_fetch))
-FetchProcess = type("FetchProcess", (BaseProcess,), dict(work=work_fetch))


# ===============================================================================================================================
@@ -67,7 +66,6 @@ def work_parse(self):
    return True

ParseThread = type("ParseThread", (BaseThread,), dict(work=work_parse))
-ParseProcess = type("ParseProcess", (BaseProcess,), dict(work=work_parse))


# ===============================================================================================================================
@@ -90,7 +88,6 @@ def work_save(self):
    return True

SaveThread = type("SaveThread", (BaseThread,), dict(work=work_save))
-SaveProcess = type("SaveProcess", (BaseProcess,), dict(work=work_save))


# ===============================================================================================================================
@@ -111,27 +108,31 @@ def init_monitor_thread(self, name, pool, sleep_time=5):

def work_monitor(self):
    """
-    monitor the pool, auto running, and return True to continue, False to stop
+    monitor the pool, auto running and return True to continue, False to stop
    """
    time.sleep(self.sleep_time)

    cur_fetch_num = self.pool.number_dict[TPEnum.URL_FETCH]
    cur_parse_num = self.pool.number_dict[TPEnum.HTM_PARSE]
    cur_save_num = self.pool.number_dict[TPEnum.ITEM_SAVE]

-    info = "%s status: running_tasks=%s;" % (self.pool.pool_name, self.pool.number_dict[TPEnum.TASKS_RUNNING])
+    info = "%s status: running_tasks=%s;" % (self.pool.__class__.__name__, self.pool.number_dict[TPEnum.TASKS_RUNNING])

    info += " fetch=(%d, %d, %d/(%ds));" % \
        (self.pool.number_dict[TPEnum.URL_NOT_FETCH], cur_fetch_num, cur_fetch_num-self.last_fetch_num, self.sleep_time)
-    self.last_fetch_num = cur_fetch_num

    info += " parse=(%d, %d, %d/(%ds));" % \
        (self.pool.number_dict[TPEnum.HTM_NOT_PARSE], cur_parse_num, cur_parse_num-self.last_parse_num, self.sleep_time)
-    self.last_parse_num = cur_parse_num

    info += " save=(%d, %d, %d/(%ds));" % \
        (self.pool.number_dict[TPEnum.ITEM_NOT_SAVE], cur_save_num, cur_save_num-self.last_save_num, self.sleep_time)
-    info += " total_seconds=%d" % (time.time() - self.init_time)
-    logging.warning(info)

+    self.last_fetch_num = cur_fetch_num
+    self.last_parse_num = cur_parse_num
+    self.last_save_num = cur_save_num

+    info += " total_seconds=%d" % (time.time() - self.init_time)

+    logging.warning(info)
    return False if self.pool.monitor_stop else True

MonitorThread = type("MonitorThread", (BaseThread,), dict(__init__=init_monitor_thread, work=work_monitor))
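
A side note on the pattern used throughout abc_insts.py: FetchThread, ParseThread, SaveThread and MonitorThread are created with the three-argument form of type(), which builds a subclass dynamically. The sketch below shows the equivalent explicit class statement; DemoThread and work_demo are illustrative names only, and the import path simply mirrors this repository's layout.

```python
# Illustration of the dynamic class-creation pattern above; DemoThread and
# work_demo are hypothetical names, only BaseThread comes from the repository.
from spider.abcbase.abc_base import BaseThread


def work_demo(self):
    """one unit of work; BaseThread.run() calls this repeatedly"""
    return False    # returning False makes run() check whether the pool has finished


# dynamic form, as used for FetchThread/ParseThread/SaveThread above
DemoThread = type("DemoThread", (BaseThread,), dict(work=work_demo))


# equivalent explicit class statement
class DemoThreadExplicit(BaseThread):
    work = work_demo
```
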
4 changes: 2 additions & 2 deletions spider/concurrent/__init__.py
@@ -1,7 +1,7 @@
# _*_ coding: utf-8 _*_

"""
-define ConcurPool as WebSpider
+define ThreadPool as WebSpider
"""

-from .concur_pool import ConcurPool as WebSpider
+from .concur_threads import ThreadPool as WebSpider
