Scrapy 源碼剖析(4):Scrapy 如何完成抓取任務?

這篇文章就讓我們來看一下,也是 Scrapy 最核心的抓取流程是如何運行的,它是如何調度各個組件,完成整個抓取工作的。

運行入口

還是回到最初的入口,在 Scrapy 源碼剖析(二)Scrapy 是如何運行起來的?這篇文章中我們已經詳細分析過了,在執行 Scrapy 命令時,主要經過以下幾步:

crawl 方法最終是調用了 Cralwer 實例的 crawl,這個方法最終把控制權交給了Engine,而 start 方法註冊好協程池,就開始異步調度執行了。

我們來看 Cralwercrawl 方法:

@defer.inlineCallbacks
def crawl(self, *args, **kwargs):
    assert not self.crawling, "Crawling already taking place"
    self.crawling = True
    try:
        # 創建爬蟲實例
        self.spider = self._create_spider(*args, **kwargs)
        # 創建引擎
        self.engine = self._create_engine()
        # 調用spider的start_requests 獲取種子URL
        start_requests = iter(self.spider.start_requests())
        # 調用engine的open_spider 交由引擎調度
        yield self.engine.open_spider(self.spider, start_requests)
        yield defer.maybeDeferred(self.engine.start)
    except Exception:
        if six.PY2:
            exc_info = sys.exc_info()
        self.crawling = False
        if self.engine is not None:
            yield self.engine.close()
        if six.PY2:
            six.reraise(*exc_info)
        raise

這裏首先會創建出爬蟲實例,然後創建引擎,之後調用了 spiderstart_requests 方法,這個方法就是我們平時寫的最多爬蟲類的父類,它在 spiders/__init__.py 中定義:

def start_requests(self):
    # 根據定義好的start_urls屬性 生成種子URL對象
    for url in self.start_urls:
        yield self.make_requests_from_url(url)

def make_requests_from_url(self, url):
    # 構建Request對象
    return Request(url, dont_filter=True)

構建請求

通過上面這段代碼,我們能看到,平時我們必須要定義的 start_urls 屬性,原來就是在這裏用來構建 Request 的,來看 Request 的定義:

class Request(object_ref):

    def __init__(self, url, callback=None, method='GET'headers=None, body=None,
                 cookies=None, meta=None, encoding='utf-8'priority=0,
                 dont_filter=False, errback=None):
        # 編碼
        self._encoding = encoding
        # 請求方法
        self.method = str(method).upper()
        # 設置url
        self._set_url(url)
        # 設置body
        self._set_body(body)
        assert isinstance(priority, int)"Request priority not an integer: %r" % priority
        # 優先級
        self.priority = priority
        assert callback or not errback, "Cannot use errback without a callback"
        # 回調函數
        self.callback = callback
        # 異常回調函數
        self.errback = errback
        # cookies
        self.cookies = cookies or {}
        # 構建Header
        self.headers = Headers(headers or {}encoding=encoding)
        # 是否需要過濾
        self.dont_filter = dont_filter
  # 附加信息
        self._meta = dict(meta) if meta else None

Request 對象比較簡單,就是封裝了請求參數、請求方法、回調以及可附加的屬性信息。

當然,你也可以在子類中重寫 start_requestsmake_requests_from_url 這 2 個方法,用來自定義邏輯構建種子請求。

引擎調度

再回到 crawl 方法,構建好種子請求對象後,調用了 engineopen_spider

@defer.inlineCallbacks
def open_spider(self, spider, start_requests=()close_if_idle=True):
    assert self.has_capacity()"No free spider slot when opening %r" % \
        spider.name
    logger.info("Spider opened"extra={'spider': spider})
    # 註冊_next_request調度方法 循環調度
    nextcall = CallLaterOnce(self._next_request, spider)
    # 初始化scheduler
    scheduler = self.scheduler_cls.from_crawler(self.crawler)
    # 調用爬蟲中間件 處理種子請求
    start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
    # 封裝Slot對象
    slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
    self.slot = slot
    self.spider = spider
    # 調用scheduler的open
    yield scheduler.open(spider)
    # 調用scrapyer的open
    yield self.scraper.open_spider(spider)
    # 調用stats的open
    self.crawler.stats.open_spider(spider)
    yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
    # 發起調度
    slot.nextcall.schedule()
    slot.heartbeat.start(5)

在這裏首先構建了一個 CallLaterOnce,之後把 _next_request 方法註冊了進去,看此類的實現:

