10. Flink Real-Time Project: Joining Order Dimension Tables

1. Dimension Lookup

In the previous article we finished joining the order and order-detail streams. In this article we associate the order with its remaining dimensions. Dimension association essentially means looking up, from inside the stream, dimension tables stored in HBase. Even when the lookup goes through the primary key, an HBase query is still slower than a stream-to-stream join, and lookups against external data sources are a common performance bottleneck in streaming jobs. So on top of the basic HBase dimension query we add some optimization and encapsulation.

Phoenix query wrapper

Phoenix is a SQL layer (a "skin", if you like) on top of HBase that lets us work with HBase through standard SQL. We start with a simple utility class for querying HBase.

import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.common.GmallConfig;
import org.apache.commons.beanutils.BeanUtils;
​
import java.io.PrintStream;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
/**
 * @author zhangbaohpu
 * @date 2021/11/13 21:26
 * @desc Phoenix utility class for querying HBase data
 */
public class PhoenixUtil {
​
    private static Connection conn = null;
​
    public static void init(){
        try {
            Class.forName(GmallConfig.PHOENIX_DRIVER);
            conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
            conn.setSchema(GmallConfig.HBASE_SCHEMA);
        }catch (Exception e){
            e.printStackTrace();
            throw new RuntimeException("连接phoenix失败 -> " + e.getMessage());
        }
    }
​
    public static <T> List<T> getList(String sql, Class<T> clazz){
        if (conn == null) {
            init();
        }
        PreparedStatement ps = null;
        ResultSet rs = null;
        List<T> resultList = new ArrayList<>();
        try {
            //prepare the statement
            ps = conn.prepareStatement(sql);
            //execute the query
            rs = ps.executeQuery();
            //read the result-set metadata
            ResultSetMetaData metaData = rs.getMetaData();
            while (rs.next()) {
                //create an instance of the target class
                T rowObj = clazz.newInstance();
                //copy every column value onto the object
                for (int i = 1; i <= metaData.getColumnCount(); i++) {
                    BeanUtils.setProperty(rowObj, metaData.getColumnName(i), rs.getObject(i));
                }
                resultList.add(rowObj);
            }
        } catch (Exception e) {
            throw new RuntimeException("phoenix 查询失败 -> " + e.getMessage());
        } finally {
            //close the statement and result set, but keep the shared static connection open for reuse
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            }
            if (ps != null) {
                try {
                    ps.close();
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            }
        }
        return resultList;
    }

    public static void main(String[] args) {
        String sql = "select * from GMALL_REALTIME.BASE_TRADEMARK";
        System.out.println(getList(sql, JSONObject.class));
    }
}
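PhoenixUtil reads its connection settings from a GmallConfig constants class that is not shown in this article. A minimal sketch of what it might contain, assuming the standard Phoenix thick-client JDBC driver and a ZooKeeper quorum on the hadoop101/102/103 hosts used elsewhere in this series (the host names are assumptions):

public class GmallConfig {
    //Phoenix thick-client JDBC driver (standard Phoenix class name)
    public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    //JDBC URL pointing at the HBase ZooKeeper quorum (hosts are assumptions)
    public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop101,hadoop102,hadoop103:2181";
    //schema that holds the dimension tables, matching GMALL_REALTIME used above
    public static final String HBASE_SCHEMA = "GMALL_REALTIME";
}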

With the HBase query utility in place, we wrap dimension lookups one level higher: querying a dimension table's data by id.

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
​
import java.util.List;
/**
 * @author zhangbaohpu
 * @date 2021/11/13 22:24
 * @desc Dimension lookup wrapper; delegates to PhoenixUtil
 */
public class DimUtil {
    //query Phoenix directly, without any cache
    public static JSONObject getDimInfoNoCache(String tableName, Tuple2<String, String>... colNameAndValue) {
        //build the where clause
        String wheresql = " where ";
        for (int i = 0; i < colNameAndValue.length; i++) {
            //column name and value of this condition
            Tuple2<String, String> nameValueTuple = colNameAndValue[i];
            String fieldName = nameValueTuple.f0;
            String fieldValue = nameValueTuple.f1;
            if (i > 0) {
                wheresql += " and ";
            }
            wheresql += fieldName + "='" + fieldValue + "'";
        }
        //assemble the query SQL
        String sql = "select * from " + tableName + wheresql;
        System.out.println("查询维度 SQL:" + sql);
        JSONObject dimInfoJsonObj = null;
        List<JSONObject> dimList = PhoenixUtil.getList(sql, JSONObject.class);
        if (dimList != null && dimList.size() > 0) {
            //a dimension lookup by key should return exactly one row
            dimInfoJsonObj = dimList.get(0);
        } else {
            System.out.println("维度数据未找到:" + sql);
        }
        return dimInfoJsonObj;
    }

    public static void main(String[] args) {
        JSONObject dimInfooNoCache = DimUtil.getDimInfoNoCache("base_trademark", Tuple2.of("id", "13"));
        System.out.println(dimInfooNoCache);
    }
}

2. Optimization 1: Adding a Cache-Aside Cache

In the implementation above we query HBase directly. Because lookups against external data sources are a common bottleneck for streaming jobs, we need to optimize on top of that implementation. Here we use the cache-aside pattern.

Cache-aside is a very common on-demand caching pattern: every request checks the cache first; on a hit the cached data is returned directly, and on a miss the database is queried and the result is written back into the cache for later requests.
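In code, the read path of the pattern looks roughly like this. This is only a generic sketch: getWithCacheAside and the dbLoader supplier are illustrative names, and the concrete Redis-backed implementation for our dimension tables follows in the code below.

//generic cache-aside read path (sketch); dbLoader stands in for any database query
public static String getWithCacheAside(Jedis jedis, String key, java.util.function.Supplier<String> dbLoader) {
    String cached = jedis.get(key);              //1. try the cache first
    if (cached != null && cached.length() > 0) {
        return cached;                           //2. hit: return the cached value
    }
    String fromDb = dbLoader.get();              //3. miss: load from the database
    if (fromDb != null) {
        jedis.setex(key, 24 * 3600, fromDb);     //4. write back with a TTL so cold keys expire
    }
    return fromDb;
}

The two caveats below still apply to this pattern.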

1) Things to watch out for with this caching strategy

Set an expiration time on cache entries, otherwise cold data will sit in the cache forever and waste resources.

Consider whether the dimension data can change; if it can, the corresponding cache entry must be actively evicted when it does.

2) Choosing the cache

There are generally two options: an on-heap cache, or a standalone cache service (Redis, Memcached).

An on-heap cache performs better, since the data path is shorter and there is less overhead along the way. But it is harder to manage: other processes cannot maintain the data held in it.

A standalone cache service (Redis, Memcached) also performs well, although it adds connection setup and network IO overhead. If the data can change, however, a standalone cache is easier to manage, and if the data volume is very large it is also easier to scale.

Because our dimension data is mutable, we use Redis to manage the cache.

Code optimization

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;
import java.util.List;
/**
 * @author zhangbaohpu
 * @date 2021/11/13 22:24
 * @desc Dimension lookup wrapper; delegates to PhoenixUtil
 */
public class DimUtil {
​
    /**
     * Optimized lookup with a Redis cache.
     * Redis value types: string, list, set, zset, hash -- a string is used here.
     * Key format:   dim:table_name:value   e.g. dim:base_trademark:13
     * Value:        the dimension row as a JSON string
     * Expiration:   24*3600 seconds
     */
    public static JSONObject getDimInfo(String tableName, Tuple2<String, String>... colNameAndValue) {

        //build the where clause
        String wheresql = " where ";
        //redis key; lowercase the table name so it matches the key built in deleteCached below
        String redisKey = "dim:" + tableName.toLowerCase() + ":";
        for (int i = 0; i < colNameAndValue.length; i++) {
            //column name and value of this condition
            Tuple2<String, String> nameValueTuple = colNameAndValue[i];
            String fieldName = nameValueTuple.f0;
            String fieldValue = nameValueTuple.f1;
            if (i > 0) {
                wheresql += " and ";
                redisKey += "_";
            }
            wheresql += fieldName + "='" + fieldValue + "'";
            redisKey += fieldValue;
        }
        Jedis jedis = null;
        String redisStr = null;
        JSONObject dimInfoJsonObj = null;
        try {
            //1. try the cache first
            jedis = RedisUtil.getJedis();
            redisStr = jedis.get(redisKey);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("获取redis数据错误");
        }

        if (redisStr != null && redisStr.length() > 0) {
            //cache hit
            dimInfoJsonObj = JSON.parseObject(redisStr);
        } else {
            //2. cache miss: query Phoenix
            //assemble the query SQL
            String sql = "select * from " + tableName + wheresql;
            System.out.println("查询维度 SQL:" + sql);

            List<JSONObject> dimList = PhoenixUtil.getList(sql, JSONObject.class);
            if (dimList != null && dimList.size() > 0) {
                //a dimension lookup by key should return exactly one row
                dimInfoJsonObj = dimList.get(0);
                //3. write the result back to the cache with a 24-hour TTL
                if (jedis != null) {
                    jedis.setex(redisKey, 3600 * 24, dimInfoJsonObj.toJSONString());
                }
            } else {
                System.out.println("维度数据未找到:" + sql);
            }
        }
        //return the connection to the pool
        if (jedis != null) {
            jedis.close();
        }

        return dimInfoJsonObj;
    }

    public static JSONObject getDimInfoNoCacheById(String tableName, String idValue) {
        return getDimInfoNoCache(tableName, new Tuple2<>("id", idValue));
    }

    //query Phoenix directly, without any cache
    public static JSONObject getDimInfoNoCache(String tableName, Tuple2<String, String>... colNameAndValue) {
        //build the where clause
        String wheresql = " where ";
        for (int i = 0; i < colNameAndValue.length; i++) {
            //column name and value of this condition
            Tuple2<String, String> nameValueTuple = colNameAndValue[i];
            String fieldName = nameValueTuple.f0;
            String fieldValue = nameValueTuple.f1;
            if (i > 0) {
                wheresql += " and ";
            }
            wheresql += fieldName + "='" + fieldValue + "'";
        }
        //assemble the query SQL
        String sql = "select * from " + tableName + wheresql;
        System.out.println("查询维度 SQL:" + sql);
        JSONObject dimInfoJsonObj = null;
        List<JSONObject> dimList = PhoenixUtil.getList(sql, JSONObject.class);
        if (dimList != null && dimList.size() > 0) {
            //a dimension lookup by key should return exactly one row
            dimInfoJsonObj = dimList.get(0);
        } else {
            System.out.println("维度数据未找到:" + sql);
        }
        return dimInfoJsonObj;
    }

    public static void main(String[] args) {
        JSONObject dimInfooNoCache = DimUtil.getDimInfoNoCache("base_trademark", Tuple2.of("id", "13"));
        System.out.println(dimInfooNoCache);
    }
}

The cache relies on the RedisUtil.java helper class.

import redis.clients.jedis.*;
/**
 * @author zhangbaohpu
 * @date 2021/11/13 23:31
 * @desc
 */
public class RedisUtil {
    public static JedisPool jedisPool = null;

    //synchronized so that concurrent callers (e.g. the async lookup thread pool) build only one pool
    public static synchronized Jedis getJedis(){
        if (jedisPool == null) {
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxTotal(100); //maximum number of connections
            jedisPoolConfig.setBlockWhenExhausted(true); //wait when the pool is exhausted
            jedisPoolConfig.setMaxWaitMillis(2000); //maximum wait time
            jedisPoolConfig.setMaxIdle(5); //maximum idle connections
            jedisPoolConfig.setMinIdle(5); //minimum idle connections
            jedisPoolConfig.setTestOnBorrow(true); //ping the connection when borrowing it
            jedisPool = new JedisPool(jedisPoolConfig, "hadoop101", 6379, 1000);
            System.out.println("开辟连接池");
            return jedisPool.getResource();
        } else {
            System.out.println(" 连接池:" + jedisPool.getNumActive());
            return jedisPool.getResource();
        }
    }

    public static void main(String[] args) {
        Jedis jedis = getJedis();
        System.out.println(jedis.ping());
    }
}

Changes to dimension data

If a dimension record changes, its cached copy is no longer up to date, so the optimization here is to evict the changed dimension record from the cache.

Add a cache-eviction method to DimUtil.java:

//invalidate the Redis cache entry for the given key
public static void deleteCached(String tableName, String id){
    String key = "dim:" + tableName.toLowerCase() + ":" + id;
    try {
        Jedis jedis = RedisUtil.getJedis();
        //remove the cached entry by key
        jedis.del(key);
        jedis.close();
    } catch (Exception e) {
        System.out.println("缓存异常!");
        e.printStackTrace();
    }
}
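For example, after trademark 13 changes in MySQL, a call like the following (the values are illustrative) removes the key dim:dim_base_trademark:13 that getDimInfo wrote, so the next lookup reloads it from Phoenix:

DimUtil.deleteCached("dim_base_trademark", "13");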

In addition, in the BaseDbTask job that syncs MySQL data in real time, dimension data is written to HBase through DimSink.java; add the cache eviction to its invoke method.

@Override
public void invoke(JSONObject jsonObject, Context context) throws Exception {
    String sinkTable = jsonObject.getString("sink_table");
    JSONObject data = jsonObject.getJSONObject("data");
    PreparedStatement ps = null;
    if (data != null && data.size() > 0) {
        try {
            //generate the Phoenix upsert statement, which covers both insert and update
            String sql = generateUpsert(data, sinkTable.toUpperCase());
            log.info("开始执行 phoenix sql -->{}", sql);
            ps = conn.prepareStatement(sql);
            ps.executeUpdate();
            conn.commit();
            log.info("执行 phoenix sql 成功");
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            throw new RuntimeException("执行 phoenix sql 失败!");
        } finally {
            if (ps != null) {
                ps.close();
            }
        }
        //if this is an update to dimension data, evict the corresponding Redis entry
        if (jsonObject.getString("type").endsWith("update")) {
            DimUtil.deleteCached(sinkTable, data.getString("id"));
        }
    }
}

3. Optimization 2: Asynchronous Lookups

In Flink stream processing we often need to interact with external systems, for example using dimension tables to fill in fields of a fact table. In an e-commerce scenario, a product's sku id is used to look up attributes such as the product's category, its manufacturer, and details about that manufacturer; in logistics, a parcel id is used to look up the parcel's business type, shipping information, delivery information, and so on.

By default, in a Flink MapFunction each parallel subtask can only interact synchronously: it sends a request to the external store, blocks on IO, waits for the response, then sends the next request. This synchronous interaction spends a large share of its time simply waiting on the network. Raising the MapFunction's parallelism improves throughput, but more parallelism means more resources, so it is not a great solution.

Flink introduced Async I/O in version 1.2. In asynchronous mode, IO operations no longer block: a single parallel subtask can have multiple requests in flight and processes whichever response arrives first, so consecutive requests do not wait on each other, which greatly improves streaming throughput.

Async I/O was a highly requested feature contributed to the community by Alibaba; it addresses the situation where network latency to external systems becomes the system bottleneck.

An asynchronous lookup essentially hands the dimension-table query over to a separate thread pool, so a single slow query no longer blocks the stream; one parallel subtask can issue several requests concurrently, improving throughput.

This approach is especially useful for operations that involve network IO, because it removes the cost of waiting on each individual request.

Official Flink documentation on asynchronous I/O:

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/operators/asyncio/#%e5%bc%82%e6%ad%a5-io-api

3.1 Thread pool utility

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * @author zhangbaohpu
 * @date 2021/11/28 12:18
 * @desc Thread pool utility class
 *
 */
public class ThreadPoolUtil {
    //volatile so the double-checked locking below is safe
    private static volatile ThreadPoolExecutor poolExecutor;

    /**
     * Get the singleton thread pool.
     *  corePoolSize: number of core threads; determines whether a new task spawns a thread or goes into workQueue
     *  maximumPoolSize: upper bound on the number of threads the pool may create, depending on the workQueue type
     *  keepAliveTime: how long threads beyond corePoolSize may stay idle before being destroyed
     *  unit: time unit of keepAliveTime
     *  workQueue: queue of tasks that have been submitted to the pool but not yet executed
     * @return the shared ThreadPoolExecutor
     */
    public static ThreadPoolExecutor getPoolExecutor(){
        if (poolExecutor == null) {
            synchronized (ThreadPoolUtil.class) {
                if (poolExecutor == null) {
                    poolExecutor = new ThreadPoolExecutor(
                            4, 20, 300, TimeUnit.SECONDS, new LinkedBlockingDeque<>(Integer.MAX_VALUE)
                    );
                }
            }
        }
        return poolExecutor;
    }
}
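A quick way to sanity-check the pool (a hypothetical standalone demo, not part of the project):

public class ThreadPoolUtilDemo {
    public static void main(String[] args) {
        //all callers share the same lazily created pool
        java.util.concurrent.ThreadPoolExecutor pool = ThreadPoolUtil.getPoolExecutor();
        pool.submit(() -> System.out.println("lookup running in " + Thread.currentThread().getName()));
        //shut the pool down only because this is a standalone demo
        pool.shutdown();
    }
}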

3.2 Custom dimension-join interface

This asynchronous lookup should work for any dimension table; which condition to query with, and how to merge the result into the stream record, is left to the caller.

So we define an interface, DimJoinFunction, with two methods.

import com.alibaba.fastjson.JSONObject;
​
/**
 * @author zhangbaohpu
 * @date 2021/11/28 12:34
 * @desc Dimension-join interface
 */
public interface DimJoinFunction<T> {
​
    //extract the lookup key from the stream record
    String getKey(T obj);

    //merge the dimension row into the stream record
    void join(T stream, JSONObject dimInfo);
}

3.3 Asynchronous dimension lookup class

Create DimAsyncFunction.java in a new func package. The class extends RichAsyncFunction, Flink's rich async function class, and implements our custom dimension-join interface. Because this is a lookup whose input and output types are identical, we extend RichAsyncFunction<T, T>.

RichAsyncFunction requires two methods to be implemented:

  • open initializes the asynchronous resources, here the thread pool.

  • asyncInvoke is the core method, and its body must be asynchronous. If the database you query has an asynchronous API you can call it directly; if not, you have to provide the asynchrony yourself, for example with a thread pool.

import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.DimUtil;
import com.zhangbao.gmall.realtime.utils.ThreadPoolUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Arrays;
import java.util.concurrent.ExecutorService;

/**
 * @author zhangbaohpu
 * @date 2021/11/28 12:24
 * @desc Generic asynchronous dimension lookup.
 *       Template method pattern: the parent class only declares getKey/join,
 *       the concrete subclass supplies the implementation.
 */
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {

    private String tableName;

    private static ExecutorService executorPool;

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        //initialize the thread pool
        executorPool = ThreadPoolUtil.getPoolExecutor();
    }

    @Override
    public void asyncInvoke(T obj, ResultFuture<T> resultFuture) throws Exception {
        executorPool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    long start = System.currentTimeMillis();
                    String key = getKey(obj);
                    //look up the dimension row by id through the cached DimUtil
                    JSONObject dimInfoJsonObj = DimUtil.getDimInfo(tableName, Tuple2.of("id", key));

                    //merge the dimension row into the stream record
                    if (dimInfoJsonObj != null) {
                        join(obj, dimInfoJsonObj);
                    }
                    long end = System.currentTimeMillis();
                    System.out.println("关联维度数据,耗时:" + (end - start) + " 毫秒。");
                    resultFuture.complete(Arrays.asList(obj));
                } catch (Exception e) {
                    e.printStackTrace();
                    throw new RuntimeException(tableName + "维度查询失败");
                }
            }
        });
    }
}
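The asyncInvoke above pushes the blocking Phoenix/Redis lookup onto a thread pool because those clients are synchronous. If the store being queried exposed a genuinely asynchronous client, or if you prefer the CompletableFuture style shown in the Flink documentation, the same pool can be wrapped in a CompletableFuture. A rough sketch of that variant of asyncInvoke (it reuses tableName, executorPool, getKey and join from the class above):

@Override
public void asyncInvoke(T obj, ResultFuture<T> resultFuture) {
    //run the blocking lookup on the shared pool and complete the future when it finishes
    java.util.concurrent.CompletableFuture
            .supplyAsync(() -> DimUtil.getDimInfo(tableName, Tuple2.of("id", getKey(obj))), executorPool)
            .thenAccept(dimInfo -> {
                if (dimInfo != null) {
                    join(obj, dimInfo);
                }
                resultFuture.complete(java.util.Collections.singletonList(obj));
            });
}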

3.4 Adding it to the main job

Now add the dimension data to the order wide-table job. In OrderWideApp.java, after the order and order-detail streams have been joined, associate the user dimension with the order wide records.

/**
 * Associate the user dimension
 * Flink asynchronous lookup:
 * https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/operators/asyncio/#%e5%bc%82%e6%ad%a5-io-api
 */
SingleOutputStreamOperator<OrderWide> orderWideWithUserDs = AsyncDataStream.unorderedWait(orderWideDs, new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
    @Override
    public String getKey(OrderWide obj) {
        //the user dimension is looked up by user_id
        return obj.getUser_id().toString();
    }

    @Override
    public void join(OrderWide orderWide, JSONObject dimInfo) {
        Date birthday = dimInfo.getDate("BIRTHDAY");
        Long age = DateUtil.betweenYear(birthday, new Date(), false);
        orderWide.setUser_age(age.intValue());
        orderWide.setUser_gender(dimInfo.getString("GENDER"));
    }
}, 60, TimeUnit.SECONDS);

orderWideWithUserDs.print("order wide with users >>>");

3.5 Testing

Services to start: zk, kf (Kafka), redis, hdfs, hbase, maxwell, and the BaseDbTask.java job.

Note: data to clean up first

  • MySQL config table: delete the config rows that were added by hand earlier, then load the tables to be synced by running the script below.

    /*Data for the table `table_process` */
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_info', 'insert', 'hbase', 'dim_activity_info', 'id,activity_name,activity_type,activity_desc,start_time,end_time,create_time', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_info', 'update', 'hbase', 'dim_activity_info', 'id,activity_name,activity_type,activity_desc,start_time,end_time,create_time', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_rule', 'insert', 'hbase', 'dim_activity_rule', 'id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_rule', 'update', 'hbase', 'dim_activity_rule', 'id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_sku', 'insert', 'hbase', 'dim_activity_sku', 'id,activity_id,sku_id,create_time', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_sku', 'update', 'hbase', 'dim_activity_sku', 'id,activity_id,sku_id,create_time', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category1', 'insert', 'hbase', 'dim_base_category1', 'id,name', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category1', 'update', 'hbase', 'dim_base_category1', 'id,name', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category2', 'insert', 'hbase', 'dim_base_category2', 'id,name,category1_id', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category2', 'update', 'hbase', 'dim_base_category2', 'id,name,category1_id', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category3', 'insert', 'hbase', 'dim_base_category3', 'id,name,category2_id', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category3', 'update', 'hbase', 'dim_base_category3', 'id,name,category2_id', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_dic', 'insert', 'hbase', 'dim_base_dic', 'id,dic_name,parent_code,create_time,operate_time', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_dic', 'update', 'hbase', 'dim_base_dic', 'id,dic_name,parent_code,create_time,operate_time', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_province', 'insert', 'hbase', 'dim_base_province', 'id,name,region_id,area_code,iso_code,iso_3166_2', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_province', 'update', 'hbase', 'dim_base_province', 'id,name,region_id,area_code,iso_code,iso_3166_2', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_region', 'insert', 'hbase', 'dim_base_region', 'id,region_name', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_region', 'update', 'hbase', 'dim_base_region', 'id,region_name', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_trademark', 'insert', 'hbase', 'dim_base_trademark', 'id,tm_name', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_trademark', 'update', 'hbase', 'dim_base_trademark', 'id,tm_name', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('cart_info', 'insert', 'kafka', 'dwd_cart_info', 'id,user_id,sku_id,cart_price,sku_num,img_url,sku_name,is_checked,create_time,operate_time,is_ordered,order_time,source_type,source_id', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('comment_info', 'insert', 'kafka', 'dwd_comment_info', 'id,user_id,nick_name,head_img,sku_id,spu_id,order_id,appraise,comment_txt,create_time,operate_time', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_info', 'insert', 'hbase', 'dim_coupon_info', 'id,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,limit_num,taken_count,start_time,end_time,operate_time,expire_time,range_desc', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_info', 'update', 'hbase', 'dim_coupon_info', 'id,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,limit_num,taken_count,start_time,end_time,operate_time,expire_time,range_desc', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_range', 'insert', 'hbase', 'dim_coupon_range', 'id,coupon_id,range_type,range_id', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_range', 'update', 'hbase', 'dim_coupon_range', 'id,coupon_id,range_type,range_id', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_use', 'insert', 'kafka', 'dwd_coupon_use', 'id,coupon_id,user_id,order_id,coupon_status,get_type,get_time,using_time,used_time,expire_time', 'id', ' SALT_BUCKETS = 3');
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_use', 'update', 'kafka', 'dwd_coupon_use', 'id,coupon_id,user_id,order_id,coupon_status,get_type,get_time,using_time,used_time,expire_time', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('favor_info', 'insert', 'kafka', 'dwd_favor_info', 'id,user_id,sku_id,spu_id,is_cancel,create_time,cancel_time', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('financial_sku_cost', 'insert', 'hbase', 'dim_financial_sku_cost', 'id,sku_id,sku_name,busi_date,is_lastest,sku_cost,create_time', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('financial_sku_cost', 'update', 'hbase', 'dim_financial_sku_cost', 'id,sku_id,sku_name,busi_date,is_lastest,sku_cost,create_time', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_detail', 'insert', 'kafka', 'dwd_order_detail', 'id,order_id,sku_id,sku_name,order_price,sku_num,create_time,source_type,source_id,split_activity_amount,split_coupon_amount,split_total_amount', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_detail_activity', 'insert', 'kafka', 'dwd_order_detail_activity', 'id,order_id,order_detail_id,activity_id,activity_rule_id,sku_id,create_time', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_detail_coupon', 'insert', 'kafka', 'dwd_order_detail_coupon', 'id,order_id,order_detail_id,coupon_id,coupon_use_id,sku_id,create_time', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_info', 'insert', 'kafka', 'dwd_order_info', 'id,consignee,consignee_tel,total_amount,order_status,user_id,payment_way,delivery_address,order_comment,out_trade_no,trade_body,create_time,operate_time,expire_time,process_status,tracking_no,parent_order_id,img_url,province_id,activity_reduce_amount,coupon_reduce_amount,original_total_amount,feight_fee,feight_fee_reduce,refundable_time', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_info', 'update', 'kafka', 'dwd_order_info_update', 'id,consignee,consignee_tel,total_amount,order_status,user_id,payment_way,delivery_address,order_comment,out_trade_no,trade_body,create_time,operate_time,expire_time,process_status,tracking_no,parent_order_id,img_url,province_id,activity_reduce_amount,coupon_reduce_amount,original_total_amount,feight_fee,feight_fee_reduce,refundable_time', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_refund_info', 'insert', 'kafka', 'dwd_order_refund_info', 'id,user_id,order_id,sku_id,refund_type,refund_num,refund_amount,refund_reason_type,refund_reason_txt,refund_status,create_time', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('payment_info', 'insert', 'kafka', 'dwd_payment_info', 'id,out_trade_no,order_id,user_id,payment_type,trade_no,total_amount,subject,payment_status,create_time,callback_time,callback_content', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('payment_info', 'update', 'kafka', 'dwd_payment_info', 'id,out_trade_no,order_id,user_id,payment_type,trade_no,total_amount,subject,payment_status,create_time,callback_time,callback_content', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('refund_payment', 'insert', 'kafka', 'dwd_refund_payment', 'id,out_trade_no,order_id,sku_id,payment_type,trade_no,total_amount,subject,refund_status,create_time,callback_time,callback_content', 'id', NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('refund_payment', 'update', 'kafka', 'dwd_refund_payment', 'id,out_trade_no,order_id,sku_id,payment_type,trade_no,total_amount,subject,refund_status,create_time,callback_time,callback_content', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('sku_info', 'insert', 'hbase', 'dim_sku_info', 'id,spu_id,price,sku_name,sku_desc,weight,tm_id,category3_id,sku_default_img,is_sale,create_time', 'id', ' SALT_BUCKETS = 4');
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('sku_info', 'update', 'hbase', 'dim_sku_info', 'id,spu_id,price,sku_name,sku_desc,weight,tm_id,category3_id,sku_default_img,is_sale,create_time', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('spu_info', 'insert', 'hbase', 'dim_spu_info', 'id,spu_name,description,category3_id,tm_id', 'id', ' SALT_BUCKETS = 3');
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('spu_info', 'update', 'hbase', 'dim_spu_info', 'id,spu_name,description,category3_id,tm_id', NULL, NULL);
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('user_info', 'insert', 'hbase', 'dim_user_info', 'id,login_name,name,user_level,birthday,gender,create_time,operate_time', 'id', ' SALT_BUCKETS = 3');
    INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('user_info', 'update', 'hbase', 'dim_user_info', 'id,login_name,name,user_level,birthday,gender,create_time,operate_time', NULL, NULL);
    ​
  • Clear the HBase data and recreate the dimension tables

    !tables — list all tables

    drop table GMALL_REALTIME.BASE_TRADEMARK; — drop a table

  • Initialize the dimension data

    Do a full historical sync of the user table into HBase via Maxwell's bootstrap. See the earlier articles for installing and using Maxwell.

    bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2021 --table user_info --client_id maxwell_1
  • Start the jar that simulates business data

After testing, we can see that the user age and gender fields in the order wide table are both populated.

4. Other Dimension Joins

Joining the province dimension follows the same logic as the user dimension; we take the stream produced by the user-dimension join and join the province on top of it.

First, fully sync the province dimension data into HBase, again via Maxwell:

bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2021 --table base_province --client_id maxwell_1

/**
 * Associate the province dimension,
 * building on the stream that already carries the user dimension
 */
SingleOutputStreamOperator<OrderWide> orderWideWithProvinceDs = AsyncDataStream.unorderedWait(orderWideWithUserDs,
        new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
            @Override
            public String getKey(OrderWide orderWide) {
                return orderWide.getProvince_id().toString();
            }
            @Override
            public void join(OrderWide orderWide, JSONObject dimInfo) {
                orderWide.setProvince_name(dimInfo.getString("NAME"));
                orderWide.setProvince_iso_code(dimInfo.getString("ISO_CODE"));
                orderWide.setProvince_area_code(dimInfo.getString("AREA_CODE"));
                orderWide.setProvince_3166_2_code(dimInfo.getString("ISO_3166_2"));
            }
        }, 60, TimeUnit.SECONDS);
orderWideWithProvinceDs.print("order wide with province>>>");

Initialize the sku dimension data:

bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2021 --table sku_info --client_id maxwell_1

/**
 * Associate the sku dimension
 */
SingleOutputStreamOperator<OrderWide> orderWideWithSkuDs = AsyncDataStream.unorderedWait(orderWideWithProvinceDs,
        new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
            @Override
            public String getKey(OrderWide orderWide) {
                return orderWide.getSku_id().toString();
            }

            @Override
            public void join(OrderWide orderWide, JSONObject dimInfo) {
                orderWide.setSku_name(dimInfo.getString("SKU_NAME"));
                orderWide.setSpu_id(dimInfo.getLong("SPU_ID"));
                orderWide.setCategory3_id(dimInfo.getLong("CATEGORY3_ID"));
                orderWide.setTm_id(dimInfo.getLong("TM_ID"));
            }
        }, 60, TimeUnit.SECONDS);

Initialize the spu dimension data:

bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2021 --table spu_info --client_id maxwell_1

/**
 * Associate the spu dimension
 */
SingleOutputStreamOperator<OrderWide> orderWideWithSpuDs = AsyncDataStream.unorderedWait(orderWideWithSkuDs, new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
    @Override
    public String getKey(OrderWide orderWide) {
        return orderWide.getSpu_id().toString();
    }

    @Override
    public void join(OrderWide orderWide, JSONObject dimInfo) {
        orderWide.setSpu_name(dimInfo.getString("SPU_NAME"));
    }
}, 60, TimeUnit.SECONDS);

Initialize the category dimension data:

bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2021 --table base_category3 --client_id maxwell_1

/**
 * Associate the category dimension
 */
SingleOutputStreamOperator<OrderWide> orderWideWithCategoryDs = AsyncDataStream.unorderedWait(orderWideWithSpuDs, new DimAsyncFunction<OrderWide>("DIM_BASE_CATEGORY3") {
    @Override
    public String getKey(OrderWide orderWide) {
        return orderWide.getCategory3_id().toString();
    }

    @Override
    public void join(OrderWide orderWide, JSONObject dimInfo) {
        orderWide.setCategory3_name(dimInfo.getString("NAME"));
    }
}, 60, TimeUnit.SECONDS);

Initialize the trademark dimension data:

bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2021 --table base_trademark --client_id maxwell_1

/**
 * Associate the trademark dimension
 */
SingleOutputStreamOperator<OrderWide> orderWideWithTmDs = AsyncDataStream.unorderedWait(orderWideWithCategoryDs, new DimAsyncFunction<OrderWide>("DIM_BASE_TRADEMARK") {
    @Override
    public String getKey(OrderWide orderWide) {
        return orderWide.getTm_id().toString();
    }

    @Override
    public void join(OrderWide orderWide, JSONObject dimInfo) {
        orderWide.setTm_name(dimInfo.getString("TM_NAME"));
    }
}, 60, TimeUnit.SECONDS);
orderWideWithTmDs.print("order wide with sku_spu_category_tm >>> ");

5. Writing the Order Wide Table to Kafka

/**
 * Send the fully joined order wide records to the DWM-layer Kafka topic
 */
orderWideWithTmDs.map(orderWide -> JSONObject.toJSONString(orderWide))
        .addSink(MyKafkaUtil.getKafkaSink(orderWideTopic));
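MyKafkaUtil.getKafkaSink and the orderWideTopic variable come from earlier parts of the project and are not shown in this article. A minimal sketch of what they might look like, assuming a plain FlinkKafkaProducer with string serialization and the hadoop101 broker used elsewhere in this series (the broker address and topic name are assumptions):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class MyKafkaUtil {
    //broker address is an assumption, matching the hosts used elsewhere in this series
    private static final String KAFKA_SERVER = "hadoop101:9092";

    //build a simple string sink for the given topic
    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", KAFKA_SERVER);
        return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), props);
    }
}

In OrderWideApp, orderWideTopic would then simply be a String such as "dwm_order_wide".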

Project repository: https://github.com/zhangbaohpu/gmall-flink-parent

For more, search for 选手一号位 on the official-account platform; this article's number is 1010, reply with it to get the related material.