数仓增量更新hive实现
阅读原文时间:2023年07月08日阅读:8

注:参考文末文章,加上自己的理解。

有一个 base_table 表存放的是 12 月 15 日之前的所有数据,当 12 月 16 日的数据产生后,生成了一个 incremental_table 表。

现在需要,将 incremental_table 这个增量表的数据更新到 base_table 表中。

那么,就有两种情况:

(1)保留历史数据

通过拉链表实现:

创建一个拉链表;

使用初始全量载入到拉链表中;

将每日增量数据 INSERT OVERWRITE 到拉链表中。

这样的话,就会存在重复的数据,保留了历史数据。

(2)不保留了历史数据

方法1:先将 base_table 表和 incremental_table 表 left join,将修改的数据覆盖写到 base_table 表,通过 union 将新增数据插入到 base_table 表。

方法2:union all base_table 表和 incremental_table 表,再取更新时间最新记录。

这样,就不会存在重复的数据,但是没有了历史数据。

通过拉链表实现

2.1、准备工作

(1)建表

create table incremental_table (
    id string,
    name string,
    addr string
) comment '增量表'
partitioned by (dt string)
row format delimited fields terminated by ','
stored as textfile;

create table base_table (
    id string,
    name string,
    addr string
) comment '主表'
partitioned by (dt string)
row format delimited fields terminated by ','
stored as textfile;

(2)数据

源数据incre0.txt

1,lijie,chongqing
2,zhangshan,sz
3,lisi,shanghai
4,wangwu,usa

增量数据incre1.txt

1,lijie,chengdu      # 地址变了
2,zhangshan,huoxing  # 地址变了
4,wangwu,lalalala    # 地址变了
5,xinzeng,hehe       # 新增数据

(3)将 incre0.txt 导入主表中,将 incre0.txt 和 incre1.txt 导入增量表中

load data local inpath '/root/data/incre0.txt' overwrite into table base_table partition (dt='20191020');

hive> select * from base_table;
OK
1       lijie   chongqing       20191020
2       zhangshan       sz      20191020
3       lisi    shanghai        20191020
4       wangwu  usa     20191020

load data local inpath '/root/data/incre0.txt' overwrite into table incremental_table partition (dt='20191020');

load data local inpath '/root/data/incre1.txt' overwrite into table incremental_table partition (dt='20191021');

hive> select * from incremental_table;
OK
1       lijie   chongqing       20191020
2       zhangshan       sz      20191020
3       lisi    shanghai        20191020
4       wangwu  usa     20191020
1       lijie   chengdu 20191021
2       zhangshan       huoxing 20191021
4       wangwu  lalalala        20191021
5       xinzeng hehe    20191021

2.2、导入

-- 新建拉链表
create table zipper_table (
    id string,
    name string,
    addr string,
    start_date string,
    end_date string
) comment '拉链表'
row format delimited fields terminated by ','
stored as textfile;

-- 将主表中的数据导入拉链表进行初始化(初始全量载入到拉链表中)
insert into table zipper_table
    select id,
           name,
           addr,
           dt as start_date,
           '99991231' as end_date
    from base_table
    where dt='20191020';

hive> select * from zipper_table;
OK
1       lijie   chongqing       20191020        99991231
2       zhangshan       sz      20191020        99991231
3       lisi    shanghai        20191020        99991231
4       wangwu  usa     20191020        99991231

-- 将每日增量数据 INSERT OVERWRITE 到拉链表中
-- 也可以使用 hive 的 merge into 语法
insert overwrite table zipper_table
select * from
(
    select a.id,
           a.name,
           a.addr,
           a.start_date,
           case
                when a.end_date='99991231' and b.id is not null then '20191020'
                else a.end_date
           end as end_date
    from zipper_table as a
    left join (select * from incremental_table where dt='20191021') as b
    on a.id=b.id
union
    select c.id,
           c.name,
           c.addr,
           '20191021' as start_date,
           '99991231' as end_date
    from incremental_table c
    where c.dt='20191021'
) as t;