class CallLaterOnce(object):
    # 在twisted的reactor中循環調度一個方法
    def __init__(self, func, *a, **kw):
        self._func = func
        self._a = a
        self._kw = kw
        self._call = None

    def schedule(self, delay=0):
        # 上次發起調度 纔可再次繼續調度
        if self._call is None:
            # 註冊self到callLater中
            self._call = reactor.callLater(delay, self)

    def cancel(self):
        if self._call:
            self._call.cancel()

    def __call__(self):
        # 上面註冊的是self 所以會執行__call__
        self._call = None
        return self._func(*self._a, **self._kw)

這裏封裝了循環執行的方法類,並且註冊的方法會在 twistedreactor 中異步執行,以後執行只需調用 schedule,就會註冊 selfreactorcallLater 中,然後它會執行 __call__ 方法,最終執行的就是我們註冊的方法。

而這裏我們註冊的方法就是引擎的 _next_request,也就是說,此方法會循環調度,直到程序退出。

之後調用了爬蟲中間件的 process_start_requests 方法,你可以定義多個自己的爬蟲中間件,每個類都重寫此方法,爬蟲在調度之前會分別調用你定義好的爬蟲中間件,來處理初始化請求,你可以進行過濾、加工、篩選以及你想做的任何邏輯。

這樣做的好處就是,把想做的邏輯拆分成多箇中間件,每個中間件功能獨立,而且維護起來更加清晰。

調度器

接下來就要開始調度任務了,這裏首先調用了 Scheduleropen

def open(self, spider):
    self.spider = spider
    # 實例化優先級隊列
    self.mqs = self.pqclass(self._newmq)
    # 如果定義了dqdir則實例化基於磁盤的隊列
    self.dqs = self._dq() if self.dqdir else None
    # 調用請求指紋過濾器的open方法
    return self.df.open()
    
def _dq(self):
    # 實例化磁盤隊列
    activef = join(self.dqdir, 'active.json')
    if exists(activef):
        with open(activef) as f:
            prios = json.load(f)
    else:
        prios = ()
    q = self.pqclass(self._newdq, startprios=prios)
    if q:
        logger.info("Resuming crawl (%(queuesize)d requests scheduled)",
                    {'queuesize': len(q)}extra={'spider': self.spider})
    return q

open 方法中,調度器會實例化出優先級隊列,以及根據 dqdir是否配置,決定是否使用磁盤隊列,最後調用了請求指紋過濾器open 方法,這個方法在父類 BaseDupeFilter 中定義:

class BaseDupeFilter(object):
    # 過濾器基類,子類可重寫以下方法
    @classmethod
    def from_settings(cls, settings):
        return cls()

    def request_seen(self, request):
        # 請求過濾
        return False

    def open(self):
        # 可重寫 完成過濾器的初始化工作
        pass

    def close(self, reason):
        # 可重寫 完成關閉過濾器工作
        pass

    def log(self, request, spider):
        pas

請求過濾器提供了請求過濾的具體實現方式,Scrapy 默認提供了 RFPDupeFilter 過濾器實現過濾重複請求的邏輯,這裏先對這個類有個瞭解,後面會講具體是如何過濾重複請求的。

Scraper

再之後就調用 Scraperopen_spider 方法,在之前的文章中我們提到過,Scraper 類是連接 EngineSpiderItem Pipeline 這 3 個組件的橋樑:

@defer.inlineCallbacks
def open_spider(self, spider):
    self.slot = Slot()
    # 調用所有pipeline的open_spider
    yield self.itemproc.open_spider(spider)

這裏的主要邏輯是 Scraper 調用所有 Pipelineopen_spider 方法,如果我們定義了多個 Pipeline 輸出類,可以重寫 open_spider 完成每個 Pipeline 在輸出前的初始化工作。

循環調度

調用了一系列組件的 open 方法後,最後調用了 nextcall.schedule() 開始調度,也就是循環執行在上面註冊的 _next_request 方法:

def _next_request(self, spider):
    # 此方法會循環調度
    slot = self.slot
    if not slot:
        return
    # 暫停
    if self.paused:
        return
    # 是否等待
    while not self._needs_backout(spider):
        # 從scheduler中獲取request
        # 注意:第一次獲取時,是沒有的,也就是會break出來
        # 從而執行下面的邏輯
        if not self._next_request_from_scheduler(spider):
            break
    # 如果start_requests有數據且不需要等待
    if slot.start_requests and not self._needs_backout(spider):
        try:
            # 獲取下一個種子請求
            request = next(slot.start_requests)
        except StopIteration:
            slot.start_requests = None
        except Exception:
            slot.start_requests = None
            logger.error('Error while obtaining start requests',
                         exc_info=True, extra={'spider': spider})
        else:
            # 調用crawl,實際是把request放入scheduler的隊列中
            self.crawl(request, spider)
    # 空閒則關閉spider
    if self.spider_is_idle(spider) and slot.close_if_idle:
        self._spider_idle(spider)
        
