目录
Map 端的主要工作:不同表或文件的 key/value 对, 打标签以区别不同来源的记录。然后用连接字段作为 key,其余部分和新加的标志作为 value,最后进行输出。
Reduce 端的主要工作:在 Reduce 端以连接字段作为 key 的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在 Map 阶段已经打标志) 分开,最后进行合并。
由两张表,表1 order.txt
表2 商品信息表
将商品信息表中数据根据商品pid合并到订单数据表中,预期合并结果
问题1:输入的是两个文件,如何区分输入数据来自哪个文件?
切片可以获取到文件的路径和文件的名称
切片.getPath().getName()
问题2:输出的kv是什么?
k是pid,这样pid相同的数据(来自两张表)才能一起给Reduce合并。
v应该包含 订单id、数量、产品名称
问题3:values是多个字段,这里需要序列化吗?
MapTask传数据给ReduceTask的时候,有可能会跨服务器传输,所以需要序列化。把values封装成bean对象。
问题4:bean对象如何设计?
需要包含所有的字段 订单id、数量、产品名称、flag
两张表的数据不同,没有的地方就为空。
1001 1 "" order
"" "" 小米 pd
Reduce端来Join
key是pid 如01,pid相同的三条数据传输到Reduce端,把pd.txt的产品名称替换order的pid
map输出过来的数据格式
如何合并?
创建两个集合,一个存放order表的数据(这个数据多),一个存放pd表的数据
序列化和反序列顺序一定要一致
package com.ranan.mapreduce.reducejoin;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @author ranan
* @create 2021-08-30 16:41
*/
public class JoinBean implements Writable {
//id pid amount pid pname
private String id; //订单id
private String pid;//商品id
private String pname;//产品名字
private int amout;//数量
private String flag;//标记来自哪个表格
//序列化需要空参构造,用于反射调用空参构造函数
public JoinBean() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public int getAmout() {
return amout;
}
public void setAmout(int amout) {
this.amout = amout;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
//序列化方法
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(id); //String类型用writeUTF写
dataOutput.writeUTF(pid);
dataOutput.writeInt(amout); //int类型用writeInt
dataOutput.writeUTF(pname);
dataOutput.writeUTF(flag);
}
//反序列化方法
@Override
public void readFields(DataInput dataInput) throws IOException {
setId(dataInput.readUTF());
setPid(dataInput.readUTF());
setAmout(dataInput.readInt());
setPname(dataInput.readUTF());
setFlag(dataInput.readUTF());
}
//重写toString方法,最终输出的格式 id pname amout
@Override
public String toString() {
return id + "\t" + pname + "\t" +amout +"\n";
}
}
key:pid Text类型
value:JoinBean JoinBean
为什么需要初始化?
传过来了两个文件order.txt pd.txt
初始化的时候希望获得对应的文件名称,在后续操作给数据打标记
为什么在初始化的时候获取文件名字?
默认切片规则,一个文件一个切片(因为文件很小,所以文件里面不用切),一个切片开启一个MapTask。
每一个MapTask都有setup方法,map方法。map方法每一行输入执行一次,setup只执行一次。只需要获取一次文件名字就行了。
package com.ranan.mapreduce.reducejoin;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
* @author ranan
* @create 2021-08-30 17:09
*/
public class JoinMapper extends Mapper<LongWritable, Text,Text,JoinBean> {
private String fileName;
private Text outK = new Text();
private JoinBean outV = new JoinBean();
//初始化方法
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//获得切片信息 返回的是InputSplit抽象类,用它的子类FileSplit,因为我们输入的是文件
FileSplit split = (FileSplit) context.getInputSplit();
fileName = split.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
//判断是哪个文件
if(fileName.contains("order")){
//处理订单表 1001 01 1 order
String[] order = line.split("\t");
//封装kv
outK.set(order[1]);
outV.setId(order[0]);
outV.setPid(order[1]);
outV.setAmout(Integer.parseInt(order[2]));
outV.getPname("");
outV.setFlag("order");
}else {
//处理商品表 01 小米 pd
String[] pd = line.split("\t");
outK.set(pd[0]);
outV.setId("");
outV.setPid(pd[0]);
outV.setAmout(0);
outV.setPname(pd[1]);
outV.setFlag("pd");
}
context.write(outK,outV);
}
}
输入的数据
希望输出的数据
为什么最终的输出是JoinBean?
因为重写了JoinBean的toString方法,最后打印会调用toString方法,只输出需要的三个变量。
Hadoop的迭代
Hadoop迭代器中使用了对象重用,迭代时value始终指向一个内存地址,改变的是那个内存地址中的数据。
for(JoinBean value:values){
if("order".equals(value.getFlag())){
orderBeans.add(value);
}
}
value指向一个地址且值不变,每次循环改变的是该内存地址的值,所以这样写最后的数据全是最后一次循环的数据。
应该用一个临时变量存储,使用工具类BeanUtils
for(JoinBean value:values){
if("order".equals(value.getFlag())){
JoinBean tmp = new JoinBean();
//工具类
try {
BeanUtils.copyProperties(tmp,value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
orderBeans.add(tmp);
}
}
完整代码
package com.ranan.mapreduce.reducejoin;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.w3c.dom.Text;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
/**
* @author ranan
* @create 2021-08-30 19:53
*/
public class JoinReduce extends Reducer<Text,JoinBean, JoinBean, NullWritable> {
/*
01 1001 1 order
01 1004 4 order
01 小米 pd
*/
@Override
protected void reduce(Text key, Iterable<JoinBean> values, Context context) throws IOException, InterruptedException {
ArrayList <JoinBean> orderBeans = new ArrayList<>(); //有很多个
JoinBean pdBean = new JoinBean();//一个pid对应一个pname
for(JoinBean value:values){
if("order".equals(value.getFlag())){
JoinBean tmp = new JoinBean();
try {
//*****************工具类使用******************
BeanUtils.copyProperties(tmp,value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
orderBeans.add(tmp);
}else {
try {
BeanUtils.copyProperties(pdBean,value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
//循环遍历orderBeans,为它的pname赋值
for(JoinBean orderBean:orderBeans){
orderBean.setPname(pdBean.getPname());
context.write(orderBean,NullWritable.get());
}
}
}
JoinDriver
package com.ranan.mapreduce.reducejoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author ranan
* @create 2021-08-30 20:39
*/
public class JoinDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(JoinDriver.class);
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(JoinBean.class);
job.setOutputKeyClass(JoinBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job,new Path("D:\\hadoop\\hadoop_data\\inputtable"));
FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\hadoop_data\\outputtable"));
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}
缺点:这种方式中,合并的操作是在Reduce阶段完成,当一个表内容很多的时候,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在大量数据在Reduce阶段进行汇总,Reduce阶段极易产生数据倾斜。
解决方案:Map端实现数据合并。(能在Map阶段实现的操作在Map阶段实现)
使用场景
MapJoin适用于一张表十分小(用于放入缓存)、一张表很大的场景。
问题1:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?
一个ReduceTask处理一张很大的表速度很慢,MapTask数量多,可以将合并操作放到Map端处理。
在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数
据的压力,尽可能的减少数据倾斜。
问题2:如何将两张表在Map阶段进行合并?
默认是每张表单独切片,先将pd缓存,执行MapTask通过缓存读取pd
具体办法:采用DistributedCache
1.在Mapper的setup阶段,将文件读取到缓存集合中。
2.在Driver驱动类中加载缓存。
//如果是本地文件,缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置HDFS路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
1.DistributedCacheDriver缓存文件
//1.加载缓存数据
job.addCacheFile(new
URI("file:///e:/cache/pd.txt"));
//2.Map端join的逻辑不需要Reduce阶段,设置ReduceTask数量为0
job.setNumReduceTasks(0);
2.读取缓存的文件数据
在setup()方法中
// 1.获取缓存的文件
// 2.循环读取缓存文件一行
// 3.切割
// 4.缓存数据到集合
<pid,pname>
01,小米
02,华为
03,格力
// 5.关流
3.map方法中读取order表的信息
// 1.获取一行
// 2.截取
// 3.获取pid
// 4.获取商品名称
// 5.拼接
// 6.写出
先在MapJoinDriver驱动类中添加缓存文件
package com.ranan.mapreduce.mapjoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* @author ranan
* @create 2021-08-31 20:42
*/
public class MapJoinDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(MapJoinDriver.class);
job.setMapperClass(MapJoinMapper.class);
//job.setReducerClass(MapJoinReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//加载缓存数据
job.addCacheFile(new URI("file:///D:/hadoop/hadoop_data/inputtable/pd.txt"));
//默认一个ReduceTask
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job,new Path("D:/hadoop/hadoop_data/inputtable/order.txt"));
FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\hadoop_data\\outputtable"));
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}
1.在Mapper的setup阶段,将文件读取到缓存集合中。
2.在Driver驱动类中加载缓存。
package com.ranan.mapreduce.mapjoin;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
/**
* @author ranan
* @create 2021-08-31 20:44
*/
public class MapJoinMapper extends Mapper<LongWritable,Text,Text, NullWritable> {
private HashMap<String, String> pdMap = new HashMap<>();
private Text outK = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//获取缓存文件,并把文件内容封装到集合pd.txt
URI[] cacheFiles = context.getCacheFiles();
//获取流变量
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));
//从流中读取数据
BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "utf-8"));
String line;
//读取一行readLine()
while(StringUtils.isNotEmpty(line = reader.readLine())){
String[] items = line.split("\t");
//赋值
pdMap.put(items[0],items[1]);
}
//关流
IOUtils.closeStream(reader);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//处理 order.txt
String line = value.toString();
String[] items = line.split("\t");
//获取pname
String pname= pdMap.get(items[1]);
//获取订单数量和订单数量
outK.set(items[0] + "\t" + pname +"\t" +items[2]);
context.write(outK,NullWritable.get());
}
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章