Redis集群下批处理-串行化执行代码实践
- Redis
- 时间:2023-01-05 22:47
- 3438人已阅读
简介
当在Redis集群模式下,我们就不能使用mset和pipeline了。因为分片集群下,插槽是根据key算出来的,不同的key算出的slot不同,slot不同的话,可能Redis实例节点也不是一个,所以,不能直接使用mset或者pipeline了。第二种方案:串行slot,简单来说,就是执行前,客户端先计算一下对应的key的slot,一样slot的key就放到一个组里边,不同的,就放到不同的组里边,
🔔🔔🔔好消息!好消息!🔔🔔🔔
有需要的朋友👉:联系凯哥
当在Redis集群模式下,我们就不能使用mset和pipeline了。因为分片集群下,插槽是根据key算出来的,不同的key算出的slot不同,slot不同的话,可能Redis实例节点也不是一个,所以,不能直接使用mset或者pipeline了。
第二种方案:串行slot,简单来说,就是执行前,客户端先计算一下对应的key的slot,一样slot的key就放到一个组里边,不同的,就放到不同的组里边,然后对每个组执行pipeline的批处理,他就能串行执行各个组的命令,这种做法比第一种方法耗时要少,但是缺点呢,相对来说复杂一点,所以这种方案还需要优化一下。
import org.springframework.data.redis.connection.ClusterSlotHashUtil; import org.springframework.data.redis.core.StringRedisTemplate; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPoolConfig; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import javax.annotation.Resource; import java.util.*; import java.util.stream.Collectors; public class JedisClusterTest { private JedisCluster jedisCluster; @Resource private StringRedisTemplate stringRedisTemplate; @BeforeEach void setUp() { // 配置连接池 JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(8); poolConfig.setMaxIdle(8); poolConfig.setMinIdle(0); poolConfig.setMaxWaitMillis(1000); HashSet<HostAndPort> nodes = new HashSet<>(); nodes.add(new HostAndPort("192.168.1.11", 7001)); nodes.add(new HostAndPort("192.168.1.11", 7002)); nodes.add(new HostAndPort("192.168.1.11", 7003)); nodes.add(new HostAndPort("192.168.1.11", 8001)); nodes.add(new HostAndPort("192.168.1.11", 8002)); nodes.add(new HostAndPort("192.168.1.11", 8003)); jedisCluster = new JedisCluster(nodes, poolConfig); } @Test void testMSet() { //直接执行会报错。因为集群模式下不能mset jedisCluster.mset("name", "Jack", "age", "21", "sex", "male"); } @Test void testMSet2() { Map<String, String> map = new HashMap<>(3); map.put("name", "Jack"); map.put("age", "21"); map.put("sex", "Male"); //对Map数据进行分组。根据相同的slot放在一个分组 //key就是slot,value就是一个组 Map<Integer, List<Map.Entry<String, String>>> result = map.entrySet() .stream() .collect(Collectors.groupingBy( entry -> ClusterSlotHashUtil.calculateSlot(entry.getKey())) ); //串行的去执行mset的逻辑 for (List<Map.Entry<String, String>> list : result.values()) { String[] arr = new String[list.size() * 2]; int j = 0; for (int i = 0; i < list.size(); i++) { j = i<<2; Map.Entry<String, String> e = list.get(0); arr[j] = e.getKey(); arr[j + 1] = e.getValue(); } jedisCluster.mset(arr); } } @AfterEach void tearDown() { if (jedisCluster != null) { jedisCluster.close(); } } @Test void testMSetInCluster() { Map<String, String> map = new HashMap<>(3); map.put("name", "Rose"); map.put("age", "21"); map.put("sex", "Female"); stringRedisTemplate.opsForValue().multiSet(map); List<String> strings = stringRedisTemplate.opsForValue().multiGet(Arrays.asList("name", "age", "sex")); strings.forEach(System.out::println); } }