def _needs_backout(self, spider):
    # 是否需要等待,取決4個條件
    # 1. Engine是否stop
    # 2. slot是否close
    # 3. downloader下載超過預設
    # 4. scraper處理response超過預設
    slot = self.slot
    return not self.running \
        or slot.closing \
        or self.downloader.needs_backout() \
        or self.scraper.slot.needs_backout()

def _next_request_from_scheduler(self, spider):
    slot = self.slot
    # 從scheduler拿出下個request
    request = slot.scheduler.next_request()
    if not request:
        return
    # 下載
    d = self._download(request, spider)
    # 註冊成功、失敗、出口回調方法
    d.addBoth(self._handle_downloader_output, request, spider)
    d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.remove_request(request))
    d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.nextcall.schedule())
    d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    return d
    

def crawl(self, request, spider):
    assert spider in self.open_spiders, \
        "Spider %r not opened when crawling: %s" % (spider.name, request)
    # request放入scheduler隊列,調用nextcall的schedule
    self.schedule(request, spider)
    self.slot.nextcall.schedule()

def schedule(self, request, spider):
    self.signals.send_catch_log(signal=signals.request_scheduled,
            request=request, spider=spider)
    # 調用scheduler的enqueue_request,把request放入scheduler隊列
    if not self.slot.scheduler.enqueue_request(request):
        self.signals.send_catch_log(signal=signals.request_dropped,
                                    request=request, spider=spider)

_next_request 方法首先調用 _needs_backout 檢查是否需要等待,等待的條件有以下幾種情況:

如果不需要等待,則調用 _next_request_from_scheduler,此方法從名字上就能看出,主要是從 Schduler 中獲取 Request

這裏要注意,在第一次調用此方法時,Scheduler 中是沒有放入任何 Request 的,這裏會直接break 出來,執行下面的邏輯,而下面就會調用 crawl 方法,實際是把請求放到 Scheduler 的請求隊列,放入隊列的過程會經過請求過濾器校驗是否重複。

下次再調用 _next_request_from_scheduler 時,就能從 Scheduler 中獲取到下載請求,然後執行下載動作。

先來看第一次調度,執行 crawl

def crawl(self, request, spider):
    assert spider in self.open_spiders, \
        "Spider %r not opened when crawling: %s" % (spider.name, request)
    # 放入Scheduler隊列
    self.schedule(request, spider)
    # 進行下一次調度
    self.slot.nextcall.schedule()
    
def schedule(self, request, spider):
    self.signals.send_catch_log(signal=signals.request_scheduled,
            request=request, spider=spider)
    # 放入Scheduler隊列
    if not self.slot.scheduler.enqueue_request(request):
        self.signals.send_catch_log(signal=signals.request_dropped,
                                    request=request, spider=spider)

調用引擎的 crawl 實際就是把請求放入 Scheduler 的隊列中,下面看請求是如何入隊列的。

請求入隊

Scheduler 請求入隊方法:

def enqueue_request(self, request):
    # 請求入隊 若請求過濾器驗證重複 返回False
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return False
    # 磁盤隊列是否入隊成功
    dqok = self._dqpush(request)
    if dqok:
        self.stats.inc_value('scheduler/enqueued/disk'spider=self.spider)
    else:
        # 沒有定義磁盤隊列 則使用內存隊列
        self._mqpush(request)
        self.stats.inc_value('scheduler/enqueued/memory'spider=self.spider)
    self.stats.inc_value('scheduler/enqueued'spider=self.spider)
    return True
    
def _dqpush(self, request):
    # 是否定義磁盤隊列
    if self.dqs is None:
        return
    try:
        # Request對象轉dict
        reqd = request_to_dict(request, self.spider)
        # 放入磁盤隊列
        self.dqs.push(reqd, -request.priority)
    except ValueError as e:  # non serializable request
        if self.logunser:
            msg = ("Unable to serialize request: %(request)s - reason:"
                   " %(reason)s - no more unserializable requests will be"
                   " logged (stats being collected)")
            logger.warning(msg, {'request': request, 'reason': e},
                           exc_info=True, extra={'spider': self.spider})
            self.logunser = False
        self.stats.inc_value('scheduler/unserializable',
                             spider=self.spider)
        return
    else:
        return True
    
