
在 hive-.xml 里有两个配置可以确定 reduce task 的数量。
以下是这两个参数在 hive-default.xml 的定义和默认值。
调大 hive.exec.reducers.maxhive.exec.reducers.bytes.per.reducer 256000000 size per reducer.The default is 256Mb, i.e if the input size is 1G, it will use 4 reducers. hive.exec.reducers.max 1009 max number of reducers will be used. If the one specified in the configuration parameter mapred.reduce.tasks is negative, Hive will use this one as the max number of reducers when automatically determine number of reducers.
如果处理的数据量大,而 reduce 的个数仅有 1099,特别是 reduce 任务出现 OOM 时,则需要在任务中设置参数 hive.exec.reducers.max。
如 set hive.exec.reducers.max=4999;,则说明最大的 reduce 数量调整到 4999。
第 1 步中的数据量怎么计算? 通过代码得知,不是 Map 处理的存储在 HDFS 上的文件的大小相加。
我们做一个测试。
hive> select length(c1),count(1) from hdfs.t3_parquet_snappy group by length(c1);
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 5 5 0 0 0 0
Reducer 2 ...... container SUCCEEDED 7 7 0 0 0 0
可以看到,用了 5 个 Map 任务,但是 7 个 reducer 任务。
看下文件的数据量,只有 172.9M。hive> dfs -du -s -h hdfs://localhost:9000/user/hive/warehouse/hdfs.db/t3_parquet_snappy; 172.9 M 172.9 M hdfs://localhost:9000/user/hive/warehouse/hdfs.db/t3_parquet_snappy表的信息
hive> desc extended hdfs.t3_parquet_snappy;
OK
c1 string
Detailed Table Information Table(tableName:t3_parquet_snappy, dbName:hdfs, owner:houzhizhen, createTime:1637661315, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:c1, type:string, comment:null)], location:hdfs://localhost:9000/user/hive/warehouse/hdfs.db/t3_parquet_snappy, inputFormat:org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{totalSize=181313115, numRows=8313906, rawDataSize=1577233803, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true","COLUMN_STATS":{"c1":"true"}}, numFiles=30, transient_lastDdlTime=1637666614, bucketing_version=2, parquet.compression=SNAPPY}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, rewriteEnabled:false, catName:hive, ownerType:USER)
Time taken: 0.069 seconds, Fetched: 3 row(s)
可以看到,表有属性 rawDataSize=1577233803。 1577233803/256000000= 6.16,向上取整,得到 7。
totalSize如果表的信息里没有rawDataSize,但是有 totalSize,代表现在的存储的文件大小之和。Hive 使用压缩比参数 hive.stats.deserialization.factor, 默认是10.0。得到原始数据量,再除以 256000000,得到 reduce 的数量。
如果表里既没有 rawDataSize,也没有 totalSize。则通过文件系统的接口,得到表的文件的大小,即 totalSize,剩下的计算和 totalSize 一样。
分区的表的计算方法和表一样,每个分区看是否有 rawDataSize 或者 totalSize,如果没有,则通过文件系统接口计算。