Commit
change code structure, preprocess for distributed spider, version 1.5.2
xianhu committed Nov 25, 2016
1 parent 4468b41 commit 58ac065
Showing 7 changed files with 255 additions and 215 deletions.
4 changes: 4 additions & 0 deletions Dockerfile
@@ -36,4 +36,8 @@ RUN pip3 install --upgrade pip
RUN pip3 install -r Dockerfile_requirements.txt
RUN rm -rf /root/*

WORKDIR /root/
RUN echo "alias python=python3" >> /root/.bashrc
RUN source /root/.bashrc
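# note: each RUN step runs in its own shell, so sourcing .bashrc here does not
# persist into later layers; the alias only takes effect because CMD starts an
# interactive bash, which reads /root/.bashrc on startup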

CMD /bin/bash
9 changes: 9 additions & 0 deletions spider/abcbase/__init__.py
@@ -0,0 +1,9 @@
# _*_ coding: utf-8 _*_

"""
define base classes which are used in the concurrent module and the distributed module
"""

from .abc_base import TPEnum, BaseThread, BaseProcess, BasePool
from .abc_insts import FetchThread, ParseThread, SaveThread, MonitorThread
from .abc_insts import FetchProcess, ParseProcess, SaveProcess
222 changes: 222 additions & 0 deletions spider/abcbase/abc_base.py
@@ -0,0 +1,222 @@
# _*_ coding: utf-8 _*_

"""
abc_base.py by xianhu
"""

import enum
import queue
import logging
import threading
import multiprocessing


class TPEnum(enum.Enum):
"""
    enumeration of task types and counters, used to express the status of the spider
"""
TASKS_RUNNING = "tasks_running" # flag of tasks_running

URL_FETCH = "url_fetch" # flag of url_fetched
HTM_PARSE = "htm_parse" # flag of htm_parsed
ITEM_SAVE = "item_save" # flag of item_saved

URL_NOT_FETCH = "url_not_fetch" # flag of url_not_fetch
HTM_NOT_PARSE = "htm_not_parse" # flag of htm_not_parse
ITEM_NOT_SAVE = "item_not_save" # flag of item_not_save


class BaseConcur(object):
"""
class of BaseConcur, as base class of BaseThread and BaseProcess
"""

def __init__(self, name, worker, pool):
"""
constructor
"""
self.name = name # the name of each thread or process
self.worker = worker # the worker of each thread or process
self.pool = pool # the thread_pool or process_pool
return

def run(self):
"""
        rewrite the run function of Thread or Process: loop automatically, calling self.work() until it returns False or all tasks are done
"""
logging.warning("%s[%s] start", self.__class__.__name__, self.name)

while True:
try:
if not self.work():
break
except queue.Empty:
if self.pool.is_all_tasks_done():
break

logging.warning("%s[%s] end", self.__class__.__name__, self.name)
return

def work(self):
"""
procedure of each thread or process, return True to continue, False to stop
"""
assert False, "you must rewrite work function in subclass of %s" % self.__class__.__name__


class BaseThread(BaseConcur, threading.Thread):
"""
class of BaseThread, as base class of each thread
"""

def __init__(self, name, worker, pool):
"""
constructor
"""
threading.Thread.__init__(self, name=name)
BaseConcur.__init__(self, name, worker, pool)
return


class BaseProcess(BaseConcur, multiprocessing.Process):
"""
class of BaseProcess, as base class of each process
"""

def __init__(self, name, worker, pool):
"""
constructor
"""
multiprocessing.Process.__init__(self, name=name)
BaseConcur.__init__(self, name, worker, pool)
return


class BasePool(object):
"""
class of BasePool, as base class of thread_pool or process_pool
"""

def __init__(self, fetcher, parser, saver, pool_type="thread"):
"""
constructor
"""
assert pool_type in ("thread", "process"), "parameter pool_type must be 'thread' or 'process'"
self.pool_name = "ThreadPool" if pool_type == "thread" else "ProcessPool"
self.pool_type = pool_type # default: "thread", must be "thread" or "process", to identify the pool type

self.inst_fetcher = fetcher # fetcher instance, for fetch thread or process
self.inst_parser = parser # parser instance, for parse thread or process
self.inst_saver = saver # saver instance, for save thread or process
self.url_filter = None # default: None, also can be UrlFilter()

self.number_dict = {
TPEnum.TASKS_RUNNING: 0, # the count of tasks which are running

            TPEnum.URL_FETCH: 0,     # the count of urls which have been fetched successfully
            TPEnum.HTM_PARSE: 0,     # the count of pages which have been parsed successfully
            TPEnum.ITEM_SAVE: 0,     # the count of items which have been saved successfully

            TPEnum.URL_NOT_FETCH: 0, # the count of urls which haven't been fetched yet
            TPEnum.HTM_NOT_PARSE: 0, # the count of pages which haven't been parsed yet
            TPEnum.ITEM_NOT_SAVE: 0, # the count of items which haven't been saved yet
}
self.lock = None # the lock which self.number_dict needs

self.fetch_queue = None # (priority, url, keys, deep, critical, fetch_repeat, parse_repeat)
self.parse_queue = None # (priority, url, keys, deep, critical, fetch_repeat, parse_repeat, content)
self.save_queue = None # (url, keys, item), item can be any type object
return

def set_start_url(self, url, keys, priority=0, deep=0, critical=False):
"""
        set start url based on "keys", "priority", "deep" and "critical"; fetch_repeat and parse_repeat must be 0
        :param url: the url which needs to be fetched by this spider
        :param keys: extra information attached to this url, passed through to fetcher, parser and saver
        :param priority: the priority of this url; the spider fetches urls in priority order
        :param deep: the depth of this url; fetching stops when deep > max_deep, default 0
        :param critical: default False to indicate that this url is normal; True marks it as critical
"""
logging.warning("%s set_start_url: keys=%s, priority=%s, deep=%s, critical=%s, url=%s", self.pool_name, keys, priority, deep, critical, url)
self.add_a_task(TPEnum.URL_FETCH, (priority, url, keys, deep, critical, 0, 0))
return

def start_work_and_wait_done(self, fetcher_num=10, parser_num=1, is_over=True):
"""
start this pool, and wait for finishing
        :param fetcher_num: the number of fetching threads
        :param parser_num: the number of parsing threads or processes
        :param is_over: whether to stop the monitor when this pool stops, default True
"""
logging.debug("%s start, fetcher_num=%s, parser_num=%s, is_over=%s", self.pool_name, fetcher_num, parser_num, is_over)
        assert False, "you must rewrite start_work_and_wait_done function in subclass of %s" % self.__class__.__name__

def is_all_tasks_done(self):
"""
check if all tasks are done, according to self.number_dict
"""
        return not (self.number_dict[TPEnum.TASKS_RUNNING] or self.number_dict[TPEnum.URL_NOT_FETCH] or
                    self.number_dict[TPEnum.HTM_NOT_PARSE] or self.number_dict[TPEnum.ITEM_NOT_SAVE])

def update_number_dict(self, key, value):
"""
update number_dict of this pool
"""
        with self.lock:
            self.number_dict[key] += value
return

# ================================================================================================================================
def add_a_task(self, task_name, task_content):
"""
        add a task based on task_name; if the queue is full, block until a slot is free
"""
if task_name == TPEnum.URL_FETCH:
if (task_content[-1] > 0) or (task_content[-2] > 0) or (not self.url_filter) or self.url_filter.check(task_content[1]):
self.fetch_queue.put(task_content, block=True)
self.update_number_dict(TPEnum.URL_NOT_FETCH, +1)
elif task_name == TPEnum.HTM_PARSE:
self.parse_queue.put(task_content, block=True)
self.update_number_dict(TPEnum.HTM_NOT_PARSE, +1)
elif task_name == TPEnum.ITEM_SAVE:
self.save_queue.put(task_content, block=True)
self.update_number_dict(TPEnum.ITEM_NOT_SAVE, +1)
else:
logging.error("%s add_a_task error: parameter[%s] is invalid", self.pool_name, task_name)
return

def get_a_task(self, task_name):
"""
        get a task based on task_name; block for up to 5 seconds, then raise queue.Empty
"""
task_content = None
if task_name == TPEnum.URL_FETCH:
task_content = self.fetch_queue.get(block=True, timeout=5)
self.update_number_dict(TPEnum.URL_NOT_FETCH, -1)
elif task_name == TPEnum.HTM_PARSE:
task_content = self.parse_queue.get(block=True, timeout=5)
self.update_number_dict(TPEnum.HTM_NOT_PARSE, -1)
elif task_name == TPEnum.ITEM_SAVE:
task_content = self.save_queue.get(block=True, timeout=5)
self.update_number_dict(TPEnum.ITEM_NOT_SAVE, -1)
else:
logging.error("%s get_a_task error: parameter[%s] is invalid", self.pool_name, task_name)
self.update_number_dict(TPEnum.TASKS_RUNNING, +1)
return task_content

def finish_a_task(self, task_name):
"""
finish a task based on task_name, call queue.task_done()
"""
if task_name == TPEnum.URL_FETCH:
self.fetch_queue.task_done()
elif task_name == TPEnum.HTM_PARSE:
self.parse_queue.task_done()
elif task_name == TPEnum.ITEM_SAVE:
self.save_queue.task_done()
else:
logging.error("%s finish_a_task error: parameter[%s] is invalid", self.pool_name, task_name)
self.update_number_dict(TPEnum.TASKS_RUNNING, -1)
return
# ================================================================================================================================
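To illustrate the run()/work() protocol defined by BaseConcur above, here is a minimal sketch of a worker thread built on these base classes. It is not part of this commit: the class name SaveWorker and the saver's save() interface are hypothetical, and it assumes the spider package is importable.

# a minimal sketch, not from this commit: a worker thread that drains
# the pool's save queue via the get_a_task / finish_a_task protocol
from spider.abcbase import TPEnum, BaseThread

class SaveWorker(BaseThread):

    def work(self):
        # get_a_task blocks for up to 5 seconds and then raises queue.Empty,
        # which BaseConcur.run() catches to check pool.is_all_tasks_done()
        url, keys, item = self.pool.get_a_task(TPEnum.ITEM_SAVE)

        # self.worker is the saver instance passed to the constructor;
        # a save() method returning a truthy value is an assumed interface
        if self.worker.save(url, keys, item):
            self.pool.update_number_dict(TPEnum.ITEM_SAVE, +1)

        self.pool.finish_a_task(TPEnum.ITEM_SAVE)
        return True  # True keeps BaseConcur.run() looping

BaseConcur.run() drives this loop: it keeps calling work() until work() returns False, or until queue.Empty propagates and is_all_tasks_done() confirms that nothing remains.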
spider/concurrent/concur_insts.py → spider/abcbase/abc_insts.py
@@ -1,12 +1,12 @@
# _*_ coding: utf-8 _*_

"""
concur_insts.py by xianhu
abc_insts.py by xianhu
"""

import time
import logging
from .concur_base import BaseThread, BaseProcess, TPEnum
from .abc_base import BaseThread, BaseProcess, TPEnum


# ===============================================================================================================================
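The renamed abc_insts module keeps the concrete FetchThread, ParseThread and SaveThread workers, so a concrete pool built on BasePool only needs to supply the queues, the lock and a start_work_and_wait_done() override. The following is a rough sketch of such a thread pool, not code from this commit: the class name SimpleThreadPool is hypothetical, and the (name, worker, pool) constructor signature of the worker classes is assumed to match BaseThread.

# a rough sketch, not from this commit: a thread-based pool built on BasePool
import queue
import threading
from spider.abcbase import BasePool, FetchThread, ParseThread, SaveThread

class SimpleThreadPool(BasePool):

    def __init__(self, fetcher, parser, saver):
        BasePool.__init__(self, fetcher, parser, saver, pool_type="thread")
        self.lock = threading.Lock()               # guards self.number_dict
        self.fetch_queue = queue.PriorityQueue()   # urls come out in priority order
        self.parse_queue = queue.PriorityQueue()
        self.save_queue = queue.Queue()

    def start_work_and_wait_done(self, fetcher_num=10, parser_num=1, is_over=True):
        # is_over is kept for signature compatibility; this sketch has no monitor
        threads = [FetchThread("fetcher-%d" % i, self.inst_fetcher, self) for i in range(fetcher_num)]
        threads += [ParseThread("parser-%d" % i, self.inst_parser, self) for i in range(parser_num)]
        threads += [SaveThread("saver", self.inst_saver, self)]
        for t in threads:
            t.daemon = True
            t.start()
        for t in threads:
            t.join()
        return

A caller would seed the pool with pool.set_start_url(...) and then call pool.start_work_and_wait_done(); is_all_tasks_done() turns True once every queued url, page and item has been processed.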
92 changes: 0 additions & 92 deletions spider/concurrent/concur_base.py

This file was deleted.

