MapReduce07 Join多种应用
阅读原文时间:2023年07月10日阅读:1

目录

1 Join多种应用

Map 端的主要工作:不同表或文件的 key/value 对, 打标签以区别不同来源的记录。然后用连接字段作为 key,其余部分和新加的标志作为 value,最后进行输出。

Reduce 端的主要工作:在 Reduce 端以连接字段作为 key 的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在 Map 阶段已经打标志) 分开,最后进行合并。

需求

由两张表,表1 order.txt

表2 商品信息表

将商品信息表中数据根据商品pid合并到订单数据表中,预期合并结果

需求分析

Map数据处理

问题1:输入的是两个文件,如何区分输入数据来自哪个文件?

切片可以获取到文件的路径和文件的名称

切片.getPath().getName()

问题2:输出的kv是什么?

k是pid,这样pid相同的数据(来自两张表)才能一起给Reduce合并。

v应该包含 订单id、数量、产品名称

问题3:values是多个字段,这里需要序列化吗?

MapTask传数据给ReduceTask的时候,有可能会跨服务器传输,所以需要序列化。把values封装成bean对象。

问题4:bean对象如何设计?

需要包含所有的字段 订单id、数量、产品名称、flag

两张表的数据不同,没有的地方就为空。

1001 1 "" order
"" "" 小米 pd

Reduce端合并(数据倾斜)

Reduce端来Join

key是pid 如01,pid相同的三条数据传输到Reduce端,把pd.txt的产品名称替换order的pid

map输出过来的数据格式

如何合并?

创建两个集合,一个存放order表的数据(这个数据多),一个存放pd表的数据

代码实现

JoinBean类

序列化和反序列顺序一定要一致

package com.ranan.mapreduce.reducejoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author ranan
 * @create 2021-08-30 16:41
 */
public class JoinBean implements Writable {
    //id pid amount pid pname
    private String id; //订单id
    private String pid;//商品id
    private String pname;//产品名字
    private  int amout;//数量
    private String flag;//标记来自哪个表格

    //序列化需要空参构造,用于反射调用空参构造函数
    public JoinBean() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public int getAmout() {
        return amout;
    }

    public void setAmout(int amout) {
        this.amout = amout;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    //序列化方法
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id); //String类型用writeUTF写
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amout); //int类型用writeInt
        dataOutput.writeUTF(pname);
        dataOutput.writeUTF(flag);

    }
    //反序列化方法
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        setId(dataInput.readUTF());
        setPid(dataInput.readUTF());
        setAmout(dataInput.readInt());
        setPname(dataInput.readUTF());
        setFlag(dataInput.readUTF());
    }
    //重写toString方法,最终输出的格式 id pname amout
    @Override
    public String toString() {
        return id + "\t" + pname + "\t" +amout +"\n";
    }
}

JoinMap

key:pid Text类型

value:JoinBean JoinBean

为什么需要初始化?

传过来了两个文件order.txt pd.txt

初始化的时候希望获得对应的文件名称,在后续操作给数据打标记

为什么在初始化的时候获取文件名字?

默认切片规则,一个文件一个切片(因为文件很小,所以文件里面不用切),一个切片开启一个MapTask。

每一个MapTask都有setup方法,map方法。map方法每一行输入执行一次,setup只执行一次。只需要获取一次文件名字就行了。

package com.ranan.mapreduce.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * @author ranan
 * @create 2021-08-30 17:09
 */
public class JoinMapper extends Mapper<LongWritable, Text,Text,JoinBean> {
    private String fileName;
    private Text outK = new Text();
    private JoinBean outV = new JoinBean();

    //初始化方法
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //获得切片信息 返回的是InputSplit抽象类,用它的子类FileSplit,因为我们输入的是文件
        FileSplit split = (FileSplit) context.getInputSplit();

        fileName = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        //判断是哪个文件
        if(fileName.contains("order")){
            //处理订单表   1001 01 1 order
            String[] order = line.split("\t");
            //封装kv
            outK.set(order[1]);
            outV.setId(order[0]);
            outV.setPid(order[1]);
            outV.setAmout(Integer.parseInt(order[2]));
            outV.getPname("");
            outV.setFlag("order");
        }else {
            //处理商品表 01 小米 pd
            String[] pd = line.split("\t");
            outK.set(pd[0]);
            outV.setId("");
            outV.setPid(pd[0]);
            outV.setAmout(0);
            outV.setPname(pd[1]);
            outV.setFlag("pd");
        }
        context.write(outK,outV);

    }
}

JoinReduce

输入的数据

希望输出的数据

为什么最终的输出是JoinBean?

因为重写了JoinBean的toString方法,最后打印会调用toString方法,只输出需要的三个变量。

Hadoop的迭代

Hadoop迭代器中使用了对象重用,迭代时value始终指向一个内存地址,改变的是那个内存地址中的数据。

for(JoinBean value:values){
if("order".equals(value.getFlag())){
     orderBeans.add(value);
     }
}

value指向一个地址且值不变,每次循环改变的是该内存地址的值,所以这样写最后的数据全是最后一次循环的数据。

应该用一个临时变量存储,使用工具类BeanUtils

