Laravel 中使用 swoole 项目实战开发案例二 (后端主动分场景给界面推送消息)

栏目: PHP · 发布时间: 5年前

内容简介:最近使用 swoole 做了项目,里面设计推送信息给界面前端,和无登陆用户的状态监控,以下是本人从中获取的一点心得,有改进的地方请留言评论。我们假设有一个需求,我在后端点击按钮1,首页弹出“后端触发了按钮1”。后端点了按钮2,列表页弹出“后端触发了按钮2”。做到根据不同场景推送到不同页面。客户端浏览器打开或者刷新界面,在swoole服务会生成一个进程句柄

最近使用 swoole 做了项目,里面设计推送信息给界面前端,和无登陆用户的状态监控,以下是本人从中获取的一点心得,有改进的地方请留言评论。

需求分析

我们假设有一个需求,我在后端点击按钮1,首页弹出“后端触发了按钮1”。后端点了按钮2,列表页弹出“后端触发了按钮2”。做到根据不同场景推送到不同页面。

代码思路

  • Swoole fd

客户端浏览器打开或者刷新界面,在swoole服务会生成一个进程句柄 fd ,每次浏览器页面有打开链接websocket的js代码,便会生成,每次刷新的时候,会关闭之前打开的 fd ,重新生成一个新的,关闭界面的时候会生成一个新的。swoole的 fd 生成规则是从1开始递增。

  • Redis Hash存储 fd

我们建立一个key为 swoole:fds redis哈希类型数据, fd 为hash的字段,每个字段的值我们存储前端websocket请求的url参数信息(根据业务复杂度自己灵活变通,我在项目中会在url带上sessionId)。每次链接打开swoole服务的时候我们存储其信息,每次关闭页面时候我们清除其字段。在 redis 存储如下

Laravel 中使用 swoole 项目实战开发案例二 (后端主动分场景给界面推送消息)

  • 触发分场景推送

    在界面上当进行了触发操作的时候,通过后台curl请求swoole http服务,swoole http服务根据你向我传递的参数分发给对应的逻辑处理。如curl请求 127.0.0.1:9502page=back&func=pushHomeLogic&token=123456 我们可以根据传入的 func 参数,在后台分发给对应逻辑处理。如分发给 pushHomeLogic 方法。在其里面实现自己的逻辑。为防止过多的 if else 以及 foreach 操作 ,我们采用的是闭包, call_user_func 等方法实现如下

    public function onRequest($request,$response)
       {
           if ($this->checkAccess("", $request)) {
               $param = $request->get;
               // 分发处理请求逻辑
               if (isset($param['func'])) {
                   if (method_exists($this,$param['func'])) {
                       call_user_func([$this,$param['func']],$request);
                   }
               }
           }
       }// 往首页推送逻辑处理
       public function pushHomeLogic($request)
       {
           $callback = function (array $aContent,int $fd,SwooleDemo $oSwoole)use($request) {
               if ($aContent && $aContent['page'] == "home") {
                   $aRes['message'] = "后端按了按钮1";
                   $aRes['code'] = "200";
                   $oSwoole::$server->push($fd,xss_json($aRes));
               }
           };
           $this->eachFdLogic($callback);
       }

完整代码

swool脚本代码逻辑

<?php

namespace App\Console\Commands;

use Closure;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;

class SwooleDemo extends Command
{
    // 命令名称
    protected $signature = 'swoole:demo';
    // 命令说明
    protected $description = '这是关于swoole websocket的一个测试demo';
    // swoole websocket服务
    private static $server = null;

    public function __construct()
    {
        parent::__construct();
    }

    // 入口
    public function handle()
    {
        $this->redis = Redis::connection('websocket');
        $server = self::getWebSocketServer();
        $server->on('open',[$this,'onOpen']);
        $server->on('message', [$this, 'onMessage']);
        $server->on('close', [$this, 'onClose']);
        $server->on('request', [$this, 'onRequest']);
        $this->line("swoole服务启动成功 ...");
        $server->start();
    }

