Spring boot+redis 整合websocket实现消息实时推送案例

  • 作者: 凯哥Java(公众号:凯哥Java)
  • 工作小总结
  • 时间:2023-08-13 17:03
  • 3053人已阅读
简介 场景描述:在公司开发的时候,有时候,我们会遇到这种需求:系统消息。当有新消息的时候,需要实时推送消息数量。如下图:这个实时消息怎么推送呢?这里凯哥就使用websocket技术实现这个方案。实战步骤:

🔔🔔🔔好消息!好消息!🔔🔔🔔

有需要的朋友👉:联系凯哥 微信号 kaigejava2022

场景描述:

在公司开发的时候,有时候,我们会遇到这种需求:系统消息。当有新消息的时候,需要实时推送消息数量。如下图:

b998b7d2f74f75bbcdfdda69c0fecea2.png

这个实时消息怎么推送呢?这里凯哥就使用websocket技术实现这个方案。

实战步骤:

一:引入依赖

spring boot的版本也是2.7.7

<!-- websocket依赖-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
  <version>2.7.7</version>
</dependency>

二:开启对websocket的支持

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author 凯哥Java
 * @description 开启对websocket的支持
 * @company
 * @since 2023/8/11 22:56
 */
@Slf4j
@Configuration
@EnableWebSocket
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpoint(){
        return new ServerEndpointExporter();
    }

}

三:自定义JSON转换对象

import cn.hutool.json.JSONUtil;

import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;


/**
 * @author 凯哥Java
 * @description 在 websocket 中直接发送 obj 会有问题 - No encoder specified for object of class 需要对 obj 创建解码类,实现 websocket 中的 Encoder.Text<>
 * @since 2023/8/11 22:56
 */
public class WebSocketCustomEncoding implements Encoder.Text<Object> {
    /**
     * The Encoder interface defines how developers can provide a way to convert their
     * custom objects into web socket messages. The Encoder interface contains
     * subinterfaces that allow encoding algorithms to encode custom objects to:
     * text, binary data, character stream and write to an output stream.
     *
     * Encoder 接口定义了如何提供一种方法将定制对象转换为 websocket 消息
     * 可自定义对象编码为文本、二进制数据、字符流、写入输出流
     *  Text、TextStream、Binary、BinaryStream
     * */

    @Override
    public void init(EndpointConfig endpointConfig) {

    }

    @Override
    public void destroy() {

    }

    @Override
    public String encode(Object o) throws EncodeException {
        return JSONUtil.toJsonStr(o);
    }
}


四:编写webSocket服务端

主要使用到的注解:

@ServerEndpoint:注释在类上面

注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端。注解的值将被用于监听用户连接的终端访问URL地址

@OnOpen:注释在方法上面

建立连接成功调用的方法

@OnClose:注释在方法上面

当连接关闭时候调用的方法

onOpen 和 onClose 方法分别被@OnOpen和@OnClose 所注解。他们定义了当一个新用户连接和断开的时候所调用的方法。

@OnMessage:注释在方法上面

方法被@OnMessage所注解。这个注解定义了当服务器接收到客户端发送的消息时所调用的方法。

@OnError:注释在方法上面

发生错误时候调用。

具体代码如下:

import cn.hutool.json.JSONObject;
import com.kaigejava.commoneresult.Result;
import com.kaigejava.config.WebSocketCustomEncoding;
import com.kaigejava.dto.PushParams;
import com.kaigejava.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

import static com.kaigejava.service.RedisConstant.TOTAL_KEY;

/**
 * @author 凯哥Java
 * @description
 * @company
 * @since 2023/8/11 22:52
 */
@Slf4j
// @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端。注解的值将被用于监听用户连接的终端访问URL地址
// encoders = WebSocketCustomEncoding.class 是为了使用ws自己的推送Object消息对象(sendObject())时进行解码,通过Encoder 自定义规则(转换为JSON字符串)
@ServerEndpoint(value = "/websocket/{userId}", encoders = WebSocketCustomEncoding.class)
@Component
public class WebSocketServer {
    private final static Logger logger = LogManager.getLogger(WebSocketServer.class);