for(JoinBean value:values){
   if("order".equals(value.getFlag())){
        JoinBean tmp = new JoinBean();
        //工具类
        try {
            BeanUtils.copyProperties(tmp,value);
         } catch (IllegalAccessException e) {
            e.printStackTrace();
         } catch (InvocationTargetException e) {
             e.printStackTrace();
         }
             orderBeans.add(tmp);
     }
}

完整代码

package com.ranan.mapreduce.reducejoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.w3c.dom.Text;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

/**
 * @author ranan
 * @create 2021-08-30 19:53
 */
public class JoinReduce extends Reducer<Text,JoinBean, JoinBean, NullWritable> {
/*
01 1001 1 order
01 1004 4 order
01 小米    pd
 */

    @Override
    protected void reduce(Text key, Iterable<JoinBean> values, Context context) throws IOException, InterruptedException {
        ArrayList <JoinBean> orderBeans =  new ArrayList<>();  //有很多个
        JoinBean pdBean = new JoinBean();//一个pid对应一个pname
        for(JoinBean value:values){
            if("order".equals(value.getFlag())){
                JoinBean tmp = new JoinBean();
                try {
         //*****************工具类使用******************
                    BeanUtils.copyProperties(tmp,value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(tmp);

            }else {
                try {
                    BeanUtils.copyProperties(pdBean,value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        //循环遍历orderBeans,为它的pname赋值
        for(JoinBean orderBean:orderBeans){
            orderBean.setPname(pdBean.getPname());
            context.write(orderBean,NullWritable.get());
        }
    }
}

JoinDriver

package com.ranan.mapreduce.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author ranan
 * @create 2021-08-30 20:39
 */
public class JoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(JoinDriver.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(JoinBean.class);

        job.setOutputKeyClass(JoinBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job,new Path("D:\\hadoop\\hadoop_data\\inputtable"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\hadoop_data\\outputtable"));

        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

执行结果

总结

缺点:这种方式中,合并的操作是在Reduce阶段完成,当一个表内容很多的时候,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在大量数据在Reduce阶段进行汇总,Reduce阶段极易产生数据倾斜。

解决方案:Map端实现数据合并。(能在Map阶段实现的操作在Map阶段实现)

使用场景

MapJoin适用于一张表十分小(用于放入缓存)、一张表很大的场景。

问题1:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?

一个ReduceTask处理一张很大的表速度很慢,MapTask数量多,可以将合并操作放到Map端处理。

在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数

据的压力,尽可能的减少数据倾斜。

问题2:如何将两张表在Map阶段进行合并?

默认是每张表单独切片,先将pd缓存,执行MapTask通过缓存读取pd

具体办法:采用DistributedCache

1.在Mapper的setup阶段,将文件读取到缓存集合中。

2.在Driver驱动类中加载缓存。

//如果是本地文件,缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置HDFS路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));

需求分析

1.DistributedCacheDriver缓存文件

//1.加载缓存数据
job.addCacheFile(new
URI("file:///e:/cache/pd.txt"));
//2.Map端join的逻辑不需要Reduce阶段,设置ReduceTask数量为0
job.setNumReduceTasks(0);

2.读取缓存的文件数据

在setup()方法中

// 1.获取缓存的文件
// 2.循环读取缓存文件一行
// 3.切割
// 4.缓存数据到集合
<pid,pname>
01,小米
02,华为
03,格力
// 5.关流

3.map方法中读取order表的信息

// 1.获取一行
// 2.截取
// 3.获取pid
// 4.获取商品名称
// 5.拼接
// 6.写出

代码实现

MapJoinDrive

先在MapJoinDriver驱动类中添加缓存文件

package com.ranan.mapreduce.mapjoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * @author ranan
 * @create 2021-08-31 20:42
 */
public class MapJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(MapJoinDriver.class);
        job.setMapperClass(MapJoinMapper.class);
        //job.setReducerClass(MapJoinReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //加载缓存数据
        job.addCacheFile(new URI("file:///D:/hadoop/hadoop_data/inputtable/pd.txt"));
        //默认一个ReduceTask
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job,new Path("D:/hadoop/hadoop_data/inputtable/order.txt"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\hadoop_data\\outputtable"));

        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

MapJoinMapper

1.在Mapper的setup阶段,将文件读取到缓存集合中。

2.在Driver驱动类中加载缓存。

package com.ranan.mapreduce.mapjoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

/**
 * @author ranan
 * @create 2021-08-31 20:44
 */
public class MapJoinMapper extends Mapper<LongWritable,Text,Text, NullWritable> {
    private HashMap<String, String> pdMap = new HashMap<>();
    private Text outK = new Text();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      //获取缓存文件,并把文件内容封装到集合pd.txt
      URI[] cacheFiles = context.getCacheFiles();
        //获取流变量
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));
        //从流中读取数据
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "utf-8"));
        String line;
        //读取一行readLine()
        while(StringUtils.isNotEmpty(line = reader.readLine())){
            String[] items = line.split("\t");

            //赋值
            pdMap.put(items[0],items[1]);
        }
        //关流
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //处理 order.txt
        String line = value.toString();
        String[] items = line.split("\t");

        //获取pname
        String pname= pdMap.get(items[1]);
        //获取订单数量和订单数量
        outK.set(items[0] + "\t" + pname +"\t" +items[2]);
        context.write(outK,NullWritable.get());
    }
}

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章