【Laravel-海贼王系列】第十章,Job&队列存储端实现

栏目: PHP · 发布时间: 7年前

内容简介:新建的这里就是执行一个这里会返回一个
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class TestJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct()
    {
        echo '开始构造Job';
    }
    
    public function handle()
    {
        echo '开始处理Job';
    }
}

复制代码

新建的 TestJob 类,这个类实现了序列化模型,队列功能等等都是通过 trait 类来补充的。 这些特性我们通过使用来分解。

运行一个任务

dispatch(new TestJob());
复制代码

这里就是执行一个 TestJob 的任务,接下去看看 dispatch() 这个方法

function dispatch($job)
    {
        if ($job instanceof Closure) {
            $job = new CallQueuedClosure(new SerializableClosure($job));
        }

        return new PendingDispatch($job);
    }
复制代码

这里会返回一个 Illuminate\Foundation\Bus\PendingDispatch 对象

【Laravel-海贼王系列】第十章,Job&队列存储端实现

TestJob 这个对象里面通过 use Queueable 引入的几个成员属性。 目前为止我们看到只不过是实例化了一个对象,同时将 TestJob 传给 PendingDispatch

我们来解读 PendingDispatch 这个类

<?php

namespace Illuminate\Foundation\Bus;

use Illuminate\Contracts\Bus\Dispatcher;

class PendingDispatch
{
    protected $job;
   
    public function __construct($job)
    {
        $this->job = $job; // 接收传入的 job 对象
    }

    public function onConnection($connection)
    {
        $this->job->onConnection($connection); // 设置任务指定连接

        return $this;
    }

    public function onQueue($queue)
    {
        $this->job->onQueue($queue); // 设置任务队列名

        return $this;
    }

    public function allOnConnection($connection)
    {
        $this->job->allOnConnection($connection); // 设置工作链所有需要的连接

        return $this;
    }

    public function allOnQueue($queue)
    {
        $this->job->allOnQueue($queue); // 设置工作链的队列

        return $this;
    }

    public function delay($delay)
    {
        $this->job->delay($delay); // 设置延迟时间

        return $this;
    }
  
    public function chain($chain)
    {
        $this->job->chain($chain); // 设置工作链任务

        return $this;
    }

    public function __destruct()
    {
        app(Dispatcher::class)->dispatch($this->job); // 通过析构函数来转发job
    }
}
复制代码

分解完这个类,其实大部分都是设置参数的过程,也是通过这些参数来控制任务的执行状态,比如延迟,工作链模式运行等等。

重点在析构函数,当运行完 return new PendingDispatch($job); 之后对象如果没有被任何变量接收,那么对象的内存空间会被回收,从而触发析构函数执行,也是触发 job 继续执行的方式!

public function __destruct()
    {
        app(Dispatcher::class)->dispatch($this->job); // 通过析构函数来转发job
    }
复制代码

获取任务对应的解析器

app(Dispatcher::class) 传入的参数是 Illuminate\Bus\Dispatcher , 这个契约对应的绑定类是通过配置文件 app.providers.Illuminate\Bus\BusServiceProvider::class 来加载的 关于 provider 的启动在第九章中有讲,我们直接看启动方法

public function register()
    {
        $this->app->singleton(Dispatcher::class, function ($app) {
            return new Dispatcher($app, function ($connection = null) use ($app) {
                return $app[QueueFactoryContract::class]->connection($connection);
            });
        });

        $this->app->alias(
            Dispatcher::class, DispatcherContract::class
        );

        $this->app->alias(
            Dispatcher::class, QueueingDispatcherContract::class
        );
    }
复制代码

app(Dispatcher::class) 的实质就是这个闭包的返回

function ($app) {
            return new Dispatcher($app, function ($connection = null) use ($app) {
                return $app[QueueFactoryContract::class]->connection($connection);
            });
        }
复制代码

看看 Dispatcher 构造函数

public function __construct(Container $container, Closure $queueResolver = null)
    {
        $this->container = $container;
        $this->queueResolver = $queueResolver;
        $this->pipeline = new Pipeline($container);
    }
复制代码

接受两个参数,第一个是容器,第二个就是闭包所以 $this->queueResolver 就是

function ($connection = null) use ($app) {
                return $app[QueueFactoryContract::class]->connection($connection);
            }
复制代码

我管这个 $this->queueResolver 叫解析器,作用是接收一个 $connection 然后从容器中解析出队列的驱动并进行连接。

QueueFactoryContract::class 是通过 provider 加载的 位于 app.providers.Illuminate\Queue\QueueServiceProvider::class, 返回的对象是 Illuminate\Queue\QueueManager 由于 'default' => env('QUEUE_CONNECTION', 'sync'), 中配置的 redis 所以最后返回的对象是 Illuminate\Queue\RedisQueue

分发任务到队列

public function dispatch($command)
    {
        // $this->queueResolver 这个队列解析器是在构造的时候注入的
        if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
            return $this->dispatchToQueue($command);
        } 

        return $this->dispatchNow($command);
    }
复制代码

上面的方法明确了任务是该通过队列还是同步执行。

