内容简介:本文记录下自己看 Scrapy 源码的一点经验,没有涉及框架的方方面面,更多的是关注数据的流转以及代码的组织。如果你想深入框架的某个细节,那么这篇文字可以给你一个切入点。阅读源码常规的步骤我们一步一步来。
本文记录下自己看 Scrapy 源码的一点经验,没有涉及框架的方方面面,更多的是关注数据的流转以及代码的组织。如果你想深入框架的某个细节,那么这篇文字可以给你一个切入点。
阅读源码常规的步骤
- 准备好必须的基础知识
- 熟悉框架有哪些功能
- 看源码,了解代码是怎么组织的,最后按需关注特定部分的实现细节
我们一步一步来。
目录
1 Twisted
Scrapy 基于 Twisted,所以除了要有一定的 Python 基础外,还要对 Twisted 有一些了解。
你可以将 Twisted 和 Asyncio 类比,它们都是为了支持协程而诞生的,只是前者比后者出现的更早。这 2 个技术实现的核心都是事件循环,当程序执行到某个耗时的 IO 操作时,程序的执行权限会被退回给事件循环,事件循环会检测其它准备就绪的协程,然后将执行权限交给它,当之前的协程 IO 操作完毕后,事件循环会将执行权限转给它,继续后面的操作。这样,就能在单线程内实现并发,作用和多线程类似,只是比多线程更轻量。事件循环在 Asyncio 中被叫做 event_loop,在 Twisted 中叫做 reactor。我们看一些简单的例子
1.1 Schedule
- 多少秒后执行某个任务
from twisted.internet import reactor def f(s): print('this will run 3.5 seconds after it was scheduled: %s' % s) reactor.callLater(3.5, f, 'hello, world') # f() will only be called if the event loop is started. reactor.run()
reactor.callLater 会将某个任务加入到事件循环,并设置好多少秒后开始执行,当然要将事件循环启动后才会有作用。
- 每隔多少秒执行某个任务
from twisted.internet import reactor, task def f(s): print(s) loop = task.LoopingCall(f, 'hello, world') # Start looping every 1 second. loop.start(1) reactor.run()
每隔 1s 执行一次 f 方法。
1.2 Deferred
Deferred 表示某个任务未来会产生结果,当任务执行完毕后,会执行注册在 Deferred 的回调函数,并将结果传递给它。
1.2.1 成功回调
from twisted.internet import reactor, defer def get_dummpy_data(input_data): print('get_dummpy_data called') deferred = defer.Deferred() reactor.callLater(2, deferred.callback, input_data * 3) return deferred def cb_print_data(result): print('Result received: {}'.format(result)) deferred = get_dummpy_data(3) deferred.addCallback(cb_print_data) # manually set up the end of the process by asking the reactor to # stop itself in 4 seconds time reactor.callLater(4, reactor.stop) reactor.run()
get_dummpy_data
,返回了 1 个 defer. Deferred 对象,表示该函数是 1 个异步任务,会在未来某个时间点产生结果,这里使用
reactor.callLater(2, deferred.callback, input_data * 3)
模仿异步结果,2s 后执行 deferred 的回调函数,并将计算后的结果传递给它。
cb_print_data
,简单的打印结果。
看下执行流程,首先调用 get_dummpy_data
得到 deferred,然后注册回调函数 cb_print_data,最后运行事件循环。为了让程序正常关闭,还设置了在 4s 后自动关闭事件循环。执行结果
get_dummpy_data called Result received: 9
1.2.2 异常回调
上面我们使用 addCallback 注册成功回调,除开这个,我们还能注册异常回调,实现异常处理。
from twisted.internet import reactor, defer def get_dummpy_data(input_data): print('get_dummpy_data called') deferred = defer.Deferred() if input_data % 2 == 0: reactor.callLater(2, deferred.callback, input_data * 3) else: reactor.callLater(2, deferred.errback, ValueError('You used an odd number!')) return deferred def cb_print_data(result): print('Result received: {}'.format(result)) def eb_print_error(failure): print(failure) deferred = get_dummpy_data(3) deferred.addCallback(cb_print_data) deferred.addErrback(eb_print_error) reactor.callLater(4, reactor.stop) reactor.run()
get_dummpy_data,当 input_data 为奇数时,2s 后会触发 deferred 的异常回调。
执行时,使用 addErrback 注册异常回调。结果
get_dummpy_data called [Failure instance: Traceback (failure with no frames): <class 'ValueError'>: You used an odd number! ]
1.2.3 回调链
看到这里,有的读者可能会想如果我注册多个成功回调以及多个异常回调,成功或者异常时,具体的执行流程又是怎么样的呢?这就要引入回调链的概念了。看下图
有 2 种类型的主链:成功以及异常回调链
- 对于前者来说,每个回调的结果都是下个回调的输入,比如 cb1 的结果会是 cb2 的输入
- 对于后者来说,某个回调返回异常或者抛出异常,就会将异常作为下一个回调的输入,比如 eb1 如果返回或者抛出异常,那么 eb2 会被调用,并将 eb1 返回或者抛出的异常作为参数
当然,还有其它情况
- 如果 cb1 异常,那么 eb2 会被调用,并将 cb1 中的异常作为参数
- 如果 eb1 处理了异常并返回值,那么 cb2 会被调用,并将 eb1 的返回值作为参数
也就是说, 当前级别的回调只根据上一级别的回调的执行情况进行触发。
添加回调大概有下面几种方式
- addCallback、addErrback
- addCallbacks
- addBoth
这些方式有些细节上的不同,举例说明下
第 1 个例子
d = getDeferredFromSomewhere() d.addCallback(cb1) # A d.addErrback(eb1) # B d.addCallback(cb2) d.addErrback(eb2)
对应图
也即,对于每 1 次 addCallback(addErrback),都会在同一级上绑定 1 个成功回调(异常回调)和 1 个什么都不做的异常回调(成功回调)。
第 2 个例子
d = getDeferredFromSomewhere() d.addCallbacks(cb1, eb1) # C d.addCallbacks(cb2, eb2)
对应图
也即,addCallbacks 会在同一级别上绑定 1 个成功回调和 1 个异常回调。
为了加强我们的理解,可以想下这个问题,如果 cb1 出现异常,对于这 2 个例子的执行情况有什么差别?结合图其实很好理解,对于第 1 个例子,eb1 会被调用;对于第 2 个例子,eb2 会被调用。
至于 addBoth(cb),可以理解为
d = getDeferredFromSomewhere() d.addCallback(cb) d.addErrback(cb)
1.2.4 DeferredList
当碰到需要等待多个 deferred 执行完毕的时候,我们可以使用 DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3])
看个例子
from twisted.internet import defer def print_result(result): for (success, value) in result: if success: print('Success:', value) else: print('Failure:', value.getErrorMessage()) deferred1 = defer.Deferred() deferred2 = defer.Deferred() deferred3 = defer.Deferred() dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True) dl.addCallback(print_result) deferred1.callback('one') deferred2.errback(Exception('bang!')) deferred3.callback('three')
print_result print_result
看下执行结果
Success: one Failure: bang! Success: three
1.2.5 gatherResults
gatherResults 和 DeferredList 类似,也是等待多个 deferred,不同的是
- 只要有 1 个 deferred 异常,就会触发整体的异常
- 返回的结果列表中的元素不是一个元祖,而是对应 deferred 的真正的结果。因为只要有 1 个 deferred 发生异常就会触发整体的异常回调,所以成功回调表示所有的 deferred 都是执行成功了的,也就没有必要使用布尔值表示是否成功执行了。
from twisted.internet import defer d1 = defer.Deferred() d2 = defer.Deferred() d = defer.gatherResults([d1, d2]) def print_result(result): print(result) d.addBoth(print_result) d1.callback('one') d2.errback(Exception('bang!'))
print_result
看下执行结果
[Failure instance: Traceback (failure with no frames): <class 'twisted.internet.defer.FirstError'>: FirstError[#1, [Failure instance: Traceback (failure with no frames): <class 'Exception'>: bang! ]] ] Unhandled error in Deferred: Traceback (most recent call last): Failure: builtins.Exception: bang!
如果 gatherResults 时,设置了 consumeErrors 为 True,不会有 Unhandled error in Deferred:
及其之后的输出。
如果将
d2.errback(Exception('bang!'))
改为
d2.callback('two')
结果为
['one', 'two']
1.3 defer.inlineCallbacks
在 Scrapy 源码中,我们经常会看到类似这样的语法
@defer.inlineCallbacks def f(): ...
这可以理解为一个语法糖,表示在被这个装饰器装饰过的方法里面,可以使用 yield 等待 deferred 的结果,我们可以改写 1.2.1 中的例子为
from twisted.internet import reactor, defer def get_dummpy_data(input_data): print('get_dummpy_data called') deferred = defer.Deferred() reactor.callLater(2, deferred.callback, input_data * 3) return deferred @defer.inlineCallbacks def main(): result = yield get_dummpy_data(3) print(result) main() reactor.callLater(4, reactor.stop) reactor.run()
1.4 更多
如果只是想简单的看懂 Scrapy 源码的话,关于 Twisted,理解上面的一些概念就差不多了。虽说源码里面还是一些上面没提及的方法,比如 maybeDeferred 等,不过有上面的一些基础后,你通过查看官方文档或者跳转方法定义处读读,也能较快的理解其用处了。
如果想比较全面的掌握 Twisted,可以先看一本书《Twisted. Network. Programming》,这本书可以帮你更为全面的理解 Twisted 的一些基本概念。读完此书后,再根据需要去读官方文档的特定部分,就要轻松一些了。
2 框架有哪些功能
要知道一个框架有些什么功能,可以看它的官方文档。刚接触 Scrapy 的同学一看文档,可能会觉得功能有点多,有点杂。其实,我们可以看下它的数据流图,就可以将其主要的功能给串起来了。
从图上可以直观的看出,Scrapy 的主要部件有 Engine、Spider、Scheduler、Downloader、Item Pipelines 以及中间件。
更详细的功能,我们就可以通过主要部件去联想。比如:
- Spider 涉及 Request、Response、Selector、Extractor
- Scheduler 涉及去重的 DupeFilter
- Downloader 涉及并发量、下载延时等设置
- Item Pipelines 涉及 Items、Item Loaders、Feed Exports
- 至于中间件,在 Spider 和 Engine 中间的是 Spider Middleware,在 Engine 和 Downloader 中间的是 Downloader Middleware
除此之外,文档上还列了一些内置服务以及解决特定问题的示例,比如:Stats Collection 可以用于统计;Telnet Console 可以查看运行时爬虫的状态、内存使用情况等;对于广度优先的爬虫怎么设置;等等。
上面的联想只是作为一个参考,不同的人有不同的联想方式。
这样一来,我们就了解了框架的大部分功能了。 不过,我还是推荐在使用 Scrapy 一段时间后,碰到特定的问题后,再去看相应的源码。一方面,那个时候,你会对框架的很多概念有更深的理解,看源码的时候更容易理解作者的用心;另一方面,如果只是范范的看源码,很难抓重点,很可能你看了一段时间后,感觉好像没有从中学到什么。
现在我们还是参考上面的图,来仔细看下框架中的数据流
- Spider 就是我们常写的那个 spider 文件,Engine 从
start_urls
或者start_requests
中获取初始请求 - Engine 将请求加入 Scheduler 并从中获取下一个请求
- Scheduler 返回给 Engine 下一个请求
- Engine 将请求发送给 Downloader,这会调用 Downloader Middleware 中的
process_request
- 当 Downloader 下载完毕后会生成一个响应发送给 Engine,这会调用 Downloader Middleware 的
process_response
- Engine 收到 Downloader 生成的响应后会将其发送给 Spider,这会调用 Spider Middleware 的
process_spider_input
- 当 Spider 处理完响应后,会将生成的 Items 或者新的请求发送给 Engine,这会调用 Spider Middleware 的
process_spider_output
- 如果 Spider 发送给 Engine 的有 Items,Engine 会将 Item 发送给 Item Pipelines;如果有新的请求,会将其加入 Scheduler;如果 Scheduler 还有未处理的请求的话,Engine 会向其获取下一个请求
- 重复步骤 1 直到 Scheduler 中的请求被处理完
到这里,看源码前的准备工作就差不多了。下一篇开始跟源码,感兴趣的伙伴可以关注下后面的文章。
以上所述就是小编给大家介绍的《Scrapy 源码阅读(一):Twisted 基础和 Scrapy 数据流》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 大数据技术 DataPipeline在大数据平台的数据流实践
- DataPipeline在大数据平台的数据流实践
- 我对前后端数据模型和数据流的理解
- stream – 数据流处理
- 浅谈hdfs架构与数据流
- 我所认识的前端数据流
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。