In the previous article, we used sample datasets to join two tables in Hive. To improve the performance of a table join, we can also use partitions or buckets. Let's first create a Parquet-format table with a partition and buckets:
CREATE TABLE employee_p (
employee_id INT,
birthday DATE,
first_name STRING,
family_name STRING,
work_day DATE)
PARTITIONED BY (gender CHAR(1))
CLUSTERED BY (employee_id) INTO 8 BUCKETS
STORED AS PARQUET;
Then import data into it:
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE employee_p PARTITION(gender)
SELECT employee_id, birthday, first_name, family_name, gender, work_day
FROM employee;
But it reports an error:
Diagnostic Messages for this Task:
Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{},"value":{"_col0":"499795","_col1":"1961-07-05","_col2":"Idoia","_col3":"Riefers","_col4":"M","_col5":"1986-01-16"}}
    at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:257)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{},"value":{"_col0":"499795","_col1":"1961-07-05","_col2":"Idoia","_col3":"Riefers","_col4":"M","_col5":"1986-01-16"}}
    at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:245)
    ... 7 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveFatalException: [Error 20004]: Fatal error occurred when node tried to create too many dynamic partitions. The maximum number of dynamic partitions is controlled by hive.exec.max.dynamic.partitions and hive.exec.max.dynamic.partitions.pernode. Maximum was set to: 100
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.getDynOutPaths(FileSinkOperator.java:922)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:699)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:837)
    at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:97)
    at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:236)
    ... 7 more
All the employees have only two genders: “M” and “F”. How could Hive report “too many dynamic partitions”?
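If a table really did need that many dynamic partitions, the two limits named in the error message could simply be raised for the session, for example:
SET hive.exec.max.dynamic.partitions=1000;
SET hive.exec.max.dynamic.partitions.pernode=1000;
But with only two gender values, raising the limits would merely hide the real problem.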
To find the root cause, I put EXPLAIN in front of the import HQL.
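The statement is presumably just the failing INSERT from above, prefixed with EXPLAIN:
EXPLAIN
INSERT OVERWRITE TABLE employee_p PARTITION(gender)
SELECT employee_id, birthday, first_name, family_name, gender, work_day
FROM employee;
Scanning the resulting plan, this line finally caught my eye: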
expressions: UDFToInteger(VALUE._col0) (type: int), CAST( VALUE._col1 AS DATE) (type: date), VALUE._col2 (type: string), VALUE._col3 (type: string), CAST( VALUE._col4 AS DATE) (type: date), VALUE._col5 (type: string)
Hive uses "_col4" as the partition column, and its type is DATE! With dynamic partitions, Hive simply takes the last column(s) of the SELECT as the partition value(s), so the correct import HQL must put the partition column last:
INSERT OVERWRITE TABLE employee_p PARTITION(gender)
SELECT employee_id, birthday, first_name, family_name, work_day, gender
FROM employee;
With the corrected column order, the data is imported successfully using dynamic partitions.
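To double-check that only the two expected partitions were created, we can list them; the output should look something like the comments below:
SHOW PARTITIONS employee_p;
-- gender=F
-- gender=M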
Now we create a new Parquet-format table "salary_p" (using buckets) and join the two tables:
CREATE TABLE salary_p (
employee_id INT,
salary INT,
start_date DATE,
end_date DATE)
CLUSTERED BY (employee_id) INTO 8 BUCKETS
STORED AS PARQUET;
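A note on the buckets: on older Hive releases (before 2.x), bucketing is not enforced during INSERT by default, so depending on your version you may also need to enable it before loading the bucketed tables:
SET hive.enforce.bucketing = true;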
INSERT OVERWRITE TABLE salary_p SELECT * FROM salary;
/* Join two tables */
SELECT e.gender, AVG(s.salary) AS avg_salary
FROM employee_p AS e
JOIN salary_p AS s
ON (e.employee_id = s.employee_id)
GROUP BY e.gender;
The join now takes only about 90 seconds, much faster than the 140 seconds measured earlier without bucketing and partitioning.
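Since both tables are bucketed on employee_id into the same number of buckets, it may also be worth experimenting with a bucket map join; whether it actually helps depends on table sizes and your Hive version:
SET hive.optimize.bucketmapjoin = true;
-- then rerun the join query above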