Redis集群下批处理-串行化执行代码实践
- Redis
- 时间:2023-01-05 22:47
- 3673人已阅读
简介
当在Redis集群模式下,我们就不能使用mset和pipeline了。因为分片集群下,插槽是根据key算出来的,不同的key算出的slot不同,slot不同的话,可能Redis实例节点也不是一个,所以,不能直接使用mset或者pipeline了。第二种方案:串行slot,简单来说,就是执行前,客户端先计算一下对应的key的slot,一样slot的key就放到一个组里边,不同的,就放到不同的组里边,
🔔🔔🔔好消息!好消息!🔔🔔🔔
有需要的朋友👉:联系凯哥
当在Redis集群模式下,我们就不能使用mset和pipeline了。因为分片集群下,插槽是根据key算出来的,不同的key算出的slot不同,slot不同的话,可能Redis实例节点也不是一个,所以,不能直接使用mset或者pipeline了。
第二种方案:串行slot,简单来说,就是执行前,客户端先计算一下对应的key的slot,一样slot的key就放到一个组里边,不同的,就放到不同的组里边,然后对每个组执行pipeline的批处理,他就能串行执行各个组的命令,这种做法比第一种方法耗时要少,但是缺点呢,相对来说复杂一点,所以这种方案还需要优化一下。
import org.springframework.data.redis.connection.ClusterSlotHashUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
public class JedisClusterTest {
private JedisCluster jedisCluster;
@Resource
private StringRedisTemplate stringRedisTemplate;
@BeforeEach
void setUp() {
// 配置连接池
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(8);
poolConfig.setMaxIdle(8);
poolConfig.setMinIdle(0);
poolConfig.setMaxWaitMillis(1000);
HashSet<HostAndPort> nodes = new HashSet<>();
nodes.add(new HostAndPort("192.168.1.11", 7001));
nodes.add(new HostAndPort("192.168.1.11", 7002));
nodes.add(new HostAndPort("192.168.1.11", 7003));
nodes.add(new HostAndPort("192.168.1.11", 8001));
nodes.add(new HostAndPort("192.168.1.11", 8002));
nodes.add(new HostAndPort("192.168.1.11", 8003));
jedisCluster = new JedisCluster(nodes, poolConfig);
}
@Test
void testMSet() {
//直接执行会报错。因为集群模式下不能mset
jedisCluster.mset("name", "Jack", "age", "21", "sex", "male");
}
@Test
void testMSet2() {
Map<String, String> map = new HashMap<>(3);
map.put("name", "Jack");
map.put("age", "21");
map.put("sex", "Male");
//对Map数据进行分组。根据相同的slot放在一个分组
//key就是slot,value就是一个组
Map<Integer, List<Map.Entry<String, String>>> result = map.entrySet()
.stream()
.collect(Collectors.groupingBy(
entry -> ClusterSlotHashUtil.calculateSlot(entry.getKey()))
);
//串行的去执行mset的逻辑
for (List<Map.Entry<String, String>> list : result.values()) {
String[] arr = new String[list.size() * 2];
int j = 0;
for (int i = 0; i < list.size(); i++) {
j = i<<2;
Map.Entry<String, String> e = list.get(0);
arr[j] = e.getKey();
arr[j + 1] = e.getValue();
}
jedisCluster.mset(arr);
}
}
@AfterEach
void tearDown() {
if (jedisCluster != null) {
jedisCluster.close();
}
}
@Test
void testMSetInCluster() {
Map<String, String> map = new HashMap<>(3);
map.put("name", "Rose");
map.put("age", "21");
map.put("sex", "Female");
stringRedisTemplate.opsForValue().multiSet(map);
List<String> strings = stringRedisTemplate.opsForValue().multiGet(Arrays.asList("name", "age", "sex"));
strings.forEach(System.out::println);
}
}