Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo

栏目: Html5 · 发布时间: 5年前

内容简介:上一篇文章第三个方案如下图在方案一的基础进行如下修改,新的架构图流程如下:

上一篇文章 Spring Boot系列21 Spring Websocket实现websocket集群方案讨论 里详细介绍了WebSocket集群的三种方案,并得出结论第三个方案是最好的,本文我们实现第三个方案。

第三个方案如下图

Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo

在方案一的基础进行如下修改,新的架构图流程如下:

  1. 服务A增加WS模块,当websocket连接过来时,将此用户的连接信息(主要是websocket sesionId值)存储 redis
  2. 消息生产者发送消息到的交换机,这些服务不直接推送服务A/B
  3. 增加新的模块dispatch,此模块接收推送过来的信息,并从redis中读取消息接收用户对应的websocket sesionId值,然后根据上面的规则计算出用户对应的路由键,然后将消息发送到用户订阅的队列上
  4. 前端接收消息

详细实现的代码

工程名称:mvc 本文在 Spring Boot系列20 Spring Websocket实现向指定的用户发送消息 的基础进行修改。

在pom.xml中引入redis,rabbitmq相关的jar

<!--  webscoekt 集群 需要 引入支持RabbitMQ, redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
复制代码

rabbitmq, redis的配置

application-wscluster.properties

# websocket集群需要配置RabbitMQ
spring.rabbitmq.host:192.168.21.3
spring.rabbitmq.virtual-host: /icc-local
spring.rabbitmq.username: icc-dev
spring.rabbitmq.password: icc-dev

# 配置redis
spring.redis.database=0
spring.redis.host=192.168.21.4
# spring.redis.password=
spring.redis.port=7001
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0  
spring.redis.pool.max-active=8  
spring.redis.pool.max-wait=-1
复制代码

IRedisSessionService及实现

接口IRedisSessionService定义了对redis的操作 IRedisSessionService实现类将用户名称和websocket sessionId的关系存储到redis,提供添加、删除、查询 IRedisSessionService

public interface IRedisSessionService {
    void add(String name, String wsSessionId);
    boolean del(String name);
    String get(String name);
}
复制代码

SimulationRedisSessionServiceImpl将用户名称和websocket sessionId的关系存储到redis,提供添加、删除、查询

@Component
public class SimulationRedisSessionServiceImpl implements IRedisSessionService {

    @Autowired
    private RedisTemplate<String, String> template;

    // key = 登录用户名称, value=websocket的sessionId
    private ConcurrentHashMap<String,String> redisHashMap = new ConcurrentHashMap<>(32);

    /**
     * 在缓存中保存用户和websocket sessionid的信息
     * @param name
     * @param wsSessionId
     */
    public void add(String name, String wsSessionId){
        BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name);
        boundValueOperations.set(wsSessionId,24 * 3600, TimeUnit.SECONDS);
    }

    /**
     * 从缓存中删除用户的信息
     * @param name
     */
    public boolean del(String name){
        return template.execute(new RedisCallback<Boolean>() {

            @Override
            public Boolean doInRedis(RedisConnection connection)
                    throws DataAccessException {
                byte[] rawKey = template.getStringSerializer().serialize(name);
                return connection.del(rawKey) > 0;
            }
        }, true);
    }

    /**
     * 根据用户id获取用户对应的sessionId值
     * @param name
     * @return
     */
    public String get(String name){
        BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name);
        return boundValueOperations.get();
    }
}
复制代码

AuthWebSocketHandlerDecoratorFactory

装饰WebSocketHandlerDecorator对象,在连接建立时,保存websocket的session id,其中key为帐号名称;在连接断开时,从缓存中删除用户的sesionId值。此websocket sessionId值用于创建消息的路由键。

@Component
public class AuthWebSocketHandlerDecoratorFactory implements WebSocketHandlerDecoratorFactory {
    private static final Logger log = LoggerFactory.getLogger(AuthWebSocketHandlerDecoratorFactory.class);

    @Autowired
    private IRedisSessionService redisSessionService;

    @Override
    public WebSocketHandler decorate(WebSocketHandler handler) {
        return new WebSocketHandlerDecorator(handler) {
            @Override
            public void afterConnectionEstablished(final WebSocketSession session) throws Exception {
                // 客户端与服务器端建立连接后,此处记录谁上线了
                Principal principal = session.getPrincipal();
                if(principal != null){
                    String username = principal.getName();
                    log.info("websocket online: " + username + " session " + session.getId());
                    redisSessionService.add(username, session.getId());
                }
                super.afterConnectionEstablished(session);
            }

            @Override
            public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
                // 客户端与服务器端断开连接后,此处记录谁下线了
                Principal principal = session.getPrincipal();
                if(principal != null){
                    String username = session.getPrincipal().getName();
                    log.info("websocket offline: " + username);
                    redisSessionService.del(username);
                }
                super.afterConnectionClosed(session, closeStatus);
            }
        };
    }
}
复制代码

WebSocketRabbitMQMessageBrokerConfigurer

Spring Boot系列20 Spring Websocket实现向指定的用户发送消息 的基础上增加如下功能,将myWebSocketHandlerDecoratorFactory配置到websocket