def _mqpush(self, request):
    # 入內存隊列
    self.mqs.push(request, -request.priority)

在上一篇文章時有說到,調度器主要定義了 2 種隊列:基於磁盤隊列、基於內存隊列。

如果在實例化 Scheduler 時候傳入 jobdir,則使用磁盤隊列,否則使用內存隊列,默認使用內存隊列。

指紋過濾

上面說到,在請求入隊之前,首先會通過請求指紋過濾器檢查請求是否重複,也就是調用了過濾器的 request_seen

def request_seen(self, request):
    # 生成請求指紋
    fp = self.request_fingerprint(request)
    # 請求指紋如果在指紋集合中 則認爲重複
    if fp in self.fingerprints:
        return True
    # 不重複則記錄此指紋
    self.fingerprints.add(fp)
    # 實例化如果有path則把指紋寫入文件
    if self.file:
        self.file.write(fp + os.linesep)

def request_fingerprint(self, request):
    # 調用utils.request的request_fingerprint
    return request_fingerprint(request)

utils.requestrequest_fingerprint 邏輯如下:

def request_fingerprint(request, include_headers=None):
    """生成請求指紋"""
    # 指紋生成是否包含headers
    if include_headers:
        include_headers = tuple(to_bytes(h.lower())
                                 for h in sorted(include_headers))
    cache = _fingerprint_cache.setdefault(request, {})
    if include_headers not in cache:
        # 使用sha1算法生成指紋
        fp = hashlib.sha1()
        fp.update(to_bytes(request.method))
        fp.update(to_bytes(canonicalize_url(request.url)))
        fp.update(request.body or b'')
        if include_headers:
            for hdr in include_headers:
                if hdr in request.headers:
                    fp.update(hdr)
                    for v in request.headers.getlist(hdr):
                        fp.update(v)
        cache[include_headers] = fp.hexdigest()
    return cache[include_headers]

這個過濾器先是通過 Request 對象生成一個請求指紋,在這裏使用 sha1 算法,並記錄到指紋集合,每次請求入隊前先到這裏驗證一下指紋集合,如果已存在,則認爲請求重複,則不會重複入隊列。

不過如果我想不校驗重複,也想重複爬取怎麼辦?看 enqueue_request 的第一行判斷,僅需將 Request 實例的 dont_filter 設置爲 True 就可以重複抓取此請求,非常靈活。

Scrapy 就是通過此邏輯實現重複請求的過濾,默認情況下,重複請求是不會進行重複抓取的。

下載請求

請求第一次進來後,肯定是不重複的,那麼則會正常進入調度器隊列。之後下一次調度,再次調用 _next_request_from_scheduler 方法,此時調用調度器的 next_request 方法,就是從調度器隊列中取出一個請求,這次就要開始進行網絡下載了,也就是調用 _download

def _download(self, request, spider):
    # 下載請求
    slot = self.slot
    slot.add_request(request)
    def _on_success(response):
        # 成功回調 結果必須是Request或Response
        assert isinstance(response, (Response, Request))
        if isinstance(response, Response):
            # 如果下載後結果爲Response 返回Response
            response.request = request
            logkws = self.logformatter.crawled(request, response, spider)
            logger.log(*logformatter_adapter(logkws)extra={'spider': spider})
            self.signals.send_catch_log(signal=signals.response_received, \
                response=response, request=request, spider=spider)
        return response

    def _on_complete(_):
        # 此次下載完成後 繼續進行下一次調度
        slot.nextcall.schedule()
        return _

    # 調用Downloader進行下載
    dwld = self.downloader.fetch(request, spider)
    # 註冊成功回調
    dwld.addCallbacks(_on_success)
    # 結束回調
    dwld.addBoth(_on_complete)
    return dwld

在進行網絡下載時,調用了 Downloaderfetch

def fetch(self, request, spider):
    def _deactivate(response):
        # 下載結束後刪除此記錄
        self.active.remove(request)
        return response
    # 下載前記錄處理中的請求
    self.active.add(request)
    # 調用下載器中間件download 並註冊下載成功的回調方法是self._enqueue_request
    dfd = self.middleware.download(self._enqueue_request, request, spider)
    # 註冊結束回調
    return dfd.addBoth(_deactivate)

這裏調用下載器中間件的 download,並註冊下載成功的回調方法是 _enqueue_request,來看下載方法:

