Flink Hive Source Parallelism Inference: A Source Code Walkthrough
Batch Reads from Hive
HiveOptions defines two configuration options:
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
        key("table.exec.hive.infer-source-parallelism")
                .defaultValue(true)
                .withDescription(
                        "If is false, parallelism of source are set by config.\n" +
                        "If is true, source parallelism is inferred according to splits number.\n");

public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
        key("table.exec.hive.infer-source-parallelism.max")
                .defaultValue(1000)
                .withDescription("Sets max infer parallelism for source operator.");
- table.exec.hive.infer-source-parallelism: defaults to true, meaning the source parallelism is inferred from the number of files and input splits under the table's partitions; if set to false, the parallelism comes from configuration instead.
- table.exec.hive.infer-source-parallelism.max: defaults to 1000, the maximum parallelism that can be inferred for the Hive source.
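For reference, here is a minimal sketch of overriding these two options through the Table API (a standard batch TableEnvironment setup is assumed; this snippet is not part of the connector source):

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
Configuration conf = tEnv.getConfig().getConfiguration();
// keep inference enabled, but cap the inferred source parallelism at 500
conf.setBoolean("table.exec.hive.infer-source-parallelism", true);
conf.setInteger("table.exec.hive.infer-source-parallelism.max", 500);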
These two options are used only by the HiveParallelismInference class, a utility class dedicated to calculating the Hive source parallelism. Its code is as follows:
/**
 * A utility class to calculate parallelism for Hive connector considering various factors.
 */
class HiveParallelismInference {

    private static final Logger LOG = LoggerFactory.getLogger(HiveParallelismInference.class);

    private final ObjectPath tablePath;
    private final boolean infer;
    private final int inferMaxParallelism;

    private int parallelism;

    HiveParallelismInference(ObjectPath tablePath, ReadableConfig flinkConf) {
        this.tablePath = tablePath;
        // read table.exec.hive.infer-source-parallelism
        this.infer = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM);
        // read table.exec.hive.infer-source-parallelism.max
        this.inferMaxParallelism = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
        Preconditions.checkArgument(
                inferMaxParallelism >= 1,
                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
        // read table.exec.resource.default-parallelism as the initial value
        this.parallelism = flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
    }

    /**
     * Apply limit to calculate the parallelism.
     * Here limit is the limit in query <code>SELECT * FROM xxx LIMIT [limit]</code>.
     */
    int limit(Long limit) {
        if (limit != null) {
            parallelism = Math.min(parallelism, (int) (limit / 1000));
        }
        // make sure that parallelism is at least 1
        return Math.max(1, parallelism);
    }

    /**
     * Infer parallelism by number of files and number of splits.
     * If {@link HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is not set this method does nothing.
     */
    HiveParallelismInference infer(
            SupplierWithException<Integer, IOException> numFiles,
            SupplierWithException<Integer, IOException> numSplits) {
        // if table.exec.hive.infer-source-parallelism is false, skip inference entirely
        if (!infer) {
            return this;
        }

        try {
            // `createInputSplits` is costly,
            // so we try to avoid calling it by first checking the number of files
            // which is the lower bound of the number of splits
            int lowerBound = logRunningTime("getNumFiles", numFiles);
            if (lowerBound >= inferMaxParallelism) {
                parallelism = inferMaxParallelism;
                return this;
            }

            int splitNum = logRunningTime("createInputSplits", numSplits);
            parallelism = Math.min(splitNum, inferMaxParallelism);
        } catch (IOException e) {
            throw new FlinkHiveException(e);
        }
        return this;
    }

    private int logRunningTime(
            String operationName, SupplierWithException<Integer, IOException> supplier) throws IOException {
        long startTimeMillis = System.currentTimeMillis();
        int result = supplier.get();
        LOG.info(
                "Hive source({}}) {} use time: {} ms, result: {}",
                tablePath,
                operationName,
                System.currentTimeMillis() - startTimeMillis,
                result);
        return result;
    }
}
As the Javadoc suggests, the actual inference work happens in the infer method, which takes two suppliers, numFiles and numSplits. infer is called from exactly one place, the getDataStream method of HiveTableSource; the call site is sketched below.
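A sketch of that call site, reconstructed from the Flink 1.12.x Hive connector (variable names may differ slightly between versions):

int parallelism = new HiveParallelismInference(tablePath, flinkConf)
        .infer(
                () -> HiveSourceFileEnumerator.getNumFiles(allHivePartitions, jobConf),
                () -> HiveSourceFileEnumerator.createInputSplits(0, allHivePartitions, jobConf).size())
        .limit(limit);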
Now let's look at how these two suppliers are implemented.
getNumFiles returns the total number of files under the Hive table's partitions:
public static int getNumFiles(List<HiveTablePartition> partitions, JobConf jobConf) throws IOException {
    int numFiles = 0;
    FileSystem fs = null;
    for (HiveTablePartition partition : partitions) {
        StorageDescriptor sd = partition.getStorageDescriptor();
        org.apache.hadoop.fs.Path inputPath = new org.apache.hadoop.fs.Path(sd.getLocation());
        if (fs == null) {
            fs = inputPath.getFileSystem(jobConf);
        }
        // it's possible a partition exists in metastore but the data has been removed
        if (!fs.exists(inputPath)) {
            continue;
        }
        numFiles += fs.listStatus(inputPath).length;
    }
    return numFiles;
}
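Every existing file yields at least one input split (and large splittable files yield several), which is why infer can use this file count as a cheap lower bound before paying for the full createInputSplits call.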
createInputSplits splits the files under each Hive table partition into logical InputSplits. The Flink Hive connector defines its own HiveSourceSplit class, which wraps the underlying InputSplit together with the Hive partition information:
public static List<HiveSourceSplit> createInputSplits(
        int minNumSplits,
        List<HiveTablePartition> partitions,
        JobConf jobConf) throws IOException {
    List<HiveSourceSplit> hiveSplits = new ArrayList<>();
    FileSystem fs = null;
    for (HiveTablePartition partition : partitions) {
        StorageDescriptor sd = partition.getStorageDescriptor();
        org.apache.hadoop.fs.Path inputPath = new org.apache.hadoop.fs.Path(sd.getLocation());
        if (fs == null) {
            fs = inputPath.getFileSystem(jobConf);
        }
        // it's possible a partition exists in metastore but the data has been removed
        if (!fs.exists(inputPath)) {
            continue;
        }
        InputFormat format;
        try {
            format = (InputFormat) Class.forName(
                    sd.getInputFormat(), true, Thread.currentThread().getContextClassLoader()).newInstance();
        } catch (Exception e) {
            throw new FlinkHiveException("Unable to instantiate the hadoop input format", e);
        }
        ReflectionUtils.setConf(format, jobConf);
        jobConf.set(INPUT_DIR, sd.getLocation());
        // TODO: we should consider how to calculate the splits according to minNumSplits in the future.
        org.apache.hadoop.mapred.InputSplit[] splitArray = format.getSplits(jobConf, minNumSplits);
        for (org.apache.hadoop.mapred.InputSplit inputSplit : splitArray) {
            Preconditions.checkState(inputSplit instanceof FileSplit,
                    "Unsupported InputSplit type: " + inputSplit.getClass().getName());
            hiveSplits.add(new HiveSourceSplit((FileSplit) inputSplit, partition, null));
        }
    }
    return hiveSplits;
}
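For intuition about how many splits getSplits can return: with a FileInputFormat-style format, each splittable file is divided roughly along HDFS block boundaries. A back-of-the-envelope sketch with hypothetical numbers (assuming default block-sized splits and a splittable file format):

long fileSize = 1024L * 1024 * 1024;  // one 1 GB file in a partition (hypothetical)
long blockSize = 128L * 1024 * 1024;  // 128 MB HDFS block size (hypothetical)
long splitsForFile = (fileSize + blockSize - 1) / blockSize; // ≈ 8 splits for this file
// a partition with many such files quickly drives splitNum past the default cap of 1000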
Because both calls above may take some time (they touch the file system), a small logRunningTime helper logs how long each of them takes.
If the file count already reaches the configured maximum parallelism, the expensive split computation is skipped and the parallelism is set to that maximum; otherwise the parallelism is the smaller of the number of InputSplits and the configured maximum:
int lowerBound = logRunningTime("getNumFiles", numFiles);
if (lowerBound >= inferMaxParallelism) {
    parallelism = inferMaxParallelism;
    return this;
}

int splitNum = logRunningTime("createInputSplits", numSplits);
parallelism = Math.min(splitNum, inferMaxParallelism);
After inference comes the limit method, which further caps the parallelism:
/**
 * Apply limit to calculate the parallelism.
 * Here limit is the limit in query <code>SELECT * FROM xxx LIMIT [limit]</code>.
 */
int limit(Long limit) {
    if (limit != null) {
        parallelism = Math.min(parallelism, (int) (limit / 1000));
    }
    // make sure that parallelism is at least 1
    return Math.max(1, parallelism);
}
As the Javadoc explains, this caps the parallelism based on the LIMIT clause of the query: the previously inferred parallelism is compared against limit / 1000 and the smaller value is taken. For example, suppose the Hive table's partitions contain a huge number of files, say 10001; that exceeds the default maximum of 1000, so infer yields a parallelism of 1000. But if the query only asks for 100 rows, limit / 1000 is 0, the minimum becomes 0, and Math.max(1, parallelism) finally returns a source parallelism of 1.
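Tracing those numbers through the code:

// after infer(): parallelism = 1000 (capped by infer-source-parallelism.max)
// query: SELECT * FROM t LIMIT 100
int parallelism = Math.min(1000, (int) (100L / 1000)); // integer division: 100 / 1000 = 0
int result = Math.max(1, parallelism);                 // clamped back up to 1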
Note ⚠️: the parallelism inference above applies only to batch reads of Hive data, not to streaming reads.
Streaming Reads from Hive
The getDataStream method of HiveTableSource does not configure any source parallelism for the streaming-read path, so the source simply runs with the job's default parallelism.
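A minimal sketch of setting that default explicitly (a standard Table API setup is assumed; this snippet is not part of the connector source):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // streaming Hive source tasks presumably inherit this default
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);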
Source: https://mp.weixin.qq.com/s/IEo88bDwms8T_scKGHEibA