这里我们看,传入的 $command 就是开始的 TestJob 对象。 还记得 Laravel 文档说的如果要通过队列实现需要实现一个指定的接口吗 implements ShouldQueue ,这段代码就是解释了原因。

protected function commandShouldBeQueued($command)
    {
        return $command instanceof ShouldQueue;
    }
复制代码

继续下去,通过上面的判断之后我们进入 dispatchToQueue($command) 这里

public function dispatchToQueue($command)
    {
        $connection = $command->connection ?? null;

        $queue = call_user_func($this->queueResolver, $connection);

        if (! $queue instanceof Queue) {
            throw new RuntimeException('Queue resolver did not return a Queue implementation.');
        }

        if (method_exists($command, 'queue')) {
            return $command->queue($queue, $command);
        }

        return $this->pushCommandToQueue($queue, $command);
    }
复制代码

上面解析过了 $queue 就是 Illuminate\Queue\RedisQueue 这个对象

if (method_exists($command, 'queue')) {
            return $command->queue($queue, $command);
        } // 返回 false
复制代码

所有最后执行了 return $this->pushCommandToQueue($queue, $command);

protected function pushCommandToQueue($queue, $command)
    {
        if (isset($command->queue, $command->delay)) {
            return $queue->laterOn($command->queue, $command->delay, $command);
        } // 如果存在指定的队列和延迟,则推入指定队列+延迟

        if (isset($command->queue)) {
            return $queue->pushOn($command->queue, $command);
        } // 如果存在指定的队列则push到指定的队列

        if (isset($command->delay)) {
            return $queue->later($command->delay, $command);
        } // 只存在延迟设置,推入延迟

        return $queue->push($command); // 默认
    }
复制代码

构造数据

上面已经到了最终的调用,那么接下来的事情就是构造一个什么样格式的数据存入 redis

追踪 $queue->push($command)

这里的 $job 就是最开始传入的 TestJob 对象!

public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
    }
复制代码

构造 payload

protected function createPayload($job, $queue, $data = '')
    {
        $payload = json_encode($this->createPayloadArray($job, $queue, $data));

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidPayloadException(
                'Unable to JSON encode payload. Error code: '.json_last_error()
            );
        }

        return $payload;
    }
复制代码

// 这里的 createPayloadArray() 先调用 Illuminate\Queue\RedisQueue 对象的

protected function createPayloadArray($job, $queue, $data = '')
    {
        return array_merge(parent::createPayloadArray($job, $queue, $data), [
            'id' => $this->getRandomId(),
            'attempts' => 0,
        ]);
    }
复制代码

// 追踪父类 Illuminate\Queue\Queue 方法

protected function createPayloadArray($job, $queue, $data = '')
    {
        return is_object($job)
                    ? $this->createObjectPayload($job, $queue)
                    : $this->createStringPayload($job, $queue, $data);
    }    
    
// $job 是对象的时候格式化方式
protected function createObjectPayload($job, $queue)
    {
        $payload = $this->withCreatePayloadHooks($queue, [
            'displayName' => $this->getDisplayName($job),
            'job' => 'Illuminate\Queue\CallQueuedHandler@call',
            'maxTries' => $job->tries ?? null, // 这是任务设置的重试次数
            'timeout' => $job->timeout ?? null, // 这是超时时间
            'timeoutAt' => $this->getJobExpiration($job), // 获取处理过期时间
            'data' => [
                'commandName' => $job,
                'command' => $job,
            ],
        ]);

        return array_merge($payload, [
            'data' => [
                'commandName' => get_class($job),
                'command' => serialize(clone $job), 
                            // 序列化,这里的序列化会调用 
                            // SerializesModels 特质类的__sleep()方法
                            // 在开头的时候所有的 Job 类都有use
            ],
        ]);
    }
    
// $job 是字符串的时候格式化方式    
protected function createStringPayload($job, $queue, $data)
    {
        return $this->withCreatePayloadHooks($queue, [
            'displayName' => is_string($job) ? explode('@', $job)[0] : null,
            'job' => $job,
            'maxTries' => null,
            'timeout' => null,
            'data' => $data,
        ]);
    }    
复制代码

将获取的最后 json 字符串 rpushredis 中。

public function pushRaw($payload, $queue = null, array $options = [])
    {
        $this->getConnection()->rpush($this->getQueue($queue), $payload); 
        return json_decode($payload, true)['id'] ?? null; 
    }    
复制代码
【Laravel-海贼王系列】第十章,Job&队列存储端实现
至于延迟任务 return $queue->later($command->delay, $command);

, 逻辑基本上一样,只不过最后存入的队列是名不一样

【Laravel-海贼王系列】第十章,Job&队列存储端实现

小结

到这里位置关于任务和队列的应用写入端口已经完成,最终是把指定的格式存入配置的存储驱动中的过程。


以上所述就是小编给大家介绍的《【Laravel-海贼王系列】第十章,Job&队列存储端实现》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Rework

Rework

Jason Fried、David Heinemeier Hansson / Crown Business / 2010-3-9 / USD 22.00

"Jason Fried and David Hansson follow their own advice in REWORK, laying bare the surprising philosophies at the core of 37signals' success and inspiring us to put them into practice. There's no jarg......一起来看看 《Rework》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具