    @Resource
    private RedisUtil redisUtil;

    /**
     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的
     */

    private static int onlineCount = 0;

    /**
     * concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象
     */
    public static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();

    /***
     * 功能描述:
     * concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象的参数体
     */
    public static ConcurrentHashMap<String, PushParams> webSocketParamsMap = new ConcurrentHashMap<>();

    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */

    private Session session;
    private String userId;


    /**
     * 连接建立成功调用的方法
     * onOpen 和 onClose 方法分别被@OnOpen和@OnClose 所注解。他们定义了当一个新用户连接和断开的时候所调用的方法。
     */
    @OnOpen
    public Result onOpen(Session session, @PathParam("userId") String userId) {

        this.session = session;
        this.userId = userId;
        //加入map
        webSocketMap.put(userId, this);
        addOnlineCount();           //在线数加1
        logger.info("用户{}连接成功,当前在线人数为{}", userId, getOnlineCount());
        //先从redis中获取
        String rediske = TOTAL_KEY + userId;
        // Object totalObj = redisUtil.get(rediske);
        int total = 0;
       /* if(Objects.nonNull(totalObj)){
            total = (int)totalObj;
        }*/
        JSONObject object = new JSONObject();
        object.putOnce("total", total);
        try {
            sendMessageObj(object);
        } catch (Exception e) {
            logger.error("IO异常");
        }
        return Result.ok(object);
    }


    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        //从map中删除
        webSocketMap.remove(userId);
        subOnlineCount();           //在线数减1
        logger.info("用户{}关闭连接!当前在线人数为{}", userId, getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     * onMessage 方法被@OnMessage所注解。这个注解定义了当服务器接收到客户端发送的消息时所调用的方法。
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        logger.info("来自客户端用户:{} 消息:{}", userId, message);

        //群发消息
        for (String item : webSocketMap.keySet()) {
            try {
                webSocketMap.get(item).sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 发生错误时调用
     *
     * @OnError
     */
    @OnError
    public void onError(Session session, Throwable error) {
        logger.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * 向客户端发送消息
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
        //this.session.getAsyncRemote().sendText(message);
    }

    public void sendMessageObj(Object messageObj) throws IOException, EncodeException {
        this.session.getBasicRemote().sendObject(messageObj);
        //this.session.getAsyncRemote().sendText(message);
    }

    /**
     * 向客户端发送消息
     */
    public void sendMessage(Object message) throws IOException, EncodeException {
        this.session.getBasicRemote().sendObject(message);
        //this.session.getAsyncRemote().sendText(message);
    }

    /**
     * 通过userId向客户端发送消息
     */
    public void sendMessageByUserId(String userId, String message) throws IOException {
        logger.info("服务端发送消息到{},消息:{}", userId, message);

        if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
            webSocketMap.get(userId).sendMessage(message);
        } else {
            logger.error("用户{}不在线", userId);
        }

    }

    /**
     * 通过userId向客户端发送消息
     */
    public void sendMessageByUserId(String userId, Object message) throws IOException, EncodeException {
        logger.info("服务端发送消息到{},消息:{}", userId, message);
        if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
            webSocketMap.get(userId).sendMessage(message);
        } else {
            logger.error("用户{}不在线", userId);
        }
    }

    /**
     * 通过userId更新缓存的参数
     */
    public void changeParamsByUserId(String userId, PushParams pushParams) throws IOException, EncodeException {
        logger.info("ws用户{}请求参数更新,参数:{}", userId, pushParams.toString());
        webSocketParamsMap.put(userId, pushParams);
    }

    /**
     * 群发自定义消息
     */
    public static void sendInfo(String message) throws IOException {
        for (String item : webSocketMap.keySet()) {
            try {
                webSocketMap.get(item).sendMessage(message);
            } catch (IOException e) {
                continue;
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }

    /**
     * 根据用户id给用户推送消息总数
     *
     * @param userId
     */
    public void addTotalAndSendMessageTotalByUserId(String userId)  {
        logger.info("服务端发送消息到{},添加消息数量", userId);

        if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
            //先从redis中获取
            String rediske = TOTAL_KEY + userId;
            Long total = redisUtil.incr(rediske, 1);
            JSONObject object = new JSONObject();
            object.putOnce("total", total);
            try {
                webSocketMap.get(userId).sendMessageObj(object);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (EncodeException e) {
                e.printStackTrace();
            }
        } else {
            logger.error("用户{}不在线", userId);
        }
    }

    public void sendMessageTotalByUserId(String userId)  {
        logger.info("服务端发送消息到{},获取消息数量", userId);

        if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
            int total = 0;
            JSONObject object = new JSONObject();
            object.putOnce("total", total);
            try {
                webSocketMap.get(userId).sendMessageObj(object);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (EncodeException e) {
                e.printStackTrace();
            }
        } else {
            logger.error("用户{}不在线", userId);
        }
    }

}

五:自定义消息推送的参数体

package com.kaigejava.dto;

import lombok.Data;

/**
 * @author 凯哥Java
 * @description
 * @company
 * @since 2023/8/13 10:41
 */

@Data
public class PushParams {

    /**
     * 功能描述:
     * 类型
     */
    private String type;

    /**
     * 功能描述:
     * 开始时间
     */
    private String startTime;

    /**
     * 功能描述:
     * 结束时间
     */
    private String stopTime;
}

六:编写controller进行发送消息

package com.kaigejava.service;

import cn.hutool.core.date.DatePattern;
import cn.hutool.json.JSONUtil;
import com.kaigejava.commoneresult.Result;
import com.kaigejava.dto.PushParams;
import com.kaigejava.dto.SystemMessageDTO;
import com.kaigejava.util.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import javax.websocket.EncodeException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;

import static com.kaigejava.service.RedisConstant.KEY;

/**
 * @author 凯哥Java
 * @description
 * @company
 * @since 2023/8/13 10:46
 */
@RestController
@RequestMapping("/webSocketPush")
public class WebSocketController {
    @Autowired
    private WebSocketServer webSocket;


    @Resource
    private RedisUtil redisUtil;

    @RequestMapping("/sentMessage")
    public Result sentMessage(String userId, String message){
        try {

            SystemMessageDTO dto = new SystemMessageDTO();
            dto.setTitle(message);
            String time = DatePattern.NORM_DATETIME_FORMAT.format(new Date());
            dto.setTime(time);
            dto.setStatus(0);
            //这里将消息存放到redis中
            String redisKey = KEY+userId;
            Object megObj = redisUtil.get(redisKey);
            List<SystemMessageDTO>  list = new ArrayList<>();
            if(Objects.nonNull(megObj)){
                list = JSONUtil.toList((String)megObj,SystemMessageDTO.class);
            }
            list.add(dto);
            //存放到redis中
            redisUtil.set(redisKey,JSONUtil.toJsonStr(list));
            webSocket.addTotalAndSendMessageTotalByUserId(userId);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return Result.ok("SUCCESS");

    }



    /***
     * 功能描述:
     * 根据用户ID更新ws推送的参数
     * @Author: 凯哥Java
     * @Date: 2022-12-01 09:21:25
     * @Param  userId: WS中的用户ID
     * @Param pushParams: 推送参数
     * @return: void
     * @since: 1.0.0
     */
    @RequestMapping("/changeWsParams")
    public void changeWsParams(String userId, PushParams pushParams){
        try {
            webSocket.changeParamsByUserId(userId,pushParams);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (EncodeException e) {
            e.printStackTrace();
        }

    }
}


下面是redis相关的配置

一:配置的相关配置。在application.yml文件中

spring:
  redis:
    timeout: 18000
    database: 1
    port: 6379
    host: 127.0.0.1

二:redis配置文件

package com.kaigejava.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @author 凯哥Java
 * @description
 * @company
 * @since 2023/8/13 11:17
 */
@Configuration
@EnableCaching //开启注解
public class RedisConfig extends CachingConfigurerSupport {

    /**
     * retemplate相关配置
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 配置连接工厂
        template.setConnectionFactory(factory);

        //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
        Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper om = new ObjectMapper();
        // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jacksonSeial.setObjectMapper(om);

        // 值采用json序列化
        template.setValueSerializer(jacksonSeial);
        //使用StringRedisSerializer来序列化和反序列化redis的key值
        template.setKeySerializer(new StringRedisSerializer());

        // 设置hash key 和value序列化模式
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(jacksonSeial);
        template.afterPropertiesSet();

        return template;
    }

    /**
     * 对hash类型的数据操作
     */
    @Bean
    public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForHash();
    }

    /**
     * 对redis字符串类型数据操作
     */
    @Bean
    public ValueOperations<String, Object> valueOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForValue();
    }

    /**
     * 对链表类型的数据操作
     */
    @Bean
    public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForList();
    }

    /**
     * 对无序集合类型的数据操作
     */
    @Bean
    public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForSet();
    }

    /**
     * 对有序集合类型的数据操作
     */
    @Bean
    public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForZSet();
    }

}

