Redis 批量操作 pipeline 模式
業務場景
項目中場景需要 get 一批 key 的 value,因爲 redis 的 get 操作 (不單單是 get 命令) 是阻塞的,如果循環取值的話,就算是內網,耗時也是巨大的。所以想到了 redis 的 pipeline 命令。
pipeline 簡介
- 非 pipeline:client 一個請求,redis server 一個響應,期間 client 阻塞
- Pipeline:redis 的管道命令,允許 client 將多個請求依次發給服務器(redis 的客戶端,如 jedisCluster,lettuce 等都實現了對 pipeline 的封裝),過程中而不需要等待請求的回覆,在最後再一併讀取結果即可。
單機版
單機版比較簡單,批量獲取
1//換成真實的redis實例
2Jedis jedis = new Jedis();
3//獲取管道
4Pipeline p = jedis.pipelined();
5for (int i = 0; i < 10000; i++) {
6 p.get("key_" + i);
7}
8//獲取結果
9List<Object> results = p.syncAndReturnAll();
批量插入
1String key = "key";
2Jedis jedis = new Jedis();
3Pipeline p = jedis.pipelined();
4List<String> cacheData = .... //要插入的數據列表
5for(String data: cacheData ){
6 p.hset(key, data);
7}
8p.sync();
9jedis.close();
集羣版
實際上遇到的問題是,項目上所用到的 Redis 是集羣,初始化的時候使用的類是 JedisCluster 而不是 Jedis。去查了 JedisCluster 的文檔,並沒有發現提供有像 Jedis 一樣的獲取 Pipeline 對象的 pipelined() 方法。解決方案:
Redis 集羣規範有說: Redis 集羣的鍵空間被分割爲 16384 個槽(slot), 集羣的最大節點數量也是 16384 個。每個主節點都負責處理 16384 個哈希槽的其中一部分。當我們說一個集羣處於 “穩定”(stable)狀態時, 指的是集羣沒有在執行重配置(reconfiguration)操作, 每個哈希槽都只由一個節點進行處理。所以可以根據要插入的 key 知道這個 key 所對應的槽的號碼,再通過這個槽的號碼從集羣中找到對應 Jedis。具體實現如下:
1//初始化得到了jedis cluster, 如何獲取HostAndPort集合代碼就不寫了
2Set<HostAndPort> nodes = .....
3JedisCluster jedisCluster = new JedisCluster(nodes);
4
5Map<String, JedisPool> nodeMap = jedisCluster.getClusterNodes();
6String anyHost = nodeMap.keySet().iterator().next();
7
8//getSlotHostMap方法在下面有
9TreeMap<Long, String> slotHostMap = getSlotHostMap(anyHost);
1private static TreeMap<Long, String> getSlotHostMap(String anyHostAndPortStr) {
2 TreeMap<Long, String> tree = new TreeMap<Long, String>();
3 String parts[] = anyHostAndPortStr.split(":");
4 HostAndPort anyHostAndPort = new HostAndPort(parts[0], Integer.parseInt(parts[1]));
5 try{
6 Jedis jedis = new Jedis(anyHostAndPort.getHost(), anyHostAndPort.getPort());
7 List<Object> list = jedis.clusterSlots();
8 for (Object object : list) {
9 List<Object> list1 = (List<Object>) object;
10 List<Object> master = (List<Object>) list1.get(2);
11 String hostAndPort = new String((byte[]) master.get(0)) + ":" + master.get(1);
12 tree.put((Long) list1.get(0), hostAndPort);
13 tree.put((Long) list1.get(1), hostAndPort);
14 }
15 jedis.close();
16 }catch(Exception e){
17
18 }
19 return tree;
20}
上面這幾步可以在初始化的時候就完成。不需要每次都調用, 把 nodeMap 和 slotHostMap 都定義爲靜態變量。
1//獲取槽號
2int slot = JedisClusterCRC16.getSlot(key);
3//獲取到對應的Jedis對象
4Map.Entry<Long, String> entry = slotHostMap.lowerEntry(Long.valueOf(slot));
5Jedis jedis = nodeMap.get(entry.getValue()).getResource();
建議上面這步操作可以封裝成一個靜態方法。比如命名爲 public static Jedis getJedisByKey(String key) 之類的。意思就是在集羣中, 通過 key 獲取到這個 key 所對應的 Jedis 對象。這樣再通過上面的 jedis.pipelined(); 來就可以進行批量插入了。以下是一個比較完整的封裝
1import redis.clients.jedis.*;
3import redis.clients.jedis.exceptions.JedisMovedDataException;
4import redis.clients.jedis.exceptions.JedisRedirectionException;
5import redis.clients.util.JedisClusterCRC16;
6import redis.clients.util.SafeEncoder;
7
8import java.io.Closeable;
9import java.lang.reflect.Field;
10import java.util.*;
11import java.util.function.BiConsumer;
12
14public class JedisClusterPipeline extends PipelineBase implements Closeable {
15
16 /**
17 * 用於獲取 JedisClusterInfoCache
18 */
19 private JedisSlotBasedConnectionHandler connectionHandler;
20 /**
21 * 根據hash值獲取連接
22 */
23 private JedisClusterInfoCache clusterInfoCache;
24
25 /**
26 * 也可以去繼承JedisCluster和JedisSlotBasedConnectionHandler來提供訪問接口
27 * JedisCluster繼承於BinaryJedisCluster
28 * 在BinaryJedisCluster,connectionHandler屬性protected修飾的,所以需要反射
29 *
30 *
31 * 而 JedisClusterInfoCache 屬性在JedisClusterConnectionHandler中,但是這個類是抽象類,
32 * 但它有一個實現類JedisSlotBasedConnectionHandler
33 */
34 private static final Field FIELD_CONNECTION_HANDLER;
35 private static final Field FIELD_CACHE;
36 static {
37 FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
38 FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
39 }
40
41 /**
42 * 根據順序存儲每個命令對應的Client
43 */
44 private Queue<Client> clients = new LinkedList<>();
45 /**
46 * 用於緩存連接
47 * 一次pipeline過程中使用到的jedis緩存
48 */
49 private Map<JedisPool, Jedis> jedisMap = new HashMap<>();
50 /**
51 * 是否有數據在緩存區
52 */
53 private boolean hasDataInBuf = false;
54
55 /**
56 * 根據jedisCluster實例生成對應的JedisClusterPipeline
57 * 通過此方式獲取pipeline進行操作的話必須調用close()關閉管道
58 * 調用本類裏pipelineXX方法則不用close(),但建議最好還是在finally裏調用一下close()
59 * @param
60 * @return
61 */
62 public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
63 JedisClusterPipeline pipeline = new JedisClusterPipeline();
64 pipeline.setJedisCluster(jedisCluster);
65 return pipeline;
66 }
67
68 public JedisClusterPipeline() {
69 }
70
71 public void setJedisCluster(JedisCluster jedis) {
72 connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
73 clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
74 }
75
76 /**
77 * 刷新集羣信息,當集羣信息發生變更時調用
78 * @param
79 * @return
80 */
81 public void refreshCluster() {
82 connectionHandler.renewSlotCache();
83 }
84
85 /**
86 * 同步讀取所有數據. 與syncAndReturnAll()相比,sync()只是沒有對數據做反序列化
87 */
88 public void sync() {
89 innerSync(null);
90 }
91
92 /**
93 * 同步讀取所有數據 並按命令順序返回一個列表
94 *
95 * @return 按照命令的順序返回所有的數據
96 */
97 public List<Object> syncAndReturnAll() {
98 List<Object> responseList = new ArrayList<>();
99
100 innerSync(responseList);
101
102 return responseList;
103 }
104
105 @Override
106 public void close() {
107 clean();
108 clients.clear();
109 for (Jedis jedis : jedisMap.values()) {
110 if (hasDataInBuf) {
111 flushCachedData(jedis);
112 }
113 jedis.close();
114 }
115 jedisMap.clear();
116 hasDataInBuf = false;
117 }
118
119 private void flushCachedData(Jedis jedis) {
120 try {
121 jedis.getClient().getAll();
122 } catch (RuntimeException ex) {
123 }
124 }
125
126 @Override
127 protected Client getClient(String key) {
128 byte[] bKey = SafeEncoder.encode(key);
129 return getClient(bKey);
130 }
131
132 @Override
133 protected Client getClient(byte[] key) {
134 Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));
135 Client client = jedis.getClient();
136 clients.add(client);
137 return client;
138 }
139
140 private Jedis getJedis(int slot) {
141 JedisPool pool = clusterInfoCache.getSlotPool(slot);
142 // 根據pool從緩存中獲取Jedis
143 Jedis jedis = jedisMap.get(pool);
144 if (null == jedis) {
145 jedis = pool.getResource();
146 jedisMap.put(pool, jedis);
147 }
148 hasDataInBuf = true;
149 return jedis;
150 }
151
152 public static void pipelineSetEx(String[] keys, String[] values, int[] exps,JedisCluster jedisCluster) {
153 operate(new Command() {
154 @Override
155 public List execute() {
156 JedisClusterPipeline p = pipelined(jedisCluster);
157 for (int i = 0, len = keys.length; i < len; i++) {
158 p.setex(keys[i], exps[i], values[i]);
159 }
160 return p.syncAndReturnAll();
161 }
162 });
163 }
164
165 public static List<Map<String, String>> pipelineHgetAll(String[] keys,JedisCluster jedisCluster) {
166 return operate(new Command() {
167 @Override
168 public List execute() {
169 JedisClusterPipeline p = pipelined(jedisCluster);
170 for (int i = 0, len = keys.length; i < len; i++) {
171 p.hgetAll(keys[i]);
172 }
173 return p.syncAndReturnAll();
174 }
175 });
176 }
177
178 public static List<Boolean> pipelineSismember(String[] keys, String members,JedisCluster jedisCluster) {
179 return operate(new Command() {
180 @Override
181 public List execute() {
182 JedisClusterPipeline p = pipelined(jedisCluster);
183 for (int i = 0, len = keys.length; i < len; i++) {
184 p.sismember(keys[i], members);
185 }
186 return p.syncAndReturnAll();
187 }
188 });
189 }
190
191 public static <O> List pipeline(BiConsumer<O, JedisClusterPipeline> function, O obj,JedisCluster jedisCluster) {
192 return operate(new Command() {
193 @Override
194 public List execute() {
195 JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jedisCluster);
196 function.accept(obj, jcp);
197 return jcp.syncAndReturnAll();
198 }
199 });
200 }
201
202 private void innerSync(List<Object> formatted) {
203 HashSet<Client> clientSet = new HashSet<>();
204 try {
205 for (Client client : clients) {
206 // 在sync()調用時其實是不需要解析結果數據的,但是如果不調用get方法,發生了JedisMovedDataException這樣的錯誤應用是不知道的,因此需要調用get()來觸發錯誤。
207 // 其實如果Response的data屬性可以直接獲取,可以省掉解析數據的時間,然而它並沒有提供對應方法,要獲取data屬性就得用反射,不想再反射了,所以就這樣了
208 Object data = generateResponse(client.getOne()).get();
209 if (null != formatted) {
210 formatted.add(data);
211 }
212 // size相同說明所有的client都已經添加,就不用再調用add方法了
213 if (clientSet.size() != jedisMap.size()) {
214 clientSet.add(client);
215 }
216 }
217 } catch (JedisRedirectionException jre) {
218 if (jre instanceof JedisMovedDataException) {
219 // if MOVED redirection occurred, rebuilds cluster's slot cache,
220 // recommended by Redis cluster specification
221 refreshCluster();
222 }
223
224 throw jre;
225 } finally {
226 if (clientSet.size() != jedisMap.size()) {
227 // 所有還沒有執行過的client要保證執行(flush),防止放回連接池後後面的命令被污染
228 for (Jedis jedis : jedisMap.values()) {
229 if (clientSet.contains(jedis.getClient())) {
230 continue;
231 }
232 flushCachedData(jedis);
233 }
234 }
235 hasDataInBuf = false;
236 close();
237 }
238 }
239
240 private static Field getField(Class<?> cls, String fieldName) {
241 try {
242 Field field = cls.getDeclaredField(fieldName);
243 field.setAccessible(true);
244 return field;
245 } catch (NoSuchFieldException | SecurityException e) {
246 throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);
247 }
248 }
249
250 @SuppressWarnings({"unchecked" })
251 private static <T> T getValue(Object obj, Field field) {
252 try {
253 return (T)field.get(obj);
254 } catch (IllegalArgumentException | IllegalAccessException e) {
257 throw new RuntimeException(e);
258 }
259 }
260
261 private static <T> T operate(Command command) {
262 try {
263 return command.execute();
264 } catch (Exception e) {
266 throw new RuntimeException(e);
267 }
268 }
269
270 interface Command {
271 /**
272 * 具體執行命令
273 *
274 * @param <T>
275 * @return
276 */
277 <T> T execute();
278 }
279}
使用例子
1 public Object testPipelineOperate() {
2 // String[] keys = {"dylan1","dylan2"};
3 // String[] values = {"dylan1-v1","dylan2-v2"};
4 // int[] exps = {100,200};
5 // JedisClusterPipeline.pipelineSetEx(keys, values, exps, jedisCluster);
6 long start = System.currentTimeMillis();
7
8 List<String> keyList = new ArrayList<>();
9 for (int i = 0; i < 1000; i++) {
10 keyList.add(i + "");
11 }
12 // List<String> pipeline = JedisClusterPipeline.pipeline(this::getValue, keyList, jedisCluster);
13 // List<String> pipeline = JedisClusterPipeline.pipeline(this::getHashValue, keyList, jedisCluster);
14 String[] keys = {"dylan-test1", "dylan-test2"};
15
16 List<Map<String, String>> all = JedisClusterPipeline.pipelineHgetAll(keys, jedisCluster);
17 long end = System.currentTimeMillis();
18 System.out.println("testPipelineOperate cost:" + (end-start));
19
20 return Response.success(all);
21 }
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/jycl2ZIboUVb-Qv5OZyAMw