hive> select * from zipper_table;
OK
1       lijie   chengdu 20191021        99991231
1       lijie   chongqing       20191020        20191020
2       zhangshan       huoxing 20191021        99991231
2       zhangshan       sz      20191020        20191020
3       lisi    shanghai        20191020        99991231
4       wangwu  lalalala        20191021        99991231
4       wangwu  usa     20191020        20191020
5       xinzeng hehe    20191021        99991231

hive> select * from incremental_table;
OK
1       lijie   chongqing       20191020
2       zhangshan       sz      20191020
3       lisi    shanghai        20191020
4       wangwu  usa     20191020
1       lijie   chengdu 20191021
2       zhangshan       huoxing 20191021
4       wangwu  lalalala        20191021
5       xinzeng hehe    20191021

3.1、方法1

先将 base_table 表和 incremental_table 表 left join,将未修改的数据覆盖写到 base_table 表,再将修改的数据插入到 base_table 表。

hive> select * from base_table;
OK
1       lijie   chongqing       20191020
2       zhangshan       sz      20191020
3       lisi    shanghai        20191020
4       wangwu  usa     20191020

hive> select * from incremental_table;
OK
1       lijie   chongqing       20191020
2       zhangshan       sz      20191020
3       lisi    shanghai        20191020
4       wangwu  usa     20191020
1       lijie   chengdu 20191021
2       zhangshan       huoxing 20191021
4       wangwu  lalalala        20191021
5       xinzeng hehe    20191021

insert overwrite table base_table
    select a.id,
           a.name,
           a.addr,
           a.dt
    from base_table a
    left join (select * from incremental_table where dt='20191021') b
    on a.id=b.id
    where b.id is null
    union all
    select c.id,
           c.name,
           c.addr,
           c.dt
    from (select * from incremental_table where dt='20191021') c;

hive> select * from base_table;
OK
3       lisi    shanghai        20191020
1       lijie   chengdu 20191021
2       zhangshan       huoxing 20191021
4       wangwu  lalalala        20191021
5       xinzeng hehe    20191021

3.2、方法2

union all base_table 表和 incremental_table 表,再取更新时间最新的记录。

【可以通过窗口函数编一个序号,也可以使用 hive 的预定义属性最近更新时间字段】

hive> select * from base_table;
OK
1       lijie   chongqing       20191020
2       zhangshan       sz      20191020
3       lisi    shanghai        20191020
4       wangwu  usa     20191020

hive> select * from incremental_table;
OK
1       lijie   chongqing       20191020
2       zhangshan       sz      20191020
3       lisi    shanghai        20191020
4       wangwu  usa     20191020
1       lijie   chengdu 20191021
2       zhangshan       huoxing 20191021
4       wangwu  lalalala        20191021
5       xinzeng hehe    20191021

insert overwrite table base_table
select b.id,b.name,b.addr,b.dt
from
(
    select a.*,
           row_number() over(distribute by a.id sort by a.dt desc) as rn
    from
    (
        select id,name,addr,dt from base_table
        union all
        select id,name,addr,dt from incremental_table where dt='20191021'
    ) a
) b
where b.rn=1;

hive> select * from base_table;
OK
3       lisi    shanghai        20191020
1       lijie   chengdu 20191021
2       zhangshan       huoxing 20191021
4       wangwu  lalalala        20191021
5       xinzeng hehe    20191021

参考地址:

https://www.cnblogs.com/lxbmaomao/p/9821128.html

https://blog.csdn.net/qq_20641565/article/details/52763663

https://blog.csdn.net/qq_20641565/article/details/53164155?utm_medium=distribute.pc_relevant_t0.none-task-blog-BlogCommendFromMachineLearnPai2-1.control&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-BlogCommendFromMachineLearnPai2-1.control

https://blog.csdn.net/ZhouyuanLinli/article/details/86638454?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.control

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章