Spring boot+redis 整合websocket实现消息实时推送案例
- 工作小总结
- 时间:2023-08-13 17:03
- 3053人已阅读
🔔🔔🔔好消息!好消息!🔔🔔🔔
有需要的朋友👉:联系凯哥
场景描述:
在公司开发的时候,有时候,我们会遇到这种需求:系统消息。当有新消息的时候,需要实时推送消息数量。如下图:
这个实时消息怎么推送呢?这里凯哥就使用websocket技术实现这个方案。
实战步骤:
一:引入依赖
spring boot的版本也是2.7.7
<!-- websocket依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <version>2.7.7</version> </dependency>
二:开启对websocket的支持
import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @author 凯哥Java * @description 开启对websocket的支持 * @company * @since 2023/8/11 22:56 */ @Slf4j @Configuration @EnableWebSocket public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpoint(){ return new ServerEndpointExporter(); } }
三:自定义JSON转换对象
import cn.hutool.json.JSONUtil; import javax.websocket.EncodeException; import javax.websocket.Encoder; import javax.websocket.EndpointConfig; /** * @author 凯哥Java * @description 在 websocket 中直接发送 obj 会有问题 - No encoder specified for object of class 需要对 obj 创建解码类,实现 websocket 中的 Encoder.Text<> * @since 2023/8/11 22:56 */ public class WebSocketCustomEncoding implements Encoder.Text<Object> { /** * The Encoder interface defines how developers can provide a way to convert their * custom objects into web socket messages. The Encoder interface contains * subinterfaces that allow encoding algorithms to encode custom objects to: * text, binary data, character stream and write to an output stream. * * Encoder 接口定义了如何提供一种方法将定制对象转换为 websocket 消息 * 可自定义对象编码为文本、二进制数据、字符流、写入输出流 * Text、TextStream、Binary、BinaryStream * */ @Override public void init(EndpointConfig endpointConfig) { } @Override public void destroy() { } @Override public String encode(Object o) throws EncodeException { return JSONUtil.toJsonStr(o); } }
四:编写webSocket服务端
主要使用到的注解:
@ServerEndpoint:注释在类上面
注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端。注解的值将被用于监听用户连接的终端访问URL地址
@OnOpen:注释在方法上面
建立连接成功调用的方法
@OnClose:注释在方法上面
当连接关闭时候调用的方法
onOpen 和 onClose 方法分别被@OnOpen和@OnClose 所注解。他们定义了当一个新用户连接和断开的时候所调用的方法。
@OnMessage:注释在方法上面
方法被@OnMessage所注解。这个注解定义了当服务器接收到客户端发送的消息时所调用的方法。
@OnError:注释在方法上面
发生错误时候调用。
具体代码如下:
import cn.hutool.json.JSONObject; import com.kaigejava.commoneresult.Result; import com.kaigejava.config.WebSocketCustomEncoding; import com.kaigejava.dto.PushParams; import com.kaigejava.util.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import static com.kaigejava.service.RedisConstant.TOTAL_KEY; /** * @author 凯哥Java * @description * @company * @since 2023/8/11 22:52 */ @Slf4j // @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端。注解的值将被用于监听用户连接的终端访问URL地址 // encoders = WebSocketCustomEncoding.class 是为了使用ws自己的推送Object消息对象(sendObject())时进行解码,通过Encoder 自定义规则(转换为JSON字符串) @ServerEndpoint(value = "/websocket/{userId}", encoders = WebSocketCustomEncoding.class) @Component public class WebSocketServer { private final static Logger logger = LogManager.getLogger(WebSocketServer.class); @Resource private RedisUtil redisUtil; /** * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的 */ private static int onlineCount = 0; /** * concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象 */ public static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>(); /*** * 功能描述: * concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象的参数体 */ public static ConcurrentHashMap<String, PushParams> webSocketParamsMap = new ConcurrentHashMap<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; private String userId; /** * 连接建立成功调用的方法 * onOpen 和 onClose 方法分别被@OnOpen和@OnClose 所注解。他们定义了当一个新用户连接和断开的时候所调用的方法。 */ @OnOpen public Result onOpen(Session session, @PathParam("userId") String userId) { this.session = session; this.userId = userId; //加入map webSocketMap.put(userId, this); addOnlineCount(); //在线数加1 logger.info("用户{}连接成功,当前在线人数为{}", userId, getOnlineCount()); //先从redis中获取 String rediske = TOTAL_KEY + userId; // Object totalObj = redisUtil.get(rediske); int total = 0; /* if(Objects.nonNull(totalObj)){ total = (int)totalObj; }*/ JSONObject object = new JSONObject(); object.putOnce("total", total); try { sendMessageObj(object); } catch (Exception e) { logger.error("IO异常"); } return Result.ok(object); } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { //从map中删除 webSocketMap.remove(userId); subOnlineCount(); //在线数减1 logger.info("用户{}关闭连接!当前在线人数为{}", userId, getOnlineCount()); } /** * 收到客户端消息后调用的方法 * onMessage 方法被@OnMessage所注解。这个注解定义了当服务器接收到客户端发送的消息时所调用的方法。 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) { logger.info("来自客户端用户:{} 消息:{}", userId, message); //群发消息 for (String item : webSocketMap.keySet()) { try { webSocketMap.get(item).sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } } /** * 发生错误时调用 * * @OnError */ @OnError public void onError(Session session, Throwable error) { logger.error("用户错误:" + this.userId + ",原因:" + error.getMessage()); error.printStackTrace(); } /** * 向客户端发送消息 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); //this.session.getAsyncRemote().sendText(message); } public void sendMessageObj(Object messageObj) throws IOException, EncodeException { this.session.getBasicRemote().sendObject(messageObj); //this.session.getAsyncRemote().sendText(message); } /** * 向客户端发送消息 */ public void sendMessage(Object message) throws IOException, EncodeException { this.session.getBasicRemote().sendObject(message); //this.session.getAsyncRemote().sendText(message); } /** * 通过userId向客户端发送消息 */ public void sendMessageByUserId(String userId, String message) throws IOException { logger.info("服务端发送消息到{},消息:{}", userId, message); if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) { webSocketMap.get(userId).sendMessage(message); } else { logger.error("用户{}不在线", userId); } } /** * 通过userId向客户端发送消息 */ public void sendMessageByUserId(String userId, Object message) throws IOException, EncodeException { logger.info("服务端发送消息到{},消息:{}", userId, message); if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) { webSocketMap.get(userId).sendMessage(message); } else { logger.error("用户{}不在线", userId); } } /** * 通过userId更新缓存的参数 */ public void changeParamsByUserId(String userId, PushParams pushParams) throws IOException, EncodeException { logger.info("ws用户{}请求参数更新,参数:{}", userId, pushParams.toString()); webSocketParamsMap.put(userId, pushParams); } /** * 群发自定义消息 */ public static void sendInfo(String message) throws IOException { for (String item : webSocketMap.keySet()) { try { webSocketMap.get(item).sendMessage(message); } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } /** * 根据用户id给用户推送消息总数 * * @param userId */ public void addTotalAndSendMessageTotalByUserId(String userId) { logger.info("服务端发送消息到{},添加消息数量", userId); if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) { //先从redis中获取 String rediske = TOTAL_KEY + userId; Long total = redisUtil.incr(rediske, 1); JSONObject object = new JSONObject(); object.putOnce("total", total); try { webSocketMap.get(userId).sendMessageObj(object); } catch (IOException e) { e.printStackTrace(); } catch (EncodeException e) { e.printStackTrace(); } } else { logger.error("用户{}不在线", userId); } } public void sendMessageTotalByUserId(String userId) { logger.info("服务端发送消息到{},获取消息数量", userId); if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) { int total = 0; JSONObject object = new JSONObject(); object.putOnce("total", total); try { webSocketMap.get(userId).sendMessageObj(object); } catch (IOException e) { e.printStackTrace(); } catch (EncodeException e) { e.printStackTrace(); } } else { logger.error("用户{}不在线", userId); } } }
五:自定义消息推送的参数体
package com.kaigejava.dto; import lombok.Data; /** * @author 凯哥Java * @description * @company * @since 2023/8/13 10:41 */ @Data public class PushParams { /** * 功能描述: * 类型 */ private String type; /** * 功能描述: * 开始时间 */ private String startTime; /** * 功能描述: * 结束时间 */ private String stopTime; }
六:编写controller进行发送消息
package com.kaigejava.service; import cn.hutool.core.date.DatePattern; import cn.hutool.json.JSONUtil; import com.kaigejava.commoneresult.Result; import com.kaigejava.dto.PushParams; import com.kaigejava.dto.SystemMessageDTO; import com.kaigejava.util.RedisUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import javax.websocket.EncodeException; import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Objects; import static com.kaigejava.service.RedisConstant.KEY; /** * @author 凯哥Java * @description * @company * @since 2023/8/13 10:46 */ @RestController @RequestMapping("/webSocketPush") public class WebSocketController { @Autowired private WebSocketServer webSocket; @Resource private RedisUtil redisUtil; @RequestMapping("/sentMessage") public Result sentMessage(String userId, String message){ try { SystemMessageDTO dto = new SystemMessageDTO(); dto.setTitle(message); String time = DatePattern.NORM_DATETIME_FORMAT.format(new Date()); dto.setTime(time); dto.setStatus(0); //这里将消息存放到redis中 String redisKey = KEY+userId; Object megObj = redisUtil.get(redisKey); List<SystemMessageDTO> list = new ArrayList<>(); if(Objects.nonNull(megObj)){ list = JSONUtil.toList((String)megObj,SystemMessageDTO.class); } list.add(dto); //存放到redis中 redisUtil.set(redisKey,JSONUtil.toJsonStr(list)); webSocket.addTotalAndSendMessageTotalByUserId(userId); } catch (Exception e) { e.printStackTrace(); } return Result.ok("SUCCESS"); } /*** * 功能描述: * 根据用户ID更新ws推送的参数 * @Author: 凯哥Java * @Date: 2022-12-01 09:21:25 * @Param userId: WS中的用户ID * @Param pushParams: 推送参数 * @return: void * @since: 1.0.0 */ @RequestMapping("/changeWsParams") public void changeWsParams(String userId, PushParams pushParams){ try { webSocket.changeParamsByUserId(userId,pushParams); } catch (IOException e) { e.printStackTrace(); } catch (EncodeException e) { e.printStackTrace(); } } }
下面是redis相关的配置
一:配置的相关配置。在application.yml文件中
spring: redis: timeout: 18000 database: 1 port: 6379 host: 127.0.0.1
二:redis配置文件
package com.kaigejava.config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.*; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * @author 凯哥Java * @description * @company * @since 2023/8/13 11:17 */ @Configuration @EnableCaching //开启注解 public class RedisConfig extends CachingConfigurerSupport { /** * retemplate相关配置 */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); // 配置连接工厂 template.setConnectionFactory(factory); //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式) Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常 om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jacksonSeial.setObjectMapper(om); // 值采用json序列化 template.setValueSerializer(jacksonSeial); //使用StringRedisSerializer来序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); // 设置hash key 和value序列化模式 template.setHashKeySerializer(new StringRedisSerializer()); template.setHashValueSerializer(jacksonSeial); template.afterPropertiesSet(); return template; } /** * 对hash类型的数据操作 */ @Bean public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) { return redisTemplate.opsForHash(); } /** * 对redis字符串类型数据操作 */ @Bean public ValueOperations<String, Object> valueOperations(RedisTemplate<String, Object> redisTemplate) { return redisTemplate.opsForValue(); } /** * 对链表类型的数据操作 */ @Bean public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) { return redisTemplate.opsForList(); } /** * 对无序集合类型的数据操作 */ @Bean public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) { return redisTemplate.opsForSet(); } /** * 对有序集合类型的数据操作 */ @Bean public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) { return redisTemplate.opsForZSet(); } }
三:redis的工具类
package com.kaigejava.util; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; /** * @author 凯哥Java * @description * @company * @since 2023/8/13 11:18 */ @Component public class RedisUtil { @Autowired private RedisTemplate<String, Object> redisTemplate; public RedisUtil(RedisTemplate<String, Object> redisTemplate) { this.redisTemplate = redisTemplate; } /** * 指定缓存失效时间 * @param key 键 * @param time 时间(秒) * @return */ public boolean expire(String key,long time){ try { if(time>0){ redisTemplate.expire(key, time, TimeUnit.SECONDS); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据key 获取过期时间 * @param key 键 不能为null * @return 时间(秒) 返回0代表为永久有效 */ public long getExpire(String key){ return redisTemplate.getExpire(key,TimeUnit.SECONDS); } /** * 判断key是否存在 * @param key 键 * @return true 存在 false不存在 */ public boolean hasKey(String key){ try { return redisTemplate.hasKey(key); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除缓存 * @param key 可以传一个值 或多个 */ @SuppressWarnings("unchecked") public void del(String ... key){ if(key!=null&&key.length>0){ if(key.length==1){ redisTemplate.delete(key[0]); }else{ redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key)); } } } //============================String============================= /** * 普通缓存获取 * @param key 键 * @return 值 */ public Object get(String key){ return key==null?null:redisTemplate.opsForValue().get(key); } /** * 普通缓存放入 * @param key 键 * @param value 值 * @return true成功 false失败 */ public boolean set(String key,Object value) { try { redisTemplate.opsForValue().set(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 普通缓存放入并设置时间 * @param key 键 * @param value 值 * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期 * @return true成功 false 失败 */ public boolean set(String key,Object value,long time){ try { if(time>0){ redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); }else{ set(key, value); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 递增 * @param key 键 * @param delta 要增加几(大于0) * @return */ public long incr(String key, long delta){ if(delta<0){ throw new RuntimeException("递增因子必须大于0"); } return redisTemplate.opsForValue().increment(key, delta); } /** * 递减 * @param key 键 * @param delta 要减少几(小于0) * @return */ public long decr(String key, long delta){ if(delta<0){ throw new RuntimeException("递减因子必须大于0"); } return redisTemplate.opsForValue().increment(key, -delta); } //================================Map================================= /** * HashGet * @param key 键 不能为null * @param item 项 不能为null * @return 值 */ public Object hget(String key,String item){ return redisTemplate.opsForHash().get(key, item); } /** * 获取hashKey对应的所有键值 * @param key 键 * @return 对应的多个键值 */ public Map<Object,Object> hmget(String key){ return redisTemplate.opsForHash().entries(key); } /** * HashSet * @param key 键 * @param map 对应多个键值 * @return true 成功 false 失败 */ public boolean hmset(String key, Map<String,Object> map){ try { redisTemplate.opsForHash().putAll(key, map); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * HashSet 并设置时间 * @param key 键 * @param map 对应多个键值 * @param time 时间(秒) * @return true成功 false失败 */ public boolean hmset(String key, Map<String,Object> map, long time){ try { redisTemplate.opsForHash().putAll(key, map); if(time>0){ expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * @param key 键 * @param item 项 * @param value 值 * @return true 成功 false失败 */ public boolean hset(String key,String item,Object value) { try { redisTemplate.opsForHash().put(key, item, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * @param key 键 * @param item 项 * @param value 值 * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间 * @return true 成功 false失败 */ public boolean hset(String key,String item,Object value,long time) { try { redisTemplate.opsForHash().put(key, item, value); if(time>0){ expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除hash表中的值 * @param key 键 不能为null * @param item 项 可以使多个 不能为null */ public void hdel(String key, Object... item){ redisTemplate.opsForHash().delete(key,item); } /** * 判断hash表中是否有该项的值 * @param key 键 不能为null * @param item 项 不能为null * @return true 存在 false不存在 */ public boolean hHasKey(String key, String item){ return redisTemplate.opsForHash().hasKey(key, item); } /** * hash递增 如果不存在,就会创建一个 并把新增后的值返回 * @param key 键 * @param item 项 * @param by 要增加几(大于0) * @return */ public double hincr(String key, String item,double by){ return redisTemplate.opsForHash().increment(key, item, by); } /** * hash递减 * @param key 键 * @param item 项 * @param by 要减少记(小于0) * @return */ public double hdecr(String key, String item,double by){ return redisTemplate.opsForHash().increment(key, item,-by); } //============================set============================= /** * 根据key获取Set中的所有值 * @param key 键 * @return */ public Set<Object> sGet(String key){ try { return redisTemplate.opsForSet().members(key); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 根据value从一个set中查询,是否存在 * @param key 键 * @param value 值 * @return true 存在 false不存在 */ public boolean sHasKey(String key,Object value){ try { return redisTemplate.opsForSet().isMember(key, value); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将数据放入set缓存 * @param key 键 * @param values 值 可以是多个 * @return 成功个数 */ public long sSet(String key, Object...values) { try { return redisTemplate.opsForSet().add(key, values); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 将set数据放入缓存 * @param key 键 * @param time 时间(秒) * @param values 值 可以是多个 * @return 成功个数 */ public long sSetAndTime(String key,long time,Object...values) { try { Long count = redisTemplate.opsForSet().add(key, values); if(time>0) { expire(key, time); } return count; } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 获取set缓存的长度 * @param key 键 * @return */ public long sGetSetSize(String key){ try { return redisTemplate.opsForSet().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 移除值为value的 * @param key 键 * @param values 值 可以是多个 * @return 移除的个数 */ public long setRemove(String key, Object ...values) { try { Long count = redisTemplate.opsForSet().remove(key, values); return count; } catch (Exception e) { e.printStackTrace(); return 0; } } //===============================list================================= /** * 获取list缓存的内容 * @param key 键 * @param start 开始 * @param end 结束 0 到 -1代表所有值 * @return */ public List<Object> lGet(String key, long start, long end){ try { return redisTemplate.opsForList().range(key, start, end); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 获取list缓存的长度 * @param key 键 * @return */ public long lGetListSize(String key){ try { return redisTemplate.opsForList().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 通过索引 获取list中的值 * @param key 键 * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推 * @return */ public Object lGetIndex(String key,long index){ try { return redisTemplate.opsForList().index(key, index); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @return */ public boolean lSet(String key, Object value) { try { redisTemplate.opsForList().rightPush(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lSet(String key, Object value, long time) { try { redisTemplate.opsForList().rightPush(key, value); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @return */ public boolean lSet(String key, List<Object> value) { try { redisTemplate.opsForList().rightPushAll(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lSet(String key, List<Object> value, long time) { try { redisTemplate.opsForList().rightPushAll(key, value); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据索引修改list中的某条数据 * @param key 键 * @param index 索引 * @param value 值 * @return */ public boolean lUpdateIndex(String key, long index,Object value) { try { redisTemplate.opsForList().set(key, index, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 移除N个值为value * @param key 键 * @param count 移除多少个 * @param value 值 * @return 移除的个数 */ public long lRemove(String key,long count,Object value) { try { Long remove = redisTemplate.opsForList().remove(key, count, value); return remove; } catch (Exception e) { e.printStackTrace(); return 0; } } }
以下是查看消息列表及设置为已读的接口
一:消息实体对象:
package com.kaigejava.dto; import lombok.Data; /** * @author 凯哥Java * @description * @company * @since 2023/8/13 11:05 */ @Data public class SystemMessageDTO { private String title; private String time; private int status; }
二:获取未读列表及获取消息列表的接口
package com.kaigejava.systemmessage; import cn.hutool.core.bean.BeanUtil; import cn.hutool.json.JSONUtil; import com.kaigejava.commoneresult.Result; import com.kaigejava.service.WebSocketServer; import com.kaigejava.util.RedisUtil; import com.kaigejava.dto.SystemMessageDTO; import org.springframework.util.CollectionUtils; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; import static com.kaigejava.service.RedisConstant.KEY; import static com.kaigejava.service.RedisConstant.TOTAL_KEY; /** * @author 凯哥Java * @description * @company * @since 2023/8/13 9:55 */ @RestController @RequestMapping("msg") @CrossOrigin(origins = "*",maxAge = 3600) public class SystemMessageController { @Resource private RedisUtil redisUtil; @Resource private WebSocketServer webSocketServer; @GetMapping("/test") public String test(String msg) { redisUtil.set("kaige-pp", "价格:" + msg); return "OK"; } /** * 根据用户id获取当前用户所有未读的消息,同时设置为已读 **/ @GetMapping("/unread_list/{userId}") public Result unreadList(@PathVariable("userId") String userId) { //这里将消息存放到redis中 String redisKey = KEY + userId; Object megObj = redisUtil.get(redisKey); List<SystemMessageDTO> list = new ArrayList<SystemMessageDTO>(); if (Objects.nonNull(megObj)) { list = JSONUtil.toList((String) megObj, SystemMessageDTO.class); } List<SystemMessageDTO> unreadList = list.stream().filter(dto -> dto.getStatus() == 0).collect(Collectors.toList()); List<SystemMessageDTO> dataList = new ArrayList<SystemMessageDTO>(); if (!CollectionUtils.isEmpty(unreadList)) { for (int i = 0; i < unreadList.size(); i++) { SystemMessageDTO newDto = new SystemMessageDTO(); BeanUtil.copyProperties(list.get(i), newDto); dataList.add(newDto); } //将这些设置为已读。 list.stream().forEach(dto -> dto.setStatus(1)); //重新设置回去 redisUtil.set(redisKey, JSONUtil.toJsonStr(list)); //设置未读数量为0个 String rediske = TOTAL_KEY + userId; redisUtil.set(rediske, 0); //推送 webSocketServer.sendMessageTotalByUserId(userId); } return Result.ok(dataList); } /** * 根据用户id获取当前用户所有的消息,同时设置为已读 **/ @GetMapping("/list/{userId}") public Result list(@PathVariable("userId") String userId) { //redisUtil.set("kaige-pp", "价格:" + msg); //这里将消息存放到redis中 String redisKey = KEY + userId; Object megObj = redisUtil.get(redisKey); List<SystemMessageDTO> list = new ArrayList<>(); if (Objects.nonNull(megObj)) { list = JSONUtil.toList((String) megObj, SystemMessageDTO.class); } webSocketServer.sendMessageTotalByUserId(userId); List<SystemMessageDTO> unreadList = list.stream().filter(dto -> dto.getStatus() == 0).collect(Collectors.toList()); List<SystemMessageDTO> dataList = new ArrayList<SystemMessageDTO>(); if (!CollectionUtils.isEmpty(unreadList)) { for (int i = 0; i < unreadList.size(); i++) { SystemMessageDTO newDto = new SystemMessageDTO(); BeanUtil.copyProperties(list.get(i), newDto); dataList.add(newDto); } //将这些设置为已读。 list.stream().forEach(dto -> dto.setStatus(1)); //重新设置回去 redisUtil.set(redisKey, JSONUtil.toJsonStr(list)); //设置未读数量为0个 String rediske = TOTAL_KEY + userId; redisUtil.set(rediske, 0); } return Result.ok(list); } }
最后进行测试
一:启动项目后,
在http://www.jsons.cn/websocket/
输入地址:ws://localhost:8082/websocket/99
注意:这个连接是在websocketService对象中
二:发送消息测试下:
http://localhost:8082/webSocketPush/sentMessage?userId=99&message=kaigejava
注意:这个连接是在WebSocketController中。
查看客户端是否收到了消息推送的未读消息数量是1.如下图:
这样,说明在服务端接收到消息后,根据用户id,将消息推送到客户端。已经OK的。
三:我们在来看看根据用户id获取消息列表及未读消息列表
消息列表:http://localhost:8082/msg/list/99
未读消息列表:http://localhost:8082/msg/unread_list/99
OK 至此。spring boot +Redis实现消息推送所有代码及教程已经写完。欢迎大家关注凯哥Java(kaigejava)获取源码