Flink Hive Source 並行度推斷源碼解析

批讀 Hive

HiveOptions 中有兩個配置

public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
        key("table.exec.hive.infer-source-parallelism")
                .defaultValue(true)
                .withDescription(
                        "If is false, parallelism of source are set by config.\n" +
                        "If is true, source parallelism is inferred according to splits number.\n");

public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
        key("table.exec.hive.infer-source-parallelism.max")
                .defaultValue(1000)
                .withDescription("Sets max infer parallelism for source operator.");

這兩個參數只在 HiveParallelismInference 類中使用,觀察到 HiveParallelismInference 類是專門針對 Hive 並行度配置的工具類,代碼如下:

/**
 * A utility class to calculate parallelism for Hive connector considering various factors.
 */
class HiveParallelismInference {

 private static final Logger LOG = LoggerFactory.getLogger(HiveParallelismInference.class);

 private final ObjectPath tablePath;
 private final boolean infer;
 private final int inferMaxParallelism;

 private int parallelism;

 HiveParallelismInference(ObjectPath tablePath, ReadableConfig flinkConf) {
  this.tablePath = tablePath;
        // 獲取 table.exec.hive.infer-source-parallelism 配置並賦值,
  this.infer = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM);
  // 獲取 table.exec.hive.infer-source-parallelism.max 配置並賦值
        this.inferMaxParallelism = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
  Preconditions.checkArgument(
   inferMaxParallelism >= 1,
   HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
        // 獲取 table.exec.resource.default-parallelism 配置
  this.parallelism = flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
 }

 /**
  * Apply limit to calculate the parallelism.
  * Here limit is the limit in query <code>SELECT * FROM xxx LIMIT [limit]</code>.
  */
 int limit(Long limit) {
  if (limit != null) {
   parallelism = Math.min(parallelism, (int) (limit / 1000));
  }

  // make sure that parallelism is at least 1
  return Math.max(1, parallelism);
 }

    //根據
 /**
  * Infer parallelism by number of files and number of splits.
  * If {@link HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is not set this method does nothing.
  */
 HiveParallelismInference infer(
   SupplierWithException<Integer, IOException> numFiles,
   SupplierWithException<Integer, IOException> numSplits) {
        //如果設置 table.exec.hive.infer-source-parallelism 爲 false,則直接跳過了
  if (!infer) {
   return this;
  }

  try {
   // `createInputSplits` is costly,
   // so we try to avoid calling it by first checking the number of files
   // which is the lower bound of the number of splits
   int lowerBound = logRunningTime("getNumFiles", numFiles);
   if (lowerBound >= inferMaxParallelism) {
    parallelism = inferMaxParallelism;
    return this;
   }

   int splitNum = logRunningTime("createInputSplits", numSplits);
   parallelism = Math.min(splitNum, inferMaxParallelism);
  } catch (IOException e) {
   throw new FlinkHiveException(e);
  }
  return this;
 }

 private int logRunningTime(
   String operationName, SupplierWithException<Integer, IOException> supplier) throws IOException {
  long startTimeMillis = System.currentTimeMillis();
  int result = supplier.get();
  LOG.info(
   "Hive source({}}) {} use time: {} ms, result: {}",
   tablePath,
   operationName,
   System.currentTimeMillis() - startTimeMillis,
   result);
  return result;
 }
}

可以看到註釋主要是 infer 方法去做的的並行度推斷,該方法有兩個參數 numFiles 和 numSplits,該方法只在 HiveTableSource 類中的 getDataStream 方法中調用,可以查看下圖:

那就來看看這兩個方法的實現:

getNumFiles 方法是用來獲取 Hive 表分區下面的文件數量的:

public static int getNumFiles(List<HiveTablePartition> partitions, JobConf jobConf) throws IOException {
    int numFiles = 0;
    FileSystem fs = null;
    for (HiveTablePartition partition : partitions) {
        StorageDescriptor sd = partition.getStorageDescriptor();
        org.apache.hadoop.fs.Path inputPath = new org.apache.hadoop.fs.Path(sd.getLocation());
        if (fs == null) {
            fs = inputPath.getFileSystem(jobConf);
        }
        // it's possible a partition exists in metastore but the data has been removed
        if (!fs.exists(inputPath)) {
            continue;
        }
        numFiles += fs.listStatus(inputPath).length;
    }
    return numFiles;
}