def download(self, download_func, request, spider):
    @defer.inlineCallbacks
    def process_request(request):
        # 如果下載器中間件有定義process_request 則依次執行
        for method in self.methods['process_request']:
            response = yield method(request=request, spider=spider)
            assert response is None or isinstance(response, (Response, Request))\
                    'Middleware %s.process_request must return None, Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
            # 如果下載器中間件有返回值 直接返回此結果
            if response:
                defer.returnValue(response)
        # 如果下載器中間件沒有返回值,則執行註冊進來的方法 也就是Downloader的_enqueue_request
        defer.returnValue((yield download_func(request=request,spider=spider)))

    @defer.inlineCallbacks
    def process_response(response):
        assert response is not None, 'Received None in process_response'
        if isinstance(response, Request):
            defer.returnValue(response)

        # 如果下載器中間件有定義process_response 則依次執行
        for method in self.methods['process_response']:
            response = yield method(request=request, response=response,
                                    spider=spider)
            assert isinstance(response, (Response, Request))\
                'Middleware %s.process_response must return Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if isinstance(response, Request):
                defer.returnValue(response)
        defer.returnValue(response)

    @defer.inlineCallbacks
    def process_exception(_failure):
        exception = _failure.value
        # 如果下載器中間件有定義process_exception 則依次執行
        for method in self.methods['process_exception']:
            response = yield method(request=request, exception=exception,
                                    spider=spider)
            assert response is None or isinstance(response, (Response, Request))\
                'Middleware %s.process_exception must return None, Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if response:
                defer.returnValue(response)
        defer.returnValue(_failure)

    # 註冊執行、錯誤、回調方法
    deferred = mustbe_deferred(process_request, request)
    deferred.addErrback(process_exception)
    deferred.addCallback(process_response)
    return deferred

在下載過程中,首先找到所有定義好的下載器中間件,包括內置定義好的,也可以自己擴展下載器中間件,下載前先依次執行 process_request,可對 Request 進行加工、處理、校驗等操作,然後發起真正的網絡下載,也就是第一個參數 download_func,在這裏是 Downloader_enqueue_request 方法:

下載成功後回調 Downloader_enqueue_request

def _enqueue_request(self, request, spider):
    # 加入下載請求隊列
    key, slot = self._get_slot(request, spider)
    request.meta['download_slot'] = key

    def _deactivate(response):
        slot.active.remove(request)
        return response

    slot.active.add(request)
    deferred = defer.Deferred().addBoth(_deactivate)
    # 下載隊列
    slot.queue.append((request, deferred))
    # 處理下載隊列
    self._process_queue(spider, slot)
    return deferred
    
def _process_queue(self, spider, slot):
    if slot.latercall and slot.latercall.active():
        return

    # 如果延遲下載參數有配置 則延遲處理隊列
    now = time()
    delay = slot.download_delay()
    if delay:
        penalty = delay - now + slot.lastseen
        if penalty > 0:
            slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
            return

    # 處理下載隊列
    while slot.queue and slot.free_transfer_slots() > 0:
        slot.lastseen = now
        # 從下載隊列中取出下載請求
        request, deferred = slot.queue.popleft()
        # 開始下載
        dfd = self._download(slot, request, spider)
        dfd.chainDeferred(deferred)
        # 延遲
        if delay:
            self._process_queue(spider, slot)
            break
            
def _download(self, slot, request, spider):
    # 註冊方法 調用handlers的download_request
    dfd = mustbe_deferred(self.handlers.download_request, request, spider)

    # 註冊下載完成回調方法
    def _downloaded(response):
        self.signals.send_catch_log(signal=signals.response_downloaded,
                                    response=response,
                                    request=request,
                                    spider=spider)
        return response
    dfd.addCallback(_downloaded)

    slot.transferring.add(request)

    def finish_transferring(_):
        slot.transferring.remove(request)
        # 下載完成後調用_process_queue
        self._process_queue(spider, slot)
        return _

    return dfd.addBoth(finish_transferring)

這裏也維護了一個下載隊列,可根據配置達到延遲下載的要求。真正發起下載請求是調用了 self.handlers.download_request

def download_request(self, request, spider):
    # 獲取請求的scheme
    scheme = urlparse_cached(request).scheme
    # 根據scheeme獲取下載處理器
    handler = self._get_handler(scheme)
    if not handler:
        raise NotSupported("Unsupported URL scheme '%s': %s" %
                           (scheme, self._notconfigured[scheme]))
    # 開始下載 並返回結果
    return handler.download_request(request, spider)
    
