Redis Pipelining 底層原理分析及實踐

作者:vivo 互聯網服務器團隊 - Wang Fei

Redis 是一種基於客戶端 - 服務端模型以及請求 / 響應的 TCP 服務。在遇到批處理命令執行時,Redis 提供了 Pipelining(管道) 來提升批處理性能。本文結合實踐分析了 Spring Boot 框架下 Redis 的 Lettuce 客戶端和 Redisson 客戶端對 Pipeline 特性的支持原理,並針對實踐過程中遇到的問題進行了分析,可以幫助開發者瞭解不同客戶端對 Pipeline 支持原理及避免實際使用中出現問題。

一、前言

Redis 已經提供了像 mget 、mset 這種批量的命令,但是某些操作根本就不支持或沒有批量的操作,從而與 Redis 高性能背道而馳。爲此, Redis 基於管道機制,提供 Redis Pipeline 新特性。Redis Pipeline 是一種通過一次性發送多條命令並在執行完後一次性將結果返回,從而減少客戶端與 redis 的通信次數來實現降低往返延時時間提升操作性能的技術。目前,Redis Pipeline 是被很多個版本的 Redis 客戶端所支持的。 

二、Pipeline 底層原理分析

 2.1 Redis 單個命令執行基本步驟

Redis 是一種基於客戶端 - 服務端模型以及請求 / 響應的 TCP 服務。一次 Redis 客戶端發起的請求,經過服務端的響應後,大致會經歷如下的步驟:

  1. 客戶端發起一個(查詢 / 插入)請求,並監聽 socket 返回,通常情況都是阻塞模式等待 Redis 服務器的響應。

  2. 服務端處理命令,並且返回處理結果給客戶端。

  3. 客戶端接收到服務的返回結果,程序從阻塞代碼處返回。

2.2 RTT 時間

Redis 客戶端和服務端之間通過網絡連接進行數據傳輸,數據包從客戶端到達服務器,並從服務器返回數據回覆客戶端的時間被稱之爲 RTT(Round Trip Time - 往返時間)。我們可以很容易就意識到,Redis 在連續請求服務端時,如果 RTT 時間爲 250ms, 即使 Redis 每秒能處理 100k 請求,但也會因爲網絡傳輸花費大量時間,導致每秒最多也只能處理 4 個請求,導致整體性能的下降。

2.3 Redis Pipeline

爲了提升效率,這時候 Pipeline 出現了。Pipelining 不僅僅能夠降低 RRT,實際上它極大的提升了單次執行的操作數。這是因爲如果不使用 Pipelining,那麼每次執行單個命令, 從訪問數據的結構和服務端產生應答的角度,它的成本是很低的。但是從執行網絡 IO 的角度,它的成本其實是很高的。其中涉及到 read() 和 write() 的系統調用,這意味着需要從用戶態切換到內核態, 而這個上下文的切換成本是巨大的。

當使用 Pipeline 時,它允許多個命令的讀通過一次 read() 操作,多個命令的應答使用一次 write() 操作,它允許客戶端可以一次發送多條命令,而不等待上一條命令執行的結果。**不僅減少了 RTT,同時也減少了 IO 調用次數(IO 調用涉及到用戶態到內核態之間的切換),最終提升程序的執行效率與性能。**如下圖:

要支持 Pipeline,其實既要服務端的支持,也要客戶端支持。對於服務端來說,所需要的是能夠處理一個客戶端通過同一個 TCP 連接發來的多個命令,可以理解爲,這裏將多個命令切分,和處理單個命令一樣,Redis 就是這樣處理的。而客戶端,則是要將多個命令緩存起來,緩衝區滿了就發送,然後再寫緩衝,最後才處理 Redis 的應答。

三、Pipeline 基本使用及性能比較

下面我們以給 10w 個 set 結構分別插入一個整數值爲例,分別使用 jedis 單個命令插入、jedis 使用 Pipeline 模式進行插入和 redisson 使用 Pipeline 模式進行插入以及測試其耗時。

@Slf4j
public class RedisPipelineTestDemo {
    public static void main(String[] args) {
        //連接redis
        Jedis jedis = new Jedis("10.101.17.180", 6379);
        //jedis逐一給每個set新增一個value
        String zSetKey = "Pipeline-test-set";
        int size = 100000;
        long begin = System.currentTimeMillis();
        for (int i = 0; i < size; i++) {
            jedis.sadd(zSetKey + i, "aaa");
        }
        log.info("Jedis逐一給每個set新增一個value耗時:{}ms", (System.currentTimeMillis() - begin));
        //Jedis使用Pipeline模式         Pipeline Pipeline = jedis.Pipelined();
        begin = System.currentTimeMillis();
        for (int i = 0; i < size; i++) {             Pipeline.sadd(zSetKey + i, "bbb");
        }         Pipeline.sync();
        log.info("Jedis Pipeline模式耗時:{}ms", (System.currentTimeMillis() - begin));
        //Redisson使用Pipeline模式
        Config config = new Config();
        config.useSingleServer().setAddress("redis://10.101.17.180:6379");
        RedissonClient redisson = Redisson.create(config);
        RBatch redisBatch = redisson.createBatch();
        begin = System.currentTimeMillis();
        for (int i = 0; i < size; i++) {
            redisBatch.getSet(zSetKey + i).addAsync("ccc");
        }
        redisBatch.execute();
        log.info("Redisson Pipeline模式耗時:{}ms", (System.currentTimeMillis() - begin));
        //關閉         Pipeline.close();
        jedis.close();
        redisson.shutdown();
    }
}