@Configuration
// 此注解开使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping
@EnableWebSocketMessageBroker
public class WebSocketRabbitMQMessageBrokerConfigurer extends AbstractWebSocketMessageBrokerConfigurer {

    @Autowired
    private MyPrincipalHandshakeHandler myDefaultHandshakeHandler;
    @Autowired
    private AuthHandshakeInterceptor sessionAuthHandshakeInterceptor;

    @Autowired
    private AuthWebSocketHandlerDecoratorFactory myWebSocketHandlerDecoratorFactory;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        ….
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
          …       
    }

    /**
     * 这时实际spring weboscket集群的新增的配置,用于获取建立websocket时获取对应的sessionid值
     * @param registration
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.addDecoratorFactory(myWebSocketHandlerDecoratorFactory);
        super.configureWebSocketTransport(registration);
    }
}

复制代码

TestMQCtl:

在上文 Spring Boot系列20 Spring Websocket实现向指定的用户发送消息 的基础上,对此类进行修改

  • sendMq2User()方法根据用户的帐号和websocket sessionId根据["web订阅队列名称+'-user'+websocket sessionId"]组合路由键。然后通过AmqpTemplate 实例向amq.topic交换机发送消息,路由键为["web订阅队列名称+'-user'+websocket sessionId"]。方法中websocket sessionId是从根据帐号名称从redis中获取 其它的方法,这里不一一列出
@Controller
@RequestMapping(value = "/ws")
public class TestMQCtl {
    private  static final Logger log = LoggerFactory.getLogger(TestMQCtl.class);

    @Autowired
    private AmqpTemplate amqpTemplate;
    @Autowired
    private IRedisSessionService redisSessionService;

     /**
     * 向执行用户发送请求
     * @param msg
     * @param name
     * @return
     */
    @RequestMapping(value = "send2user")
    @ResponseBody
    public int sendMq2User(String msg, String name){
        // 根据用户名称获取用户对应的session id值
        String wsSessionId = redisSessionService.get(name);
        RequestMessage demoMQ = new RequestMessage();
        demoMQ.setName(msg);

        // 生成路由键值,生成规则如下: websocket订阅的目的地 + "-user" + websocket的sessionId值。生成值类似:
        String routingKey = getTopicRoutingKey("demo", wsSessionId);
        // 向amq.topi交换机发送消息,路由键为routingKey
        log.info("向用户[{}]sessionId=[{}],发送消息[{}],路由键[{}]", name, wsSessionId, wsSessionId, routingKey);
        amqpTemplate.convertAndSend("amq.topic", routingKey,  JSON.toJSONString(demoMQ));
        return 0;
    }

    /**
     * 获取Topic的生成的路由键
     *
     * @param actualDestination
     * @param sessionId
     * @return
     */
    private String getTopicRoutingKey(String actualDestination, String sessionId){
        return actualDestination + "-user" + sessionId;
    }
   ….
}
复制代码

测试

以不同端口启动两个服务启动服务类:WebSocketClusterApplication 以“--spring.profiles.active=wscluster --server.port=8081”参数启动服务A 以“--spring.profiles.active=wscluster --server.port=8082”参数启动服务B

登录模拟帐号:xiaoming登录服务A,xiaoming2登录服务B使用xiaoming登录服务A,并登录websocket http://127.0.0.1:8081/ws/login 使用xiaoming登录,并提交

Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo

点击连接,如果连接变灰色,则登录websocket成功

Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo

打开另一个浏览器,使用xiaoming2登录服务B,并登录websocket http://127.0.0.1:8082/ws/login 使用xiaoming2登录并提交,最后登录websocket

登录服务A模拟发送页面登录http://127.0.0.1:8081/ws/send,发送消息

  1. 向帐号xiaoming发送消息xiaoming-receive,只能被连接服务A的服务websocket收到 §
  2. 向帐号xiaoming2发送消息xiaoming2-receive,只能被连接服务B的服务websocket收到

此时两个页面收到信息:

Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo

xiaoming帐号只收到xiaoming-receive xiaoming2帐号只收到xiaoming2-receive

登录服务B模拟发送页面登录http://127.0.0.1:8082/ws/send,发送消息,和http://127.0.0.1:8081/ws/send 一样发送相同消息,结果是一样

结论无论用户登录的服务A,还是服务B,我们通过以上的代码,我们都可以发送消息到指定的用户,所以我们已经实现websocket集群


以上所述就是小编给大家介绍的《Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

超级运营术

超级运营术

韩叙 / 中信出版社 / 2017-5

新产品上线,为什么仅仅500次转发能带来300个内测用户? 为什么每一次内容推送,都带来App的一次卸载高峰? 同类活动那么多,怎样做才能超越竞品,占据头条? 为什么有的文案像“小广告”,有的文案像贴心老友? 创业公司与大平台的玩法有何不同? …… 如何从“了解运营”到“精通运营”,可能是运营人*的困惑。《超级运营术》正是对这个问题的全面解答。韩叙总结10年运营......一起来看看 《超级运营术》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

URL 编码/解码
URL 编码/解码

URL 编码/解码

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具