def _get_handler(self, scheme):
    # 根據scheme獲取對應的下載處理器
    # 配置文件中定義好了http、https、ftp等資源的下載處理器
    if scheme in self._handlers:
        return self._handlers[scheme]
    if scheme in self._notconfigured:
        return None
    if scheme not in self._schemes:
        self._notconfigured[scheme] = 'no handler available for that scheme'
        return None

    path = self._schemes[scheme]
    try:
        # 實例化下載處理器
        dhcls = load_object(path)
        dh = dhcls(self._crawler.settings)
    except NotConfigured as ex:
        self._notconfigured[scheme] = str(ex)
        return None
    except Exception as ex:
        logger.error('Loading "%(clspath)s" for scheme "%(scheme)s"',
                     {"clspath": path, "scheme": scheme},
                     exc_info=True,  extra={'crawler': self._crawler})
        self._notconfigured[scheme] = str(ex)
        return None
    else:
        self._handlers[scheme] = dh
    return self._handlers[scheme]

下載前,先通過解析 requestscheme 來獲取對應的下載處理器,默認配置文件中定義的下載處理器如下:

DOWNLOAD_HANDLERS_BASE = {
    'file''scrapy.core.downloader.handlers.file.FileDownloadHandler',
    'http''scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    'https''scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    's3''scrapy.core.downloader.handlers.s3.S3DownloadHandler',
    'ftp''scrapy.core.downloader.handlers.ftp.FTPDownloadHandler',
}

然後調用 download_request 方法,完成網絡下載,這裏不再詳細講解每個處理器的實現,簡單來說,你可以把它想象成封裝好的網絡下載庫,輸入 URL,它會給你輸出下載結果,這樣方便理解。

在下載過程中,如果發生異常情況,則會依次調用下載器中間件的 process_exception 方法,每個中間件只需定義自己的異常處理邏輯即可。

如果下載成功,則會依次執行下載器中間件的 process_response 方法,每個中間件可以進一步處理下載後的結果,最終返回。

這裏值得提一下,process_request 方法是每個中間件順序執行的,而 process_responseprocess_exception 方法是每個中間件倒序執行的,具體可看一下 DownaloderMiddlewareManager_add_middleware 方法,就可以明白是如何註冊這個方法鏈的。

拿到最終的下載結果後,再回到 ExecuteEngine_next_request_from_scheduler 中,會看到調用了 _handle_downloader_output,也就是處理下載結果的邏輯:

def _handle_downloader_output(self, response, request, spider):
    # 下載結果必須是Request、Response、Failure其一
    assert isinstance(response, (Request, Response, Failure)), response
    # 如果是Request 則再次調用crawl 執行Scheduler的入隊邏輯
    if isinstance(response, Request):
        self.crawl(response, spider)
        return
    # 如果是Response或Failure 則調用scraper的enqueue_scrape進一步處理
    # 主要是和Spiders和Pipeline交互
    d = self.scraper.enqueue_scrape(response, request, spider)
    d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                        exc_info=failure_to_exc_info(f),
                                        extra={'spider': spider}))
    return d

拿到下載結果後,主要分 2 個邏輯:

處理下載結果

請求入隊邏輯不用再說,前面已經講過。現在主要看 Scraperenqueue_scrape,看Scraper 組件是如何處理後續邏輯的:

def enqueue_scrape(self, response, request, spider):
    # 加入Scrape處理隊列
    slot = self.slot
    dfd = slot.add_response_request(response, request)
    def finish_scraping(_):
        slot.finish_response(response, request)
        self._check_if_closing(spider, slot)
        self._scrape_next(spider, slot)
        return _
    dfd.addBoth(finish_scraping)
    dfd.addErrback(
        lambda f: logger.error('Scraper bug processing %(request)s',
                               {'request': request},
                               exc_info=failure_to_exc_info(f),
                               extra={'spider': spider}))
    self._scrape_next(spider, slot)
    return dfd

def _scrape_next(self, spider, slot):
    while slot.queue:
        # 從Scraper隊列中獲取一個待處理的任務
        response, request, deferred = slot.next_response_request_deferred()
        self._scrape(response, request, spider).chainDeferred(deferred)

def _scrape(self, response, request, spider):
    assert isinstance(response, (Response, Failure))
    # 調用_scrape2繼續處理
    dfd = self._scrape2(response, request, spider)
    # 註冊異常回調
    dfd.addErrback(self.handle_spider_error, request, response, spider)
    # 出口回調
    dfd.addCallback(self.handle_spider_output, request, response, spider)
    return dfd