測試結果如下:

Jedis 逐一給每個 set 新增一個 value 耗時:162655ms

Jedis Pipeline 模式耗時:504ms

Redisson Pipeline 模式耗時:1399ms

我們發現使用 Pipeline 模式對應的性能會明顯好於單個命令執行的情況。

四、項目中實際應用

在實際使用過程中有這樣一個場景,很多應用在節假日的時候需要更新應用圖標樣式,在運營進行後臺配置的時候, 可以根據圈選的用戶標籤預先計算出單個用戶需要下發的圖標樣式並存儲在 Redis 裏面,從而提升性能,這裏就涉及 Redis 的批量操作問題,業務流程如下:

爲了提升 Redis 操作性能,我們決定使用 Redis Pipelining 機制進行批量執行。

4.1 Redis 客戶端對比

針對 Java 技術棧而言,目前 Redis 使用較多的客戶端爲 Jedis、Lettuce 和 Redisson。

目前項目主要是基於 SpringBoot 開發,針對 Redis,其默認的客戶端爲 Lettuce,所以我們基於 Lettuce 客戶端進行分析。

4.2 Spring 環境下 Lettuce 客戶端對 Pipeline 的實現

在 Spring 環境下,使用 Redis 的 Pipeline 也是很簡單的。spring-data-redis 提供了

StringRedisTemplate 簡化了對 Redis 的操作,  只需要調用 StringRedisTemplate 的 executePipelined 方法就可以了,但是在參數中提供了兩種回調方式:SessionCallback 和 RedisCallback

兩種使用方式如下 (這裏以操作 set 結構爲例):

RedisCallback 的使用方式:

public void testRedisCallback() {
        List<Integer> ids= Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        Integer contentId = 1;
        redisTemplate.executePipelined(new InsertPipelineExecutionA(ids, contentId));
    }
@AllArgsConstructor
    private static class InsertPipelineExecutionA implements RedisCallback<Void> {
        private final List<Integer> ids;
        private final Integer contentId;
        @Override
        public Void doInRedis(RedisConnection connection) DataAccessException {
            RedisSetCommands redisSetCommands = connection.setCommands();
            ids.forEach(id-> {
                String redisKey = "aaa:" + id;
                String value = String.valueOf(contentId);
                redisSetCommands.sAdd(redisKey.getBytes(), value.getBytes());
            });
            return null;
        }
    }

SessionCallback 的使用方式:

public void testSessionCallback() {
        List<Integer> ids= Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        Integer contentId = 1;
        redisTemplate.executePipelined(new InsertPipelineExecutionB(ids, contentId));
    }
@AllArgsConstructor
    private static class InsertPipelineExecutionB implements SessionCallback<Void> {
        private final List<Integer> ids;
        private final Integer contentId;
        @Override
        public <K, V> Void execute(RedisOperations<K, V> operations) throws DataAccessException {
            SetOperations<String, String> setOperations = (SetOperations<String, String>) operations.opsForSet();
            ids.forEach(id-> {
                String redisKey = "aaa:" + id;
                String value = String.valueOf(contentId);
                setOperations.add(redisKey, value);
            });
            return null;
        }
    }

4.3 RedisCallBack 和 SessionCallback 之間的比較

1、RedisCallBack 和 SessionCallback 都可以實現回調,通過它們可以在同一條連接中一次執行多個 redis 命令。

2、RedisCallback 使用的是原生

RedisConnection,用起來比較麻煩,比如上面執行 set 的 add 操作,key 和 value 需要進行轉換,可讀性差,但原生 api 提供的功能比較齊全。

3、SessionCalback 提供了良好的封裝,可以優先選擇使用這種回調方式。

最終的代碼實現如下:

public void executeB(List<Integer> userIds, Integer iconId) {
        redisTemplate.executePipelined(new InsertPipelineExecution(userIds, iconId));
}
@AllArgsConstructor
private static class InsertPipelineExecution implements SessionCallback<Void> {
     private final List<Integer> userIds;
     private final Integer iconId;
     @Override
     public <K, V> Void execute(RedisOperations<K, V> operations) throws DataAccessException {
         SetOperations<String, String> setOperations = (SetOperations<String, String>) operations.opsForSet();
         userIds.forEach(userId -> {
             String redisKey = "aaa:" + userId;
             String value = String.valueOf(iconId);
             setOperations.add(redisKey, value);
         });
         return null;
     }
}

4.4 源碼分析

那麼爲什麼使用 Pipeline 方式會對性能有較大提升呢,我們現在從源碼入手着重分析一下:

4.4.1 Pipeline 方式下獲取連接相關原理分析:

@Override
    public List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer) {
        Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
        Assert.notNull(session, "Callback object must not be null");
        //1. 獲取對應的Redis連接工廠
        RedisConnectionFactory factory = getRequiredConnectionFactory();
        //2. 綁定連接過程
        RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
        try {
            //3. 執行命令流程, 這裏請求參數爲RedisCallback, 裏面有對應的回調操作
           return execute((RedisCallback<List<Object>>) connection -> {
                //具體的回調邏輯
                connection.openPipeline();
                boolean PipelinedClosed = false;
                try {
                    //執行命令
                    Object result = executeSession(session);
                    if (result != null) {
                        throw new InvalidDataAccessApiUsageException(
                                "Callback cannot return a non-null value as it gets overwritten by the Pipeline");
                    }
                    List<Object> closePipeline = connection.closePipeline();      PipelinedClosed = true;
                    return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
                } finally {
                    if (!PipelinedClosed) {
                        connection.closePipeline();
                    }
                }
            });
        } finally {
            RedisConnectionUtils.unbindConnection(factory);
        }
    }

① 獲取對應的 Redis 連接工廠,這裏要使用 Pipeline 特性需要使用

LettuceConnectionFactory 方式,這裏獲取的連接工廠就是 LettuceConnectionFactory。

② 綁定連接過程,具體指的是將當前連接綁定到當前線程上面, 核心方法爲:doGetConnection。

public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
            boolean enableTransactionSupport) {
        Assert.notNull(factory, "No RedisConnectionFactory specified");
        //核心類,有緩存作用,下次可以從這裏獲取已經存在的連接
        RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
        //如果connHolder不爲null, 則獲取已經存在的連接, 提升性能
        if (connHolder != null) {
            if (enableTransactionSupport) {
                potentiallyRegisterTransactionSynchronisation(connHolder, factory);
            }
            return connHolder.getConnection();
        }
        ......
        //第一次獲取連接,需要從Redis連接工廠獲取連接
        RedisConnection conn = factory.getConnection();
        //bind = true 執行綁定
        if (bind) {
            RedisConnection connectionToBind = conn;
            ......
            connHolder = new RedisConnectionHolder(connectionToBind);
            //綁定核心代碼: 將獲取的連接和當前線程綁定起來
            TransactionSynchronizationManager.bindResource(factory, connHolder);
            ......
            return connHolder.getConnection();
        }
        return conn;
    }

裏面有個核心類 RedisConnectionHolder,我們看一下

RedisConnectionHolder connHolder = 

(RedisConnectionHolder) 

TransactionSynchronizationManager.getResource(factory);

@Nullable
    public static Object getResource(Object key) {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Object value = doGetResource(actualKey);
        if (value != null && logger.isTraceEnabled()) {
            logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
                    Thread.currentThread().getName() + "]");
        }
        return value;
    }

裏面有一個核心方法 doGetResource

(actualKey),大家很容易猜測這裏涉及到一個 map 結構,如果我們看源碼,也確實是這樣一個結構。

@Nullable
    private static Object doGetResource(Object actualKey) {
        Map<Object, Object> map = resources.get();
        if (map == null) {
            return null;
        }
        Object value = map.get(actualKey);
        // Transparently remove ResourceHolder that was marked as void...
        if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
            map.remove(actualKey);
            // Remove entire ThreadLocal if empty...
            if (map.isEmpty()) {
                resources.remove();
            }
            value = null;
        }
        return value;
    }

resources 是一個 ThreadLocal 類型,這裏會涉及到根據 RedisConnectionFactory 獲取到連接 connection 的邏輯, 如果下一次是同一個 actualKey,那麼就直接使用已經存在的連接,而不需要新建一個連接。第一次這裏 map 爲 null,就直接返回了,然後回到 doGetConnection 方法,由於這裏 bind 爲 true,我們會執行 TransactionSynchronizationManager.bindResource(factory, connHolder);,也就是將連接和當前線程綁定了起來。

public static void bindResource(Object key, Object value) throws IllegalStateException {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Assert.notNull(value, "Value must not be null");
        Map<Object, Object> map = resources.get();
        // set ThreadLocal Map if none found
        if (map == null) {
            map = new HashMap<>();
            resources.set(map);
        }
        Object oldValue = map.put(actualKey, value);
        ......
    }