createInputSplits 方法是用來將 Hive 表分區下的文件分割成邏輯上的 InputSplit,這裏是在 Flink Hive Connector 裏面定義了一個 HiveSourceSplit 類來包裝 InputSplit,包含了 Hive 表分區的信息。

public static List<HiveSourceSplit> createInputSplits(
        int minNumSplits,
        List<HiveTablePartition> partitions,
        JobConf jobConf) throws IOException {
    List<HiveSourceSplit> hiveSplits = new ArrayList<>();
    FileSystem fs = null;
    for (HiveTablePartition partition : partitions) {
        StorageDescriptor sd = partition.getStorageDescriptor();
        org.apache.hadoop.fs.Path inputPath = new org.apache.hadoop.fs.Path(sd.getLocation());
        if (fs == null) {
            fs = inputPath.getFileSystem(jobConf);
        }
        // it's possible a partition exists in metastore but the data has been removed
        if (!fs.exists(inputPath)) {
            continue;
        }
        InputFormat format;
        try {
            format = (InputFormat)
                    Class.forName(sd.getInputFormat(), true, Thread.currentThread().getContextClassLoader()).newInstance();
        } catch (Exception e) {
            throw new FlinkHiveException("Unable to instantiate the hadoop input format", e);
        }
        ReflectionUtils.setConf(format, jobConf);
        jobConf.set(INPUT_DIR, sd.getLocation());
        //TODO: we should consider how to calculate the splits according to minNumSplits in the future.
        org.apache.hadoop.mapred.InputSplit[] splitArray = format.getSplits(jobConf, minNumSplits);
        for (org.apache.hadoop.mapred.InputSplit inputSplit : splitArray) {
            Preconditions.checkState(inputSplit instanceof FileSplit,
                    "Unsupported InputSplit type: " + inputSplit.getClass().getName());
            hiveSplits.add(new HiveSourceSplit((FileSplit) inputSplit, partition, null));
        }
    }

    return hiveSplits;
}

因爲上面兩個方法的執行可能需要一點時間,所以專門還寫了一個 logRunningTime 記錄其執行的時間。

如果文件數大於配置的最大並行度,那麼作業的並行度直接以配置的最大並行度爲準;否則取 InputSplit 個數與配置的最大並行度兩者最小值。

int lowerBound = logRunningTime("getNumFiles", numFiles);
if (lowerBound >= inferMaxParallelism) {
    parallelism = inferMaxParallelism;
    return this;
}

int splitNum = logRunningTime("createInputSplits", numSplits);
parallelism = Math.min(splitNum, inferMaxParallelism);

然後就是 limit 方法的限制並行度了:

/**
 * Apply limit to calculate the parallelism.
 * Here limit is the limit in query <code>SELECT * FROM xxx LIMIT [limit]</code>.
 */
int limit(Long limit) {
    if (limit != null) {
        parallelism = Math.min(parallelism, (int) (limit / 1000));
    }

    // make sure that parallelism is at least 1
    return Math.max(1, parallelism);
}

這個方法的註釋的意思是根據查詢語句的 limit 來配置並行度,判斷前面得到的並行度與 limit/1000 的大小,取兩者最小值。舉個例子,前面判斷這個 Hive 表分區有非常多的文件,比如 10001 個,那大於默認的最大值 1000,那麼返回的並行度是 1000,但是因爲查詢 Hive 的 SQL 只是 100 條,那麼這裏取值得到的最小值是 0,最後通過 Math.max(1, parallelism) 返回的 source 並行度是 1。

注意⚠️:上面的並行度配置僅僅針對於批作業查 Hive 數據,不針對流讀 Hive 數據。

流讀 Hive

在 HiveTableSource 類中的 getDataStream 方法中並沒有針對流讀配置 Source 並行度。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/IEo88bDwms8T_scKGHEibA