In the previous article, we used sample datasets to join two tables in Hive. To improve the performance of a table join, we can also use partitions or buckets. Let's first create a Parquet-format table with a partition column and buckets:
CREATE TABLE employee_p (
  employee_id INT,
  birthday DATE,
  first_name STRING,
  family_name STRING,
  work_day DATE)
PARTITIONED BY (gender CHAR(1))
CLUSTERED BY (employee_id) INTO 8 BUCKETS
STORED AS PARQUET;
Then import data into it:
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE employee_p PARTITION(gender)
  SELECT employee_id, birthday, first_name, family_name, gender, work_day
  FROM employee;
But it reports an error:
Diagnostic Messages for this Task:
Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{},"value":{"_col0":"499795","_col1":"1961-07-05","_col2":"Idoia","_col3":"Riefers","_col4":"M","_col5":"1986-01-16"}}
    at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:257)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{},"value":{"_col0":"499795","_col1":"1961-07-05","_col2":"Idoia","_col3":"Riefers","_col4":"M","_col5":"1986-01-16"}}
    at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:245)
    ... 7 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveFatalException: [Error 20004]: Fatal error occurred when node tried to create too many dynamic partitions. The maximum number of dynamic partitions is controlled by hive.exec.max.dynamic.partitions and hive.exec.max.dynamic.partitions.pernode. Maximum was set to: 100
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.getDynOutPaths(FileSinkOperator.java:922)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:699)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:837)
    at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:97)
    at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:236)
    ... 7 more
All the employees have only two genders, “M” and “F”, so how could Hive report “too many dynamic partitions”?
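The error itself points at the limits hive.exec.max.dynamic.partitions and hive.exec.max.dynamic.partitions.pernode (the per-node maximum here was 100). A first instinct might be to inspect or raise them, for example as below, but with only two gender values that would merely hide the real problem:

SET hive.exec.max.dynamic.partitions.pernode;        -- print the current per-node limit
SET hive.exec.max.dynamic.partitions.pernode=1000;   -- raise it (a workaround, not the fix we need here)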
To find the root cause, I prefixed my HQL with “explain” and finally noticed this line:
expressions: UDFToInteger(VALUE._col0) (type: int), CAST( VALUE._col1 AS DATE) (type: date), VALUE._col2 (type: string), VALUE._col3 (type: string), CAST( VALUE._col4 AS DATE) (type: date), VALUE._col5 (type: string)
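For reference, the plan excerpt above came from prefixing the same INSERT statement with EXPLAIN, roughly like this:

EXPLAIN
INSERT OVERWRITE TABLE employee_p PARTITION(gender)
  SELECT employee_id, birthday, first_name, family_name, gender, work_day
  FROM employee;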
Hive used “_col4” as the partition column, and its type is DATE! With dynamic partitioning, Hive takes the last column of the SELECT list as the partition value, so the correct import HQL should put the partition column last:
INSERT OVERWRITE TABLE employee_p PARTITION(gender)
  SELECT employee_id, birthday, first_name, family_name, work_day, gender
  FROM employee;
Now the data is imported successfully with dynamic partitions.
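To double-check, we can list the partitions Hive created; assuming the data really contains only the two gender values, this should print exactly two partitions:

SHOW PARTITIONS employee_p;
-- expected (given only two gender values):
-- gender=F
-- gender=M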
Now we create a new Parquet-format table “salary_p” (also bucketed by employee_id) and join the two tables:
CREATE TABLE salary_p (
  employee_id INT,
  salary INT,
  start_date DATE,
  end_date DATE)
CLUSTERED BY (employee_id) INTO 8 BUCKETS
STORED AS PARQUET;

INSERT OVERWRITE TABLE salary_p
  SELECT * FROM salary;

/* Join two tables */
SELECT e.gender, AVG(s.salary) AS avg_salary
  FROM employee_p AS e
  JOIN salary_p AS s ON (e.employee_id == s.employee_id)
  GROUP BY e.gender;
The join operation now takes only about 90 seconds, much less than the previous 140 seconds without bucketing and partitioning.
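Since both tables are clustered into the same number of buckets on employee_id, it may be possible to go further with a bucket map join. This is only a sketch of the idea, not something measured above, and it requires both sides to be bucketed on the join key into compatible bucket counts:

SET hive.optimize.bucketmapjoin=true;  -- let Hive join matching buckets directly

SELECT e.gender, AVG(s.salary) AS avg_salary
  FROM employee_p AS e
  JOIN salary_p AS s ON (e.employee_id = s.employee_id)
  GROUP BY e.gender;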