③ 我們回到 executePipelined,在獲取到連接工廠,將連接和當前線程綁定起來以後,就開始需要正式去執行命令了, 這裏會調用 execute 方法

@Override
@Nullable
public <T> T execute(RedisCallback<T> action) {
    return execute(action, isExposeConnection());
}

這裏我們注意到 execute 方法的入參爲 RedisCallbackaction,RedisCallback 對應的 doInRedis 操作如下, 這裏在後面的調用過程中會涉及到回調。

connection.openPipeline();
boolean PipelinedClosed = false;
try {
    Object result = executeSession(session);
    if (result != null) {
        throw new InvalidDataAccessApiUsageException(
                "Callback cannot return a non-null value as it gets overwritten by the Pipeline");
    }
    List<Object> closePipeline = connection.closePipeline();  PipelinedClosed = true;
    return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
} finally {
    if (!PipelinedClosed) {
        connection.closePipeline();
    }
}

我們再來看 execute(action, 

isExposeConnection()) 方法,這裏最終會調用

execute(RedisCallbackaction, boolean exposeConnection, boolean Pipeline) 方法。

@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean Pipeline) {
    Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(action, "Callback object must not be null");
    //獲取對應的連接工廠
    RedisConnectionFactory factory = getRequiredConnectionFactory();
    RedisConnection conn = null;
    try {
        if (enableTransactionSupport) {
            // only bind resources in case of potential transaction synchronization
            conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
        } else {
            //獲取對應的連接(enableTransactionSupport=false)   
            conn = RedisConnectionUtils.getConnection(factory);
        }
        boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
        RedisConnection connToUse = preProcessConnection(conn, existingConnection);
        boolean PipelineStatus = connToUse.isPipelined();
        if (Pipeline && !PipelineStatus) {
            connToUse.openPipeline();
        }
        RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
        //核心方法,這裏就開始執行回調操作
        T result = action.doInRedis(connToExpose);
        // close Pipeline
        if (Pipeline && !PipelineStatus) {
            connToUse.closePipeline();
        }
        // TODO: any other connection processing?
        return postProcessResult(result, connToUse, existingConnection);
    } finally {
        RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
    }
}

我們看到這裏最開始也是獲取對應的連接工廠,然後獲取對應的連接

(enableTransactionSupport=false),具體調用是

RedisConnectionUtils.getConnection(factory) 方法,最終會調用

RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind, boolean enableTransactionSupport),此時 bind 爲 false

public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
        boolean enableTransactionSupport) {
    Assert.notNull(factory, "No RedisConnectionFactory specified");
    //直接獲取與當前線程綁定的Redis連接
    RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
    if (connHolder != null) {
        if (enableTransactionSupport) {
            potentiallyRegisterTransactionSynchronisation(connHolder, factory);
        }
        return connHolder.getConnection();
    }
    ......
    return conn;
}

前面我們分析過一次,這裏調用

RedisConnectionHolder connHolder = 

(RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory); 會獲取到之前和當前線程綁定的 Redis,而不會新創建一個連接。

然後會去執行 T result = action.

doInRedis(connToExpose),這裏的 action 爲 RedisCallback,執行 doInRedis 爲:

//開啓Pipeline功能
connection.openPipeline();
boolean PipelinedClosed = false;
try {
    //執行Redis命令
    Object result = executeSession(session);
    if (result != null) {
        throw new InvalidDataAccessApiUsageException(
                "Callback cannot return a non-null value as it gets overwritten by the Pipeline");
    }
    List<Object> closePipeline = connection.closePipeline();  PipelinedClosed = true;
    return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
} finally {
    if (!PipelinedClosed) {
        connection.closePipeline();
    }
}

這裏最開始會開啓 Pipeline 功能,然後執行

Object result = executeSession(session);

private Object executeSession(SessionCallback<?> session) {
    return session.execute(this);
}

這裏會調用我們自定義的 execute 方法

@AllArgsConstructor
private static class InsertPipelineExecution implements SessionCallback<Void> {
     private final List<Integer> userIds;
     private final Integer iconId;
     @Override
     public <K, V> Void execute(RedisOperations<K, V> operations) throws DataAccessException {
         SetOperations<String, String> setOperations = (SetOperations<String, String>) operations.opsForSet();
         userIds.forEach(userId -> {
             String redisKey = "aaa:" + userId;
             String value = String.valueOf(iconId);
             setOperations.add(redisKey, value);
         });
         return null;
     }
}

進入到 foreach 循環,執行 DefaultSetOperations 的 add 方法。

@Override
public Long add(K key, V... values) {
    byte[] rawKey = rawKey(key);
    byte[][] rawValues = rawValues((Object[]) values);
    //這裏的connection.sAdd是後續回調要執行的方法
   return execute(connection -> connection.sAdd(rawKey, rawValues), true);
}