    // 获取服务
    public static function getWebSocketServer()
    {
        if (!(self::$server instanceof \swoole_websocket_server)) {
            self::setWebSocketServer();
        }
        return self::$server;
    }
    // 服务处始设置
    protected static function setWebSocketServer():void
    {
        self::$server  = new \swoole_websocket_server("0.0.0.0", 9502);
        self::$server->set([
            'worker_num' => 1,
            'heartbeat_check_interval' => 60,    // 60秒检测一次
            'heartbeat_idle_time' => 121,        // 121秒没活动的
        ]);
    }

    // 打开swoole websocket服务回调代码
    public function onOpen($server, $request)
    {
        if ($this->checkAccess($server, $request)) {
            self::$server->push($request->fd,xss_json(["code"=>200,"message"=>"打开swoole服务成功"]));
        }
    }
    // 给swoole websocket 发送消息回调代码
    public function onMessage($server, $frame)
    {

    }
    // http请求swoole websocket 回调代码
    public function onRequest($request,$response)
    {
        if ($this->checkAccess("", $request)) {
            $param = $request->get;
            // 分发处理请求逻辑
            if (isset($param['func'])) {
                if (method_exists($this,$param['func'])) {
                    call_user_func([$this,$param['func']],$request);
                }
            }
        }
    }

    // websocket 关闭回调代码
    public function onClose($serv,$fd)
    {
        $this->redis->hdel('swoole:fds', $fd);
        $this->line("客户端 {$fd} 关闭");
    }


    // 校验客户端连接的合法性,无效的连接不允许连接
    public function checkAccess($server, $request):bool
    {
        $bRes = true;
        if (!isset($request->get) || !isset($request->get['token'])) {
            self::$server->close($request->fd);
            $this->line("接口验证字段不全");
            $bRes = false;
        } else if ($request->get['token'] != 123456) {
            $this->line("接口验证错误");
            $bRes = false;
        }
        $this->storeUrlParamToRedis($request);
        return $bRes;
    }

    // 将每个界面打开websocket的url 存储起来
    public function storeUrlParamToRedis($request):void
    {
        // 存储请求url带的信息
        $sContent = json_encode(
            [
                'page' => $request->get['page'],
                'fd' => $request->fd,
            ], true);
        $this->redis->hset("swoole:fds", $request->fd, $sContent);
    }

    /**
     * @param $request
     * @see 循环逻辑处理
     */
    public function eachFdLogic(Closure $callback = null)
    {
        foreach (self::$server->connections as $fd) {
            if (self::$server->isEstablished($fd)) {
                $aContent = json_decode($this->redis->hget("swoole:fds",$fd),true);
                $callback($aContent,$fd,$this);
            } else {
                $this->redis->hdel("swoole:fds",$fd);
            }
        }
    }
    // 往首页推送逻辑处理
    public function pushHomeLogic($request)
    {
        $callback = function (array $aContent,int $fd,SwooleDemo $oSwoole)use($request) {
            if ($aContent && $aContent['page'] == "home") {
                $aRes['message'] = "后端按了按钮1";
                $aRes['code'] = "200";
                $oSwoole::$server->push($fd,xss_json($aRes));
            }
        };
        $this->eachFdLogic($callback);
    }
    // 往列表页推送逻辑处理
    public function pushListLogic($request)
    {
        $callback = function (array $aContent,int $fd,SwooleDemo $oSwoole)use($request) {
            if ($aContent && $aContent['page'] == "list") {
                $aRes['message'] = "后端按了按钮2";
                $aRes['code'] = "200";
                $oSwoole::$server->push($fd,xss_json($aRes));
            }
        };
        $this->eachFdLogic($callback);
    }

    // 启动websocket服务
    public function start()
    {
        self::$server->start();
    }
}

控制器代码

