注:参考文末文章,加上自己的理解。
有一个 base_table 表存放的是 12 月 15 日之前的所有数据,当 12 月 16 日的数据产生后,生成了一个 incremental_table 表。
现在需要,将 incremental_table 这个增量表的数据更新到 base_table 表中。
那么,就有两种情况:
(1)保留历史数据
通过拉链表实现:
创建一个拉链表;
使用初始全量载入到拉链表中;
将每日增量数据 INSERT OVERWRITE 到拉链表中。
这样的话,就会存在重复的数据,保留了历史数据。
(2)不保留了历史数据
方法1:先将 base_table 表和 incremental_table 表 left join,将修改的数据覆盖写到 base_table 表,通过 union 将新增数据插入到 base_table 表。
方法2:union all base_table 表和 incremental_table 表,再取更新时间最新记录。
这样,就不会存在重复的数据,但是没有了历史数据。
通过拉链表实现
(1)建表
create table incremental_table (
id string,
name string,
addr string
) comment '增量表'
partitioned by (dt string)
row format delimited fields terminated by ','
stored as textfile;
create table base_table (
id string,
name string,
addr string
) comment '主表'
partitioned by (dt string)
row format delimited fields terminated by ','
stored as textfile;
(2)数据
源数据incre0.txt
1,lijie,chongqing
2,zhangshan,sz
3,lisi,shanghai
4,wangwu,usa
增量数据incre1.txt
1,lijie,chengdu # 地址变了
2,zhangshan,huoxing # 地址变了
4,wangwu,lalalala # 地址变了
5,xinzeng,hehe # 新增数据
(3)将 incre0.txt 导入主表中,将 incre0.txt 和 incre1.txt 导入增量表中
load data local inpath '/root/data/incre0.txt' overwrite into table base_table partition (dt='20191020');
hive> select * from base_table;
OK
1 lijie chongqing 20191020
2 zhangshan sz 20191020
3 lisi shanghai 20191020
4 wangwu usa 20191020
load data local inpath '/root/data/incre0.txt' overwrite into table incremental_table partition (dt='20191020');
load data local inpath '/root/data/incre1.txt' overwrite into table incremental_table partition (dt='20191021');
hive> select * from incremental_table;
OK
1 lijie chongqing 20191020
2 zhangshan sz 20191020
3 lisi shanghai 20191020
4 wangwu usa 20191020
1 lijie chengdu 20191021
2 zhangshan huoxing 20191021
4 wangwu lalalala 20191021
5 xinzeng hehe 20191021
-- 新建拉链表
create table zipper_table (
id string,
name string,
addr string,
start_date string,
end_date string
) comment '拉链表'
row format delimited fields terminated by ','
stored as textfile;
-- 将主表中的数据导入拉链表进行初始化(初始全量载入到拉链表中)
insert into table zipper_table
select id,
name,
addr,
dt as start_date,
'99991231' as end_date
from base_table
where dt='20191020';
hive> select * from zipper_table;
OK
1 lijie chongqing 20191020 99991231
2 zhangshan sz 20191020 99991231
3 lisi shanghai 20191020 99991231
4 wangwu usa 20191020 99991231
-- 将每日增量数据 INSERT OVERWRITE 到拉链表中
-- 也可以使用 hive 的 merge into 语法
insert overwrite table zipper_table
select * from
(
select a.id,
a.name,
a.addr,
a.start_date,
case
when a.end_date='99991231' and b.id is not null then '20191020'
else a.end_date
end as end_date
from zipper_table as a
left join (select * from incremental_table where dt='20191021') as b
on a.id=b.id
union
select c.id,
c.name,
c.addr,
'20191021' as start_date,
'99991231' as end_date
from incremental_table c
where c.dt='20191021'
) as t;
hive> select * from zipper_table;
OK
1 lijie chengdu 20191021 99991231
1 lijie chongqing 20191020 20191020
2 zhangshan huoxing 20191021 99991231
2 zhangshan sz 20191020 20191020
3 lisi shanghai 20191020 99991231
4 wangwu lalalala 20191021 99991231
4 wangwu usa 20191020 20191020
5 xinzeng hehe 20191021 99991231
hive> select * from incremental_table;
OK
1 lijie chongqing 20191020
2 zhangshan sz 20191020
3 lisi shanghai 20191020
4 wangwu usa 20191020
1 lijie chengdu 20191021
2 zhangshan huoxing 20191021
4 wangwu lalalala 20191021
5 xinzeng hehe 20191021
先将 base_table 表和 incremental_table 表 left join,将未修改的数据覆盖写到 base_table 表,再将修改的数据插入到 base_table 表。
hive> select * from base_table;
OK
1 lijie chongqing 20191020
2 zhangshan sz 20191020
3 lisi shanghai 20191020
4 wangwu usa 20191020
hive> select * from incremental_table;
OK
1 lijie chongqing 20191020
2 zhangshan sz 20191020
3 lisi shanghai 20191020
4 wangwu usa 20191020
1 lijie chengdu 20191021
2 zhangshan huoxing 20191021
4 wangwu lalalala 20191021
5 xinzeng hehe 20191021
insert overwrite table base_table
select a.id,
a.name,
a.addr,
a.dt
from base_table a
left join (select * from incremental_table where dt='20191021') b
on a.id=b.id
where b.id is null
union all
select c.id,
c.name,
c.addr,
c.dt
from (select * from incremental_table where dt='20191021') c;
hive> select * from base_table;
OK
3 lisi shanghai 20191020
1 lijie chengdu 20191021
2 zhangshan huoxing 20191021
4 wangwu lalalala 20191021
5 xinzeng hehe 20191021
union all base_table 表和 incremental_table 表,再取更新时间最新的记录。
【可以通过窗口函数编一个序号,也可以使用 hive 的预定义属性最近更新时间字段】
hive> select * from base_table;
OK
1 lijie chongqing 20191020
2 zhangshan sz 20191020
3 lisi shanghai 20191020
4 wangwu usa 20191020
hive> select * from incremental_table;
OK
1 lijie chongqing 20191020
2 zhangshan sz 20191020
3 lisi shanghai 20191020
4 wangwu usa 20191020
1 lijie chengdu 20191021
2 zhangshan huoxing 20191021
4 wangwu lalalala 20191021
5 xinzeng hehe 20191021
insert overwrite table base_table
select b.id,b.name,b.addr,b.dt
from
(
select a.*,
row_number() over(distribute by a.id sort by a.dt desc) as rn
from
(
select id,name,addr,dt from base_table
union all
select id,name,addr,dt from incremental_table where dt='20191021'
) a
) b
where b.rn=1;
hive> select * from base_table;
OK
3 lisi shanghai 20191020
1 lijie chengdu 20191021
2 zhangshan huoxing 20191021
4 wangwu lalalala 20191021
5 xinzeng hehe 20191021
参考地址:
https://www.cnblogs.com/lxbmaomao/p/9821128.html
手机扫一扫
移动阅读更方便
你可能感兴趣的文章