使用 Spring AOP 實現異步文件上傳

背景

相信很多系統裏都有這一種場景:用戶上傳 Excel,後端解析 Excel 生成相應的數據,校驗數據並落庫。

這就引發了一個問題:如果 Excel 的行非常多,或者解析非常複雜,那麼解析 + 校驗的過程就非常耗時。

如果接口是一個同步的接口,則非常容易出現接口超時,進而返回的校驗錯誤信息也無法展示給前端,這就需要從功能上解決這個問題。

一般來說都是啓動一個子線程去做解析工作,主線程正常返回,由子線程記錄上傳狀態 + 校驗結果到數據庫。同時提供一個查詢頁面用於實時查詢上傳的狀態和校驗信息。

進一步的,如果我們每一個上傳的任務都寫一次線程池異步 + 日誌記錄的代碼就顯得非常冗餘。同時,非業務代碼也侵入了業務代碼導致代碼可讀性下降。

從通用性的角度上講,這種業務場景非常適合模板方法的設計模式。即設計一個抽象類,定義上傳的抽象方法,同時實現記錄日誌的方法。

例如:

//僞代碼,省略了一些步驟
@Slf4j
public abstract class AbstractUploadService<T> {
   public static ThreadFactory commonThreadFactory = new ThreadFactoryBuilder().setNameFormat("-upload-pool-%d")
      .setPriority(Thread.NORM_PRIORITY).build();
   public static ExecutorService uploadExecuteService = new ThreadPoolExecutor(10, 20, 300L,
      TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), commonThreadFactory, new ThreadPoolExecutor.AbortPolicy());

   protected abstract String upload(List<T> data);

   protected void execute(String userName, List<T> data) {
      // 生成一個唯一編號
      String uuid = UUID.randomUUID().toString().replace("-""");
      uploadExecuteService.submit(() -> {
         // 記錄日誌
         writeLogToDb(uuid, userName, updateTime, "導入中");
         // 一個字符串,用於記錄upload的校驗信息
         String errorLog = "";
         //執行上傳
         try {
            errorLog = upload(data);
            writeSuccess(uuid, "導入中", updateTime);
         } catch (Exception e) {
            LOGGER.error("導入錯誤", e);
            //計入導入錯誤日誌
            writeFailToDb(uuid, "導入失敗", e.getMessage(), updateTime);
         }
         /**
          * 檢查一下upload是不是返回了錯誤日誌,如果有,需要注意記錄
          *
          * 因爲錯誤日誌可能比較長,
          * 可以寫入一個文件然後上傳到公司的文件服務器,
          * 然後在查看結果的時候允許用戶下載該文件,
          * 這裏不展開只做示意
          */
         if (StringUtils.isNotEmpty(errorLog)) {
            writeFailToDb(uuid, "導入失敗", errorLog, updateTime);
         }

      });
   }
}

如上文所示,模板方法的方式雖然能夠極大地減少重複代碼,但是仍有下面兩個問題:

爲解決上面兩個問題,我也經常進行思考,結果在某次自定義事務提交 or 回滾的方法的時候得到了啓發。

這個上傳的邏輯過程和事務提交的邏輯過程非常像,都是在實際操作前需要做初始化操作,然後在異常或者成功的時候做進一步操作。

這種完全可以通過環裝切面的方式實現,由此,我寫了一個小輪子給團隊使用。(當然了,這個小輪子在本人所在的大團隊內部使用的很好,但是不一定適合其他人,但是思路一樣,大家可以擴展自己的功能)

多說無益,上代碼!

代碼與實現

首先定義一個日誌實體:

public class FileUploadLog {
   private Integer id;
    // 唯一編碼
    private String batchNo;
    // 上傳到文件服務器的文件key
    private String key;
    // 錯誤日誌文件名
    private String fileName;
    //上傳狀態
    private Integer status;
    //上傳人
    private String createName;
    //上傳類型
    private String uploadType;
    //結束時間
    private Date endTime;
    // 開始時間
    private Date startTime;
}

然後定義一個上傳的類型枚舉,用於記錄是哪裏操作的:

public enum UploadType {
   未知(1,"未知"),
   類型2(2,"類型2"),
   類型1(3,"類型1");

   private int code;
   private String desc;
   private static Map<Integer, UploadType> map = new HashMap<>();
   static {
      for (UploadType value : UploadType.values()) {
         map.put(value.code, value);
      }
   }