<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use Illuminate\Support\Facades\Redis;
class TestController extends Controller
{
    // 首页
    public function home()
    {
        return view("home");
    }
    // 列表
    public function list()
    {
        return view("list");
    }
    // 后端控制
    public function back()
    {
        if (request()->method() == 'POST') {
           $this->curl_get($this->getUrl());
           return json_encode(['code'=>200,"message"=>"成功"]);
        } else {
            return view("back");
        }

    }
    // 获取要请求swoole websocet服务地址
    public function getUrl():string
    {
        // 域名 端口 请求swoole服务的方法
        $sBase = request()->server('HTTP_HOST');
        $iPort = 9502;
        $sFunc = request()->post('func');
        $sPage = "back";
        return $sBase.":".$iPort."?func=".$sFunc."&token=123456&page=".$sPage;
    }
    // curl 推送
    public function curl_get(string $url):string
    {
        $ch_curl = curl_init();
        curl_setopt ($ch_curl, CURLOPT_TIMEOUT_MS, 3000);
        curl_setopt($ch_curl, CURLOPT_SSL_VERIFYPEER, 0);
        curl_setopt ($ch_curl, CURLOPT_HEADER,false);
        curl_setopt($ch_curl, CURLOPT_HTTPGET, 1);
        curl_setopt($ch_curl, CURLOPT_RETURNTRANSFER,true);
        curl_setopt ($ch_curl, CURLOPT_URL,$url);
        $str  = curl_exec($ch_curl);
        curl_close($ch_curl);
        return $str;
    }
}

页面js代码

  • 后端控制页

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>后端界面</title>
    <meta name=viewport content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no">
</head>
<body>
<button class="push" data-func="pushHomeLogic">按钮1</button>
<button class="push" data-func="pushListLogic">按钮2</button>
</body>
<script src="{{ asset("/vendor/tw/global/jQuery/jquery-2.2.3.min.js")}} "></script>
<script>
    $(function () {
        $(".push").on('click',function(){
            var func = $(this).attr('data-func').trim();
            ajaxGet(func)
        })
        function ajaxGet(func) {
            url = "{{route('back')}}";
            token = "{{csrf_token()}}";
            $.ajax({
                url: url,
                type: 'post',
                dataType: "json",
                data:{func:func,_token:token},
                error: function (data) {
                    alert("服务器繁忙, 请联系管理员!");
                    return;
                },
                success: function (result) {

                },
            })
        }

    })
</script>
</html>
  • 首页

<!DOCTYPE html>

<html lang="en">

<head>

<meta charset="UTF-8">
<title>swoole首页</title>
<meta name=viewport content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no">

</head>

<body>

<h1>这是首页</h1>

</body>

<script>

var ws;//websocket实例
var lockReconnect = false;//避免重复连接
var wsUrl = 'ws://{{$_SERVER["HTTP_HOST"]}}:9502?page=home&token=123456';

function initEventHandle() {
    ws.onclose = function () {
        reconnect(wsUrl);
    };
    ws.onerror = function () {
        reconnect(wsUrl);
    };
    ws.onopen = function () {
        //心跳检测重置
        heartCheck.reset().start();
    };
    ws.onmessage = function (event) {
        //如果获取到消息,心跳检测重置
        //拿到任何消息都说明当前连接是正常的
        var data = JSON.parse(event.data);
        if (data.code == 200) {
            console.log(data.message)
        }
        heartCheck.reset().start();
    }
}
createWebSocket(wsUrl);
/**
 * 创建链接
 * @param url
 */