三:redis的工具类

package com.kaigejava.util;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
 * @author 凯哥Java
 * @description
 * @company
 * @since 2023/8/13 11:18
 */
@Component
public class RedisUtil {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public RedisUtil(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 指定缓存失效时间
     * @param key 键
     * @param time 时间(秒)
     * @return
     */
    public boolean expire(String key,long time){
        try {
            if(time>0){
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据key 获取过期时间
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */
    public long getExpire(String key){
        return redisTemplate.getExpire(key,TimeUnit.SECONDS);
    }

    /**
     * 判断key是否存在
     * @param key 键
     * @return true 存在 false不存在
     */
    public boolean hasKey(String key){
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除缓存
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public void del(String ... key){
        if(key!=null&&key.length>0){
            if(key.length==1){
                redisTemplate.delete(key[0]);
            }else{
                redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
            }
        }
    }

    //============================String=============================
    /**
     * 普通缓存获取
     * @param key 键
     * @return 值
     */
    public Object get(String key){
        return key==null?null:redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     * @param key 键
     * @param value 值
     * @return true成功 false失败
     */
    public boolean set(String key,Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 普通缓存放入并设置时间
     * @param key 键
     * @param value 值
     * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */
    public boolean set(String key,Object value,long time){
        try {
            if(time>0){
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            }else{
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 递增
     * @param key 键
     * @param delta 要增加几(大于0)
     * @return
     */
    public long incr(String key, long delta){
        if(delta<0){
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }

    /**
     * 递减
     * @param key 键
     * @param delta 要减少几(小于0)
     * @return
     */
    public long decr(String key, long delta){
        if(delta<0){
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }

    //================================Map=================================
    /**
     * HashGet
     * @param key 键 不能为null
     * @param item 项 不能为null
     * @return 值
     */
    public Object hget(String key,String item){
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 获取hashKey对应的所有键值
     * @param key 键
     * @return 对应的多个键值
     */
    public Map<Object,Object> hmget(String key){
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     * @param key 键
     * @param map 对应多个键值
     * @return true 成功 false 失败
     */
    public boolean hmset(String key, Map<String,Object> map){
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * HashSet 并设置时间
     * @param key 键
     * @param map 对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    public boolean hmset(String key, Map<String,Object> map, long time){
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if(time>0){
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     * @param key 键
     * @param item 项
     * @param value 值
     * @return true 成功 false失败
     */
    public boolean hset(String key,String item,Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     * @param key 键
     * @param item 项
     * @param value 值
     * @param time 时间(秒)  注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public boolean hset(String key,String item,Object value,long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if(time>0){
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除hash表中的值
     * @param key 键 不能为null
     * @param item 项 可以使多个 不能为null
     */
    public void hdel(String key, Object... item){
        redisTemplate.opsForHash().delete(key,item);
    }

    /**
     * 判断hash表中是否有该项的值
     * @param key 键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item){
        return redisTemplate.opsForHash().hasKey(key, item);
    }

    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     * @param key 键
     * @param item 项
     * @param by 要增加几(大于0)
     * @return
     */
    public double hincr(String key, String item,double by){
        return redisTemplate.opsForHash().increment(key, item, by);
    }

    /**
     * hash递减
     * @param key 键
     * @param item 项
     * @param by 要减少记(小于0)
     * @return
     */
    public double hdecr(String key, String item,double by){
        return redisTemplate.opsForHash().increment(key, item,-by);
    }

    //============================set=============================
    /**
     * 根据key获取Set中的所有值
     * @param key 键
     * @return
     */
    public Set<Object> sGet(String key){
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 根据value从一个set中查询,是否存在
     * @param key 键
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key,Object value){
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将数据放入set缓存
     * @param key 键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object...values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 将set数据放入缓存
     * @param key 键
     * @param time 时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key,long time,Object...values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if(time>0) {
                expire(key, time);
            }
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 获取set缓存的长度
     * @param key 键
     * @return
     */
    public long sGetSetSize(String key){
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 移除值为value的
     * @param key 键
     * @param values 值 可以是多个
     * @return 移除的个数
     */
    public long setRemove(String key, Object ...values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
    //===============================list=================================

    /**
     * 获取list缓存的内容
     * @param key 键
     * @param start 开始
     * @param end 结束  0 到 -1代表所有值
     * @return
     */
    public List<Object> lGet(String key, long start, long end){
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 获取list缓存的长度
     * @param key 键
     * @return
     */
    public long lGetListSize(String key){
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 通过索引 获取list中的值
     * @param key 键
     * @param index 索引  index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     * @return
     */
    public Object lGetIndex(String key,long index){
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 将list放入缓存
     * @param key 键
     * @param value 值
     * @return
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     * @param key 键
     * @param value 值
     * @param time 时间(秒)
     * @return
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     * @param key 键
     * @param value 值
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     * @param key 键
     * @param value 值
     * @param time 时间(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据索引修改list中的某条数据
     * @param key 键
     * @param index 索引
     * @param value 值
     * @return
     */
    public boolean lUpdateIndex(String key, long index,Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 移除N个值为value
     * @param key 键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */
    public long lRemove(String key,long count,Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
}


以下是查看消息列表及设置为已读的接口

一:消息实体对象:

package com.kaigejava.dto;

import lombok.Data;

/**
 * @author 凯哥Java
 * @description
 * @company
 * @since 2023/8/13 11:05
 */
@Data
public class SystemMessageDTO {

    private String  title;


    private String time;


    private int status;
}

二:获取未读列表及获取消息列表的接口

package com.kaigejava.systemmessage;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.json.JSONUtil;
import com.kaigejava.commoneresult.Result;
import com.kaigejava.service.WebSocketServer;
import com.kaigejava.util.RedisUtil;
import com.kaigejava.dto.SystemMessageDTO;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static com.kaigejava.service.RedisConstant.KEY;
import static com.kaigejava.service.RedisConstant.TOTAL_KEY;

/**
 * @author 凯哥Java
 * @description
 * @company
 * @since 2023/8/13 9:55
 */
@RestController
@RequestMapping("msg")
 @CrossOrigin(origins = "*",maxAge = 3600)
public class SystemMessageController {


    @Resource
    private RedisUtil redisUtil;

    @Resource
    private WebSocketServer webSocketServer;

    @GetMapping("/test")
    public String test(String msg) {
        redisUtil.set("kaige-pp", "价格:" + msg);
        return "OK";
    }


     /**
     * 根据用户id获取当前用户所有未读的消息,同时设置为已读
     **/
     
    @GetMapping("/unread_list/{userId}")
    public Result unreadList(@PathVariable("userId") String userId) {
        //这里将消息存放到redis中
        String redisKey = KEY + userId;
        Object megObj = redisUtil.get(redisKey);
        List<SystemMessageDTO> list = new ArrayList<SystemMessageDTO>();
        if (Objects.nonNull(megObj)) {
            list = JSONUtil.toList((String) megObj, SystemMessageDTO.class);
        }
        List<SystemMessageDTO> unreadList = list.stream().filter(dto -> dto.getStatus() == 0).collect(Collectors.toList());
        List<SystemMessageDTO> dataList = new ArrayList<SystemMessageDTO>();
        if (!CollectionUtils.isEmpty(unreadList)) {
            for (int i = 0; i < unreadList.size(); i++) {
                SystemMessageDTO newDto = new SystemMessageDTO();
                BeanUtil.copyProperties(list.get(i), newDto);
                dataList.add(newDto);
            }
            //将这些设置为已读。
            list.stream().forEach(dto -> dto.setStatus(1));
            //重新设置回去
            redisUtil.set(redisKey, JSONUtil.toJsonStr(list));
            //设置未读数量为0个
            String rediske = TOTAL_KEY + userId;
            redisUtil.set(rediske, 0);
            //推送
            webSocketServer.sendMessageTotalByUserId(userId);
        }

        return Result.ok(dataList);
    }
    
    /**
     * 根据用户id获取当前用户所有的消息,同时设置为已读
     **/

    @GetMapping("/list/{userId}")
    public Result list(@PathVariable("userId") String userId) {
        //redisUtil.set("kaige-pp", "价格:" + msg);
        //这里将消息存放到redis中
        String redisKey = KEY + userId;
        Object megObj = redisUtil.get(redisKey);
        List<SystemMessageDTO> list = new ArrayList<>();
        if (Objects.nonNull(megObj)) {
            list = JSONUtil.toList((String) megObj, SystemMessageDTO.class);
        }

        webSocketServer.sendMessageTotalByUserId(userId);
        List<SystemMessageDTO> unreadList = list.stream().filter(dto -> dto.getStatus() == 0).collect(Collectors.toList());
        List<SystemMessageDTO> dataList = new ArrayList<SystemMessageDTO>();
        if (!CollectionUtils.isEmpty(unreadList)) {
            for (int i = 0; i < unreadList.size(); i++) {
                SystemMessageDTO newDto = new SystemMessageDTO();
                BeanUtil.copyProperties(list.get(i), newDto);
                dataList.add(newDto);
            }
            //将这些设置为已读。
            list.stream().forEach(dto -> dto.setStatus(1));
            //重新设置回去
            redisUtil.set(redisKey, JSONUtil.toJsonStr(list));
            //设置未读数量为0个
            String rediske = TOTAL_KEY + userId;
            redisUtil.set(rediske, 0);

        }
        return Result.ok(list);
    }


}


最后进行测试

一:启动项目后,

http://www.jsons.cn/websocket/ 

输入地址:ws://localhost:8082/websocket/99

555cdf2a2e8ffdc6541ca90a9cf891b3.png

注意:这个连接是在websocketService对象中

二:发送消息测试下:

http://localhost:8082/webSocketPush/sentMessage?userId=99&message=kaigejava

注意:这个连接是在WebSocketController中。

查看客户端是否收到了消息推送的未读消息数量是1.如下图:

cff752c41044a9d93af9bd669420272c.png

这样,说明在服务端接收到消息后,根据用户id,将消息推送到客户端。已经OK的。

三:我们在来看看根据用户id获取消息列表及未读消息列表

消息列表:http://localhost:8082/msg/list/99

8fe6c9010bb7ce5a4bbdf282fdb4d54d.png

未读消息列表:http://localhost:8082/msg/unread_list/99

OK 至此。spring boot +Redis实现消息推送所有代码及教程已经写完。欢迎大家关注凯哥Java(kaigejava)获取源码

TopTop