這裏會繼續執行 redisTemplate 的 execute 方法,裏面最終會調用我們之前分析過的 T execute(RedisCallbackaction, boolean exposeConnection, boolean Pipeline) 方法。

@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean Pipeline) {
    Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(action, "Callback object must not be null");
    RedisConnectionFactory factory = getRequiredConnectionFactory();
    RedisConnection conn = null;
    try {
        ......
        //再次執行回調方法,這裏執行的Redis基本數據結構對應的操作命令
        T result = action.doInRedis(connToExpose);
        ......
        // TODO: any other connection processing?
        return postProcessResult(result, connToUse, existingConnection);
    } finally {
        RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
    }
}

這裏會繼續執行 T result = 

action.doInRedis(connToExpose);,這裏其實執行的 doInRedis 方法爲:

connection -> connection.sAdd(rawKey, rawValues)

4.4.2 Pipeline 方式下執行命令的流程分析:

① 接着上面的流程分析,這裏的 sAdd 方法實際調用的是 DefaultStringRedisConnection 的 sAdd 方法

@Override
public Long sAdd(byte[] key, byte[]... values) {
    return convertAndReturn(delegate.sAdd(key, values), identityConverter);
}

② 這裏會進一步調用

DefaultedRedisConnection 的 sAdd 方法

@Override
@Deprecated
default Long sAdd(byte[] key, byte[]... values) {
    return setCommands().sAdd(key, values);
}

③ 接着調用 LettuceSetCommands 的 sAdd 方法

@Override
public Long sAdd(byte[] key, byte[]... values) {
    Assert.notNull(key, "Key must not be null!");
    Assert.notNull(values, "Values must not be null!");
    Assert.noNullElements(values, "Values must not contain null elements!");
    try {
        // 如果開啓了 Pipelined 模式,獲取的是 異步連接,進行異步操作
        if (isPipelined()) {    Pipeline(connection.newLettuceResult(getAsyncConnection().sadd(key, values)));
            return null;
        }
        if (isQueueing()) {
            transaction(connection.newLettuceResult(getAsyncConnection().sadd(key, values)));
            return null;
        }
        //常規模式下,使用的是同步操作
        return getConnection().sadd(key, values);
    } catch (Exception ex) {
        throw convertLettuceAccessException(ex);
    }
}

這裏我們開啓了 Pipeline, 實際會調用

Pipeline(connection.newLettuceResult(getAsyncConnection().sadd(key, values))); 也就是獲取異步連接 getAsyncConnection,然後進行異步操作 sadd,而常規模式下,使用的是同步操作,所以在 Pipeline 模式下,執行效率更高。

從上面的獲取連接和具體命令執行相關源碼分析可以得出使用 Lettuce 客戶端 Pipeline 模式高效的根本原因:

  1. 普通模式下,每執行一個命令都需要先打開一個連接,命令執行完畢以後又需要關閉這個連接,執行下一個命令時,又需要經過連接打開和關閉的流程;而 Pipeline 的所有命令的執行只需要經過一次連接打開和關閉。

  2. 普通模式下命令的執行是同步阻塞模式,而 Pipeline 模式下命令的執行是異步非阻塞模式。

五、項目中遇到的坑

前面介紹了涉及到批量操作,可以使用 Redis Pipelining 機制,那是不是任何批量操作相關的場景都可以使用呢,比如 list 類型數據的批量移除操作,我們的代碼最開始是這麼寫的:

public void deleteSet(String updateKey, Set<Integer> userIds) {
        if (CollectionUtils.isEmpty(userIds)) {
            return;
        }
        redisTemplate.executePipelined(new DeleteListCallBack(userIds, updateKey));
    }
@AllArgsConstructor
private static class DeleteListCallBack implements SessionCallback<Object> {
    private Set<Integer> userIds;
    private String updateKey;
    @Override
    public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
        ListOperations<String, String> listOperations = (ListOperations<String, String>) operations.opsForList();
        userIds.forEach(userId -> listOperations.remove(updateKey, 1, userId.toString()));
        return null;
    }
}

在數據量比較小的時候沒有出現問題,直到有一條收到了 Redis 的內存和 cpu 利用率的告警消息,我們發現這麼使用是有問題的,核心原因在於 list 的 lrem 操作的時間複雜度是 O(N+M),其中 N 是 list 的長度, M 是要移除的元素的個數,而我們這裏還是一個一個移除的,當然會導致 Redis 數據積壓和 cpu 每秒 ops 升高導致 cpu 利用率飈高。也就是說,即使使用 Pipeline 進行批量操作,但是由於單次操作很耗時,是會導致整個 Redis 出現問題的。