function createWebSocket(url) {
    try {
        ws = new WebSocket(url);
        initEventHandle();
    } catch (e) {
        reconnect(url);
    }
}
function reconnect(url) {
    if(lockReconnect) return;
    lockReconnect = true;
    //没连接上会一直重连,设置延迟避免请求过多
    setTimeout(function () {
        createWebSocket(url);
        lockReconnect = false;
    }, 2000);
}
//心跳检测
var heartCheck = {
    timeout: 60000,//60秒
    timeoutObj: null,
    serverTimeoutObj: null,
    reset: function(){
        clearTimeout(this.timeoutObj);
        clearTimeout(this.serverTimeoutObj);
        return this;
    },
    start: function(){
        var self = this;
        this.timeoutObj = setTimeout(function(){
            //这里发送一个心跳,后端收到后,返回一个心跳消息,
            //onmessage拿到返回的心跳就说明连接正常
            ws.send("heartbeat");
            self.serverTimeoutObj = setTimeout(function(){//如果超过一定时间还没重置,说明后端主动断开了
                ws.close();//如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
            }, self.timeout);
        }, this.timeout);
    },
    header:function(url) {
        window.location.href=url
    }

}

</script>

</html>

* ####    列表页面

<!DOCTYPE html>

<html lang="en">

<head>

<meta charset="UTF-8">
<title>swoole列表页</title>
<meta name=viewport content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no">

</head>

<body>

<h1>swoole列表页</h1>

</body>

<script>

var ws;//websocket实例
var lockReconnect = false;//避免重复连接
var wsUrl = 'ws://{{$_SERVER["HTTP_HOST"]}}:9502?page=list&token=123456';

function initEventHandle() {
    ws.onclose = function () {
        reconnect(wsUrl);
    };
    ws.onerror = function () {
        reconnect(wsUrl);
    };
    ws.onopen = function () {
        //心跳检测重置
        heartCheck.reset().start();
    };
    ws.onmessage = function (event) {
        //如果获取到消息,心跳检测重置
        //拿到任何消息都说明当前连接是正常的
        var data = JSON.parse(event.data);
        if (data.code == 200) {
            console.log(data.message)
        }
        heartCheck.reset().start();
    }
}
createWebSocket(wsUrl);
/**
 * 创建链接
 * @param url
 */
function createWebSocket(url) {
    try {
        ws = new WebSocket(url);
        initEventHandle();
    } catch (e) {
        reconnect(url);
    }
}
function reconnect(url) {
    if(lockReconnect) return;
    lockReconnect = true;
    //没连接上会一直重连,设置延迟避免请求过多
    setTimeout(function () {
        createWebSocket(url);
        lockReconnect = false;
    }, 2000);
}
//心跳检测
var heartCheck = {
    timeout: 60000,//60秒
    timeoutObj: null,
    serverTimeoutObj: null,
    reset: function(){
        clearTimeout(this.timeoutObj);
        clearTimeout(this.serverTimeoutObj);
        return this;
    },
    start: function(){
        var self = this;
        this.timeoutObj = setTimeout(function(){
            //这里发送一个心跳,后端收到后,返回一个心跳消息,
            //onmessage拿到返回的心跳就说明连接正常
            ws.send("heartbeat");
            self.serverTimeoutObj = setTimeout(function(){//如果超过一定时间还没重置,说明后端主动断开了
                ws.close();//如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
            }, self.timeout);
        }, this.timeout);
    },
    header:function(url) {
        window.location.href=url
    }

}

</script>

</html>

### 界面效果
#### 后台控制点击按钮1
![Laravel 中使用 swoole 项目实战开发案例一 (建立 swoole 和前端通信)](https://cdn.learnku.com/uploads/images/201907/01/43464/E3wTNweNZ7.png!large)
![Laravel 中使用 swoole 项目实战开发案例一 (建立 swoole 和前端通信)](https://cdn.learnku.com/uploads/images/201907/01/43464/WxCJP5EpfF.png!large)
#### 后端界面点击按钮2
![Laravel 中使用 swoole 项目实战开发案例一 (建立 swoole 和前端通信)](https://cdn.learnku.com/uploads/images/201907/01/43464/rmKYU8Z3Wj.png!large)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Book of CSS3

The Book of CSS3

Peter Gasston / No Starch Press / 2011-5-13 / USD 34.95

CSS3 is the technology behind most of the eye-catching visuals on the Web today, but the official documentation can be dry and hard to follow. Luckily, The Book of CSS3 distills the heady technical la......一起来看看 《The Book of CSS3》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码