def _scrape2(self, request_result, request, spider):
    # 如果結果不是Failure實例 則調用爬蟲中間件管理器的scrape_response
    if not isinstance(request_result, Failure):
        return self.spidermw.scrape_response(
            self.call_spider, request_result, request, spider)
    else:
        # 直接調用call_spider
        dfd = self.call_spider(request_result, request, spider)
        return dfd.addErrback(
            self._log_download_errors, request_result, request, spider)

首先把請求和響應加入到 Scraper 的處理隊列中,然後從隊列中獲取到任務,如果不是異常結果,則調用爬蟲中間件管理器scrape_response 方法:

def scrape_response(self, scrape_func, response, request, spider):
    fname = lambda f:'%s.%s' % (
            six.get_method_self(f).__class__.__name__,
            six.get_method_function(f).__name__)

    def process_spider_input(response):
        # 執行一系列爬蟲中間件的process_spider_input
        for method in self.methods['process_spider_input']:
            try:
                result = method(response=response, spider=spider)
                assert result is None, \
                        'Middleware %s must returns None or ' \
                        'raise an exception, got %s ' \
                        % (fname(method), type(result))
            except:
                return scrape_func(Failure(), request, spider)
        # 執行完中間件的一系列process_spider_input方法後 執行call_spider
        return scrape_func(response, request, spider)

    def process_spider_exception(_failure):
        # 執行一系列爬蟲中間件的process_spider_exception
        exception = _failure.value
        for method in self.methods['process_spider_exception']:
            result = method(response=response, exception=exception, spider=spider)
            assert result is None or _isiterable(result)\
                'Middleware %s must returns None, or an iterable object, got %s ' % \
                (fname(method), type(result))
            if result is not None:
                return result
        return _failure

    def process_spider_output(result):
        # 執行一系列爬蟲中間件的process_spider_output
        for method in self.methods['process_spider_output']:
            result = method(response=response, result=result, spider=spider)
            assert _isiterable(result)\
                'Middleware %s must returns an iterable object, got %s ' % \
                (fname(method), type(result))
        return result

    # 執行process_spider_input
    dfd = mustbe_deferred(process_spider_input, response)
    # 註冊異常回調
    dfd.addErrback(process_spider_exception)
    # 註冊出口回調
    dfd.addCallback(process_spider_output)
    return dfd

有沒有感覺套路很熟悉?與上面下載器中間件調用方式非常相似,也調用一系列的前置方法,再執行真正的處理邏輯,最後執行一系列的後置方法。

回調爬蟲

接下來看一下,Scrapy 是如何執行我們寫好的爬蟲邏輯的,也就是 call_spider 方法,這裏回調我們寫好的爬蟲類:

def call_spider(self, result, request, spider):
    # 回調爬蟲模塊
    result.request = request
    dfd = defer_result(result)
    # 註冊回調方法 取得request.callback 如果未定義則調用爬蟲模塊的parse方法
    dfd.addCallbacks(request.callback or spider.parse, request.errback)
    return dfd.addCallback(iterate_spider_output)

看到這裏,你應該更熟悉,平時我們寫的最多的爬蟲代碼,parse 則是第一個回調方法。之後爬蟲類拿到下載結果,就可以定義下載後的 callback 方法,也是在這裏進行回調執行的。

處理輸出

在與爬蟲類交互完成之後,Scraper 調用了 handle_spider_output 方法處理爬蟲的輸出結果:

def handle_spider_output(self, result, request, response, spider):
    # 處理爬蟲輸出結果
    if not result:
        return defer_succeed(None)
    it = iter_errback(result, self.handle_spider_error, request, response, spider)
    # 註冊_process_spidermw_output
    dfd = parallel(it, self.concurrent_items,
        self._process_spidermw_output, request, response, spider)
    return dfd

def _process_spidermw_output(self, output, request, response, spider):
    # 處理Spider模塊返回的每一個Request/Item
    if isinstance(output, Request):
        # 如果結果是Request 再次入Scheduler的請求隊列
        self.crawler.engine.crawl(request=output, spider=spider)
    elif isinstance(output, (BaseItem, dict)):
        # 如果結果是BaseItem/dict
        self.slot.itemproc_size += 1
        # 調用Pipeline的process_item
        dfd = self.itemproc.process_item(output, spider)
        dfd.addBoth(self._itemproc_finished, output, response, spider)
        return dfd
    elif output is None:
        pass
    else:
        typename = type(output).__name__
        logger.error('Spider must return Request, BaseItem, dict or None, '
                     'got %(typename)r in %(request)s',
                     {'request': request, 'typename': typename},
                     extra={'spider': spider})