後面我們進行了優化,選用了 list 的 ltrim 命令,一次命令執行批量 remove 操作:

public void deleteSet(String updateKey, Set<Integer> deviceIds) {
        if (CollectionUtils.isEmpty(deviceIds)) {
            return;
        }
        int maxSize = 10000;
        redisTemplate.opsForList().trim(updateKey, maxSize + 1, -1);
    }

由於 ltrim 本身的時間複雜度爲 O(M), 其中 M 要移除的元素的個數,相比於原始方案的 lrem,效率提升很多,可以不需要使用 Redis Pipeline,優化結果使得 Redis 內存利用率和 cpu 利用率都極大程度得到緩解。

六、Redisson 對 Redis Pipeline 特性支持

在 redisson 官方文檔中額外特性介紹中有說到批量命令執行這個特性, 也就是多個命令在一次網絡調用中集中發送,該特性是 RBatch 這個類支持的,從這個類的描述來看,主要是爲 Redis Pipeline 這個特性服務的,並且主要是通過隊列和異步實現的。

/**
 * Interface for using Redis Pipeline feature.
 * <p>
 * All method invocations on objects got through this interface
 * are batched to separate queue and could be executed later
 * with <code>execute()</code> or <code>executeAsync()</code> methods.
 *
 *
 * @author Nikita Koksharov
 *
 */
public interface RBatch {
    /**
     * Returns stream instance by <code>name</code>
     *
     * @param <K> type of key
     * @param <V> type of value
     * @param name of stream
     * @return RStream object
     */
    <K, V> RStreamAsync<K, V> getStream(String name);
    /**
     * Returns stream instance by <code>name</code>
     * using provided <code>codec</code> for entries.
     *
     * @param <K> type of key
     * @param <V> type of value
     * @param name - name of stream
     * @param codec - codec for entry
     * @return RStream object
     */
    <K, V> RStreamAsync<K, V> getStream(String name, Codec codec);
    ......
    /**
     * Returns list instance by name.
     *
     * @param <V> type of object
     * @param name - name of object
     * @return List object
     */
    <V> RListAsync<V> getList(String name);
    <V> RListAsync<V> getList(String name, Codec codec);
    ......
    /**
     * Executes all operations accumulated during async methods invocations.
     * <p>
     * If cluster configuration used then operations are grouped by slot ids
     * and may be executed on different servers. Thus command execution order could be changed
     *
     * @return List with result object for each command
     * @throws RedisException in case of any error
     *
     */
    BatchResult<?> execute() throws RedisException;
    /**
     * Executes all operations accumulated during async methods invocations asynchronously.
     * <p>
     * In cluster configurations operations grouped by slot ids
     * so may be executed on different servers. Thus command execution order could be changed
     *
     * @return List with result object for each command
     */
    RFuture<BatchResult<?>> executeAsync();
    /**
     * Discard batched commands and release allocated buffers used for parameters encoding.
     */
    void discard();
    /**
     * Discard batched commands and release allocated buffers used for parameters encoding.
     *
     * @return void
     */
    RFuture<Void> discardAsync();
}

簡單的測試代碼如下:

@Slf4j
public class RedisPipelineTest {
    public static void main(String[] args) {
        //Redisson使用Pipeline模式
        Config config = new Config();
        config.useSingleServer().setAddress("redis://xx.xx.xx.xx:6379");
        RedissonClient redisson = Redisson.create(config);
        RBatch redisBatch = redisson.createBatch();
        int size = 100000;
        String zSetKey = "Pipeline-test-set";
        long begin = System.currentTimeMillis();
        //將命令放入隊列中
        for (int i = 0; i < size; i++) {
            redisBatch.getSet(zSetKey + i).addAsync("ccc");
        }
        //批量執行命令
        redisBatch.execute();
        log.info("Redisson Pipeline模式耗時:{}ms", (System.currentTimeMillis() - begin));
        //關閉
        redisson.shutdown();
    }
}

核心方法分析:

  1. 建 Redisson 客戶端 RedissonClient redisson = redisson.create(config), 該方法最終會調用 Reddison 的構造方法 Redisson(Config config)。
protected Redisson(Config config) {
        this.config = config;
        Config configCopy = new Config(config);
        connectionManager = ConfigSupport.createConnectionManager(configCopy);
        RedissonObjectBuilder objectBuilder = null;
        if (config.isReferenceEnabled()) {
            objectBuilder = new RedissonObjectBuilder(this);
        }
        //新建異步命令執行器
      commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
        //執行刪除超時任務的定時器
      evictionScheduler = new EvictionScheduler(commandExecutor);
        writeBehindService = new WriteBehindService(commandExecutor);
}

該構造方法中會新建異步命名執行器 CommandAsyncExecutor commandExecutor 和用戶刪除超時任務的 EvictionScheduler evictionScheduler。

  1. 創建 RBatch 實例 RBatch redisBatch = redisson.createBatch(), 該方法會使用到步驟 1 中的 commandExecutor 和 evictionScheduler 實例對象。
@Override
public RBatch createBatch(BatchOptions options) {
    return new RedissonBatch(evictionScheduler, commandExecutor, options);
}
public RedissonBatch(EvictionScheduler evictionScheduler, CommandAsyncExecutor executor, BatchOptions options) {
        this.executorService = new CommandBatchService(executor, options);
        this.evictionScheduler = evictionScheduler;
}

其中的 options 對象會影響後面批量執行命令的流程。

  1. 異步給 set 集合添加元素的操作 addAsync,這裏會具體調用 RedissonSet 的 addAsync 方法
@Override
public RFuture<Boolean> addAsync(V e) {
    String name = getRawName(e);
    return commandExecutor.writeAsync(name, codec, RedisCommands.SADD_SINGLE, name, encode(e));
}

(1)接着調用 CommandAsyncExecutor 的異步寫入方法 writeAsync。

@Override
public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
    RPromise<R> mainPromise = createPromise();
    NodeSource source = getNodeSource(key);
    async(false, source, codec, command, params, mainPromise, false);
    return mainPromise;
}

(2) 接着調用批量命令執行器

CommandBatchService 的異步發送命令。

@Override
public <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
        Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect) {
    if (isRedisBasedQueue()) {
        boolean isReadOnly = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC;
        RedisExecutor<V, R> executor = new RedisQueuedBatchExecutor<>(isReadOnly, nodeSource, codec, command, params, mainPromise,
                false, connectionManager, objectBuilder, commands, connections, options, index, executed, latch, referenceType);
        executor.execute();
    } else {
        //執行分支
        RedisExecutor<V, R> executor = new RedisBatchExecutor<>(readOnlyMode, nodeSource, codec, command, params, mainPromise,
                false, connectionManager, objectBuilder, commands, options, index, executed, referenceType);
        executor.execute();
    }
}

(3) 接着調用了 RedisBatchExecutor.

execute 方法和 BaseRedisBatchExecutor.

addBatchCommandData 方法。

@Override
public void execute() {
    addBatchCommandData(params);
}
protected final void addBatchCommandData(Object[] batchParams) {
    MasterSlaveEntry msEntry = getEntry(source);
    Entry entry = commands.get(msEntry);
    if (entry == null) {
        entry = new Entry();
        Entry oldEntry = commands.putIfAbsent(msEntry, entry);
        if (oldEntry != null) {
            entry = oldEntry;
        }
    }
    if (!readOnlyMode) {
        entry.setReadOnlyMode(false);
    }
    Codec codecToUse = getCodec(codec);
    BatchCommandData<V, R> commandData = new BatchCommandData<V, R>(mainPromise, codecToUse, command, batchParams, index.incrementAndGet());
    entry.getCommands().add(commandData);
}

這裏的 commands 以主節點爲 KEY, 以待發送命令隊列列表爲 VALUE(Entry),保存一個 MAP. 然後會把命令都添加到 entry 的 commands 命令隊列中, Entry 結構如下面代碼所示。

public static class Entry {
    Deque<BatchCommandData<?, ?>> commands = new LinkedBlockingDeque<>();
    volatile boolean readOnlyMode = true;
    public Deque<BatchCommandData<?, ?>> getCommands() {
        return commands;
    }
    public void setReadOnlyMode(boolean readOnlyMode) {
        this.readOnlyMode = readOnlyMode;
    }
    public boolean isReadOnlyMode() {
        return readOnlyMode;
    }
    public void clearErrors() {
        for (BatchCommandData<?, ?> commandEntry : commands) {
            commandEntry.clearError();
        }
    }
}
  1. 批量執行命令 redisBatch.execute(),這裏會最終調用 CommandBatchService 的 executeAsync 方法,該方法完整代碼如下,我們下面來逐一進行拆解。
public RFuture<BatchResult<?>> executeAsync() {
        ......
        RPromise<BatchResult<?>> promise = new RedissonPromise<>();
        RPromise<Void> voidPromise = new RedissonPromise<Void>();
        if (this.options.isSkipResult()
                && this.options.getSyncSlaves() == 0) {
            ......
        } else {
            //這裏是對異步執行結果進行處理,可以先忽略, 後面會詳細講,先關注批量執行命令的邏輯
            voidPromise.onComplete((res, ex) -> {
                ......
            });
        }
        AtomicInteger slots = new AtomicInteger(commands.size());
        ......
        //真正執行的代碼入口,批量執行命令
        for (Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
            RedisCommonBatchExecutor executor = new RedisCommonBatchExecutor(new NodeSource(e.getKey()), voidPromise,
                                                    connectionManager, this.options, e.getValue(), slots, referenceType);
            executor.execute();
        }
        return promise;
    }

裏面會用到我們在 3.3 步驟所生成的 commands 實例。

(1)接着調用了基類 RedisExecutor 的 execute 方法

public void execute() {
        ......
        connectionFuture.onComplete((connection, e) -> {
            if (connectionFuture.isCancelled()) {
                connectionManager.getShutdownLatch().release();
                return;
            }
            if (!connectionFuture.isSuccess()) {
                connectionManager.getShutdownLatch().release();
                exception = convertException(connectionFuture);
                return;
            }
            //調用RedisCommonBatchExecutor的sendCommand方法, 裏面會將多個命令放到一個List<CommandData<?, ?>> list列表裏面
        sendCommand(attemptPromise, connection);
            writeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    checkWriteFuture(writeFuture, attemptPromise, connection);
                }
            });
        });
        ......
    }

(2)接着調用

RedisCommonBatchExecutor 的 sendCommand 方法,裏面會將多個命令放到一個 List list 列表裏面。

@Override
    protected void sendCommand(RPromise<Void> attemptPromise, RedisConnection connection) {
        boolean isAtomic = options.getExecutionMode() != ExecutionMode.IN_MEMORY;
        boolean isQueued = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC
                                || options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC;
        //將多個命令放到一個List<CommandData<?, ?>> list列表裏面
      List<CommandData<?, ?>> list = new ArrayList<>(entry.getCommands().size());
        if (source.getRedirect() == Redirect.ASK) {
            RPromise<Void> promise = new RedissonPromise<Void>();
            list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {}));
        }
        for (CommandData<?, ?> c : entry.getCommands()) {
            if ((c.getPromise().isCancelled() || c.getPromise().isSuccess())
                    && !isWaitCommand(c)
                        && !isAtomic) {
                // skip command
                continue;
            }
            list.add(c);
        }
        ......
        //調用RedisConnection的send方法,將命令一次性發到Redis服務器端
      writeFuture = connection.send(new CommandsData(attemptPromise, list, options.isSkipResult(), isAtomic, isQueued, options.getSyncSlaves() > 0));
    }

(3)接着調用 RedisConnection 的 send 方法,通過 Netty 通信發送命令到 Redis 服務器端執行,這裏也驗證了 Redisson 客戶端底層是採用 Netty 進行通信的。

public ChannelFuture send(CommandsData data) {
        return channel.writeAndFlush(data);
}
  1. 接收返回結果,這裏主要是監聽事件是否完成,然後組裝返回結果, 核心方法是步驟 4 提到的 CommandBatchService 的 executeAsync 方法, 裏面會對返回結果進行監聽和處理, 核心代碼如下:
public RFuture<BatchResult<?>> executeAsync() {
    ......
    RPromise<BatchResult<?>> promise = new RedissonPromise<>();
    RPromise<Void> voidPromise = new RedissonPromise<Void>();
    if (this.options.isSkipResult()
            && this.options.getSyncSlaves() == 0) {
        ......
    } else {
        voidPromise.onComplete((res, ex) -> {
            //對返回結果的處理
            executed.set(true);
            ......
            List<Object> responses = new ArrayList<Object>(entries.size());
            int syncedSlaves = 0;
            for (BatchCommandData<?, ?> commandEntry : entries) {
                if (isWaitCommand(commandEntry)) {
                    syncedSlaves = (Integer) commandEntry.getPromise().getNow();
                } else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
                        && !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())
                        && !this.options.isSkipResult()) {
                    ......
                    //獲取單個命令的執行結果
                    Object entryResult = commandEntry.getPromise().getNow();
                    ......
                    //將單個命令執行結果放到List中
                    responses.add(entryResult);
                }
            }
            BatchResult<Object> result = new BatchResult<Object>(responses, syncedSlaves);
            promise.trySuccess(result);
            ......
        });
    }
    ......
    return promise;
}

這裏會把單個命令的執行結果放到 responses 裏面,最終返回 RPromisepromise。

從上面的分析來看,Redisson 客戶端對 Redis Pipeline 的支持也是從多個命令在一次網絡通信中執行和異步處理來實現的。

七、總結

Redis 提供了 Pipelining 進行批量操作的高級特性,極大地提高了部分數據類型沒有批量執行命令導致的執行耗時而引起的性能問題,但是我們在使用的過程中需要考慮 Pipeline 操作中單個命令執行的耗時問題,否則帶來的效果可能適得其反。最後擴展分析了 Redisson 客戶端對 Redis Pipeline 特性的支持原理,可以與 Lettuce 客戶端對 Redis Pipeline 支持原理進行比較,加深 Pipeline 在不同 Redis 客戶端實現方式的理解。

參考資料:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/i6jRZwsqy-Iv_TKAjHAJnw