Golang实现简单爬虫框架(4)——队列实现并发任务调度

栏目: Go · 发布时间: 7年前

内容简介:在上一篇文章其实我们可以自己做一个任务分发的机制,我们来决定分发给哪一个Worker注意:本次并发是在上一篇文章简单并发实现的基础上修改,所以没有贴出全部代码,只是贴出部分修改部分,要查看完整项目代码,可以查看上篇文章,或者从github下载

前言

在上一篇文章 《Golang实现简单爬虫框架(3)——简单并发版》 中我们实现了一个最简单并发爬虫,调度器为每一个 Request 创建一个 goroutine ,每个 goroutineWorker 队列中分发任务,发完就结束。所有的 Worker 都在抢一个 channel 中的任务。但是这样做还是有些许不足之处,比如控制力弱:所有的Worker在抢同一个 channel 中的任务,我们没有办法控制给哪一个worker任务。

其实我们可以自己做一个任务分发的机制,我们来决定分发给哪一个Worker

注意:本次并发是在上一篇文章简单并发实现的基础上修改,所以没有贴出全部代码,只是贴出部分修改部分,要查看完整项目代码,可以查看上篇文章,或者从github下载 项目源代码查看

1、项目架构

在上一篇文章实现简单并发的基础上,我们修改下 Scheduler 的任务分发机制

Golang实现简单爬虫框架(4)——队列实现并发任务调度

  • Scheduler 接收到一个 Request 后,不能直接发给 Worker ,也不能为每个 Request 创建一个 goroutine ,所以这里使用一个Request队列
  • 同时我们想对 Worker 实现一个更多的控制,可以决定把任务分发给哪一个 Worker ,所以这里我们还需要一个 Worker 队列
  • 当有了 RequestWorker ,我们就可以把选择的Request发送给选择的 Worker

2、队列实现任务调度器

在scheduler目录下创建queued.go文件

package scheduler

import "crawler/engine"

// 使用队列来调度任务

type QueuedScheduler struct {
    requestChan chan engine.Request        // Request channel
    // Worker channel, 其中每一个Worker是一个 chan engine.Request 类型
    workerChan  chan chan engine.Request    
}

// 提交请求任务到 requestChannel
func (s *QueuedScheduler) Submit(request engine.Request) {
    s.requestChan <- request
}

func (s *QueuedScheduler) ConfigMasterWorkerChan(chan engine.Request) {
    panic("implement me")
}

// 告诉外界有一个 worker 可以接收 request
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
    s.workerChan <- w
}

func (s *QueuedScheduler) Run() {
    // 生成channel
    s.workerChan = make(chan chan engine.Request)
    s.requestChan = make(chan engine.Request)
    go func() {
        // 创建请求队列和工作队列
        var requestQ []engine.Request
        var workerQ []chan engine.Request
        for {
            var activeWorker chan engine.Request
            var activeRequest engine.Request
            
            // 当requestQ和workerQ同时有数据时
            if len(requestQ) > 0 && len(workerQ) > 0 {
                activeWorker = workerQ[0]
                activeRequest = requestQ[0]
            }
            
            select {
            case r := <-s.requestChan: // 当 requestChan 收到数据
                requestQ = append(requestQ, r)
            case w := <-s.workerChan: // 当 workerChan 收到数据
                workerQ = append(workerQ, w)
            case activeWorker <- activeRequest: // 当请求队列和认读队列都不为空时,给任务队列分配任务
                requestQ = requestQ[1:]
                workerQ = workerQ[1:]
            }
        }
    }()
}

3、爬虫引擎

修改后的concurrent.go文件如下

package engine

import (
    "log"
)

// 并发引擎
type ConcurrendEngine struct {
    Scheduler   Scheduler
    WorkerCount int
}

// 任务调度器
type Scheduler interface {
    Submit(request Request) // 提交任务
    ConfigMasterWorkerChan(chan Request)
    WorkerReady(w chan Request)
    Run()
}

func (e *ConcurrendEngine) Run(seeds ...Request) {

    out := make(chan ParseResult)
    e.Scheduler.Run()

    // 创建 goruntine
    for i := 0; i < e.WorkerCount; i++ {
        createWorker(out, e.Scheduler)
    }

    // engine把请求任务提交给 Scheduler
    for _, request := range seeds {
        e.Scheduler.Submit(request)
    }

    itemCount := 0
    for {
        // 接受 Worker 的解析结果
        result := <-out
        for _, item := range result.Items {
            log.Printf("Got item: #%d: %v\n", itemCount, item)
            itemCount++
        }

        // 然后把 Worker 解析出的 Request 送给 Scheduler
        for _, request := range result.Requests {
            e.Scheduler.Submit(request)
        }
    }
}

func createWorker(out chan ParseResult, s Scheduler) {
    // 为每一个Worker创建一个channel
    in := make(chan Request)
    go func() {
        for {
            s.WorkerReady(in) // 告诉调度器任务空闲
            request := <-in
            result, err := worker(request)
            if err != nil {
                continue
            }
            out <- result
        }
    }()
}

4、main函数

package main

import (
    "crawler/engine"
    "crawler/scheduler"
    "crawler/zhenai/parser"
)

func main() {
    e := engine.ConcurrendEngine{
        Scheduler:   &scheduler.QueuedScheduler{},// 这里调用并发调度器
        WorkerCount: 50,
    }
    e.Run(engine.Request{
        Url:       "http://www.zhenai.com/zhenghun",
        ParseFunc: parser.ParseCityList,
    })
}

运行结果如下:

Golang实现简单爬虫框架(4)——队列实现并发任务调度

5、总结

在这篇文章中我们使用队列实现对并发任务的调度,从而实现了对Worker的控制。我们现在并发有两种实现方式,但是他们的调度方法是不同的,为了代码的统一,所以在下一篇文章中的内容有:

  • 对项目做一个同构
  • 添加数据的存储模块。

如果想获取 Google工程师深度讲解 go 语言 视频资源的,可以在评论区留下邮箱。

项目的 源代码 已经托管到Github上,对于各个版本都有记录,欢迎大家查看,记得给个star,在此先谢谢大家

如果觉得博客不错,劳烦大人给个赞,


以上所述就是小编给大家介绍的《Golang实现简单爬虫框架(4)——队列实现并发任务调度》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Linux Command Line

The Linux Command Line

William E. Shotts Jr. / No Starch Press, Incorporated / 2012-1-17 / USD 39.95

You've experienced the shiny, point-and-click surface of your Linux computer-now dive below and explore its depths with the power of the command line. The Linux Command Line takes you from your very ......一起来看看 《The Linux Command Line》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

html转js在线工具
html转js在线工具

html转js在线工具