執行完我們自定義的解析邏輯後,解析方法可返回新的 RequestBaseItem 實例。

如果是新的請求,則再次通過 Scheduler 進入請求隊列,如果是 BaseItem 實例,則調用 Pipeline 管理器,依次執行 process_item。我們想輸出結果時,只需要定義 Pepeline 類,然後重寫這個方法就可以了。

ItemPipeManager 處理邏輯:

class ItemPipelineManager(MiddlewareManager):

    component_name = 'item pipeline'

    @classmethod
    def _get_mwlist_from_settings(cls, settings):
        return build_component_list(settings.getwithbase('ITEM_PIPELINES'))

    def _add_middleware(self, pipe):
        super(ItemPipelineManager, self)._add_middleware(pipe)
        if hasattr(pipe, 'process_item'):
            self.methods['process_item'].append(pipe.process_item)

    def process_item(self, item, spider):
        # 依次調用Pipeline的process_item
        return self._process_chain('process_item', item, spider)

可以看到 ItemPipeManager 也是一箇中間件,和之前下載器中間件管理器和爬蟲中間件管理器類似,如果子類有定義 process_item,則依次執行它。

執行完之後,調用 _itemproc_finished

def _itemproc_finished(self, output, item, response, spider):
    self.slot.itemproc_size -= 1
    if isinstance(output, Failure):
        ex = output.value
        # 如果在Pipeline處理中拋DropItem異常 忽略處理結果
        if isinstance(ex, DropItem):
            logkws = self.logformatter.dropped(item, ex, response, spider)
            logger.log(*logformatter_adapter(logkws)extra={'spider': spider})
            return self.signals.send_catch_log_deferred(
                signal=signals.item_dropped, item=item, response=response,
                spider=spider, exception=output.value)
        else:
            logger.error('Error processing %(item)s'{'item': item},
                         exc_info=failure_to_exc_info(output),
                         extra={'spider': spider})
    else:
        logkws = self.logformatter.scraped(output, response, spider)
        logger.log(*logformatter_adapter(logkws)extra={'spider': spider})
        return self.signals.send_catch_log_deferred(
            signal=signals.item_scraped, item=output, response=response,
            spider=spider)

這裏可以看到,如果想在 Pipeline 中丟棄某個結果,直接拋出 DropItem 異常即可,Scrapy 會進行對應的處理。

到這裏,抓取結果會根據自定義的輸出類,然後輸出到指定位置,而新的 Request 則會再次進入請求隊列,等待引擎下一次調度,也就是再次調用 ExecutionEngine_next_request,直至請求隊列沒有新的任務,整個程序退出。

CrawlerSpider

以上,基本上整個核心抓取流程就講完了。

這裏再簡單說一下 CrawlerSpider 類,我們平時用的也比較多,它其實就是繼承了 Spider 類,然後重寫了 parse 方法(這也是繼承此類不要重寫此方法的原因),並結合 Rule 規則類,來完成 Request 的自動提取邏輯。

Scrapy 提供了這個類方便我們更快速地編寫爬蟲代碼,我們也可以基於此類進行再次封裝,讓我們的爬蟲代碼寫得更簡單。

由此我們也可看出,Scrapy 的每個模塊的實現都非常純粹,每個組件都通過配置文件定義連接起來,如果想要擴展或替換,只需定義並實現自己的處理邏輯即可,其他模塊均不受任何影響,所以我們也可以看到,業界有非常多的 Scrapy 插件,都是通過此機制來實現的。

總結

這篇文章的代碼量較多,也是 Scrapy 最爲核心的抓取流程,如果你能把這塊邏輯搞清楚了,那對 Scrapy 開發新的插件,或者在它的基礎上進行二次開發也非常簡單了。

總結一下整個抓取流程,還是用這兩張圖表示再清楚不過:

Scrapy 整體給我的感覺是,雖然它只是個單機版的爬蟲框架,但我們可以非常方便地編寫插件,或者自定義組件替換默認的功能,從而定製化我們自己的爬蟲,最終可以實現一個功能強大的爬蟲框架,例如分佈式、代理調度、併發控制、可視化、監控等功能,它的靈活度非常高。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/_GfhiIxT0pQCvk80r4KCUA