   UploadType(int code, String desc) {
      this.code = code;
      this.desc = desc;
   }

   public int getCode() {
      return code;
   }

   public String getDesc() {
      return desc;
   }

   public static UploadType getByCode(Integer code) {
      return map.get(code);
   }
}

最後,定義一個註解,用於標識切點:

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Upload {
   // 記錄上傳類型
   UploadType type() default UploadType.未知;
}

然後,編寫切面:

@Component
@Aspect
@Slf4j
public class UploadAspect {
   public static ThreadFactory commonThreadFactory = new ThreadFactoryBuilder().setNameFormat("upload-pool-%d")
      .setPriority(Thread.NORM_PRIORITY).build();
   public static ExecutorService uploadExecuteService = new ThreadPoolExecutor(10, 20, 300L,
      TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), commonThreadFactory, new ThreadPoolExecutor.AbortPolicy());


   @Pointcut("@annotation(com.aaa.bbb.Upload)")
   public void uploadPoint() {}

   @Around(value = "uploadPoint()")
   public Object uploadControl(ProceedingJoinPoint pjp) {
       // 獲取方法上的註解,進而獲取uploadType
      MethodSignature signature = (MethodSignature)pjp.getSignature();
      Upload annotation = signature.getMethod().getAnnotation(Upload.class);
      UploadType type = annotation == null ? UploadType.未知 : annotation.type();
      // 獲取batchNo
      String batchNo = UUID.randomUUID().toString().replace("-""");
      // 初始化一條上傳的日誌,記錄開始時間
      writeLogToDB(batchNo, type, new Date)
      // 線程池啓動異步線程,開始執行上傳的邏輯,pjp.proceed()就是你實現的上傳功能
      uploadExecuteService.submit(() -> {
         try {
            String errorMessage = pjp.proceed();
            // 沒有異常直接成功
            if (StringUtils.isEmpty(errorMessage)) {
                // 成功,寫入數據庫,具體不展開了
                writeSuccessToDB(batchNo);
            } else {
                // 失敗,因爲返回了校驗信息
                fail(errorMessage, batchNo);
            }
         } catch (Throwable e) {
            LOGGER.error("導入失敗:", e);
            // 失敗,拋了異常,需要記錄
            fail(e.toString(), batchNo);
         }
      });
      return new Object();
   }

   private void fail(String message, String batchNo) {
       // 生成上傳錯誤日誌文件的文件key
      String s3Key = UUID.randomUUID().toString().replace("-""");
      // 生成文件名稱
      String fileName = "錯誤日誌_" +
         DateUtil.dateToString(new Date()"yyyy年MM月dd日HH時mm分ss秒") + ExportConstant.txtSuffix;
      String filePath = "/home/xxx/xxx/" + fileName;
      // 生成一個文件,寫入錯誤數據
      File file = new File(filePath);
      OutputStream outputStream = null;
      try {
         outputStream = new FileOutputStream(file);
         outputStream.write(message.getBytes());

      } catch (Exception e) {
         LOGGER.error("寫入文件錯誤", e);
      } finally {
         try {
            if (outputStream != null)
               outputStream.close();
         } catch (Exception e) {
            LOGGER.error("關閉錯誤", e);
         }
      }
      // 上傳錯誤日誌文件到文件服務器,我們用的是s3
      upFileToS3(file, s3Key);
      // 記錄上傳失敗,同時記錄錯誤日誌文件地址到數據庫,方便用戶查看錯誤信息
      writeFailToDB(batchNo, s3Key, fileName);
      // 刪除文件,防止硬盤爆炸
      deleteFile(file)
   }

}

至此整個異步上傳功能就完成了,是不是很簡單?(笑)

那麼怎麼使用呢?更簡單,只需要在 service 層加入註解即可,頂多就是把錯誤信息 return 出去。

@Upload(type = UploadType.類型1)
public String upload(List<ClassOne> items)  {
   if (items == null || items.size() == 0) {
      return;
   }
   //校驗
   String error = uploadCheck(items);
   if (StringUtils.isNotEmpty) {
       return error;
   }
   //刪除舊的
   deleteAll();
   //插入新的
   batchInsert(items);
}

結語

寫了個小輪子提升團隊整體開發效率感覺真不錯。程序員的最高品質就是解放雙手(偷懶?),然後成功的用自己寫的代碼把自己幹畢業......

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/-Mmt_c0h2hbcOzdTLrKZRg