Hadoop适合海量数据分布式存储和分布式计算
运行用户使用简单的编程模型实现跨机器集群对海量数据进行分布式计算处理
Hadoop核心组件
Hadoop发展简史
Hadoop起源于Apache Lucen子项目:Nutch
Nutch的设计目标是构建一个大型的全网搜索引擎
问题:如何解决数十亿网页的存储和索引问题
三篇论文Google
Hadoop现状
Hadoop特性优点
发行版本
开源社区版:Apache开源社区发型
商业发行版
架构变迁
Hadoop 1.0
HDFS
MapReduce
Hadoop 2.0
HDFS
MapReduce
YARN
Hadoop 3.0
着重于性能优化
Hadoop集群整体概述
HDFS集群
Yarn集群
安装Hadoop
Hadoop安装包结构
集群安装部署
要启动hadoop集群,首先要格式化HDFS
$HADOOP_HOME/bin/hdfs namenode -format
启动hdfs
方法一:在主节点上启动namenode,在每一个从节点上启动datanode
# node1
$HADOOP_HOME/bin/hdfs --daemon start namenode
# node2 node3...
$HADOOP_HOME/bin/hdfs --daemon start datanode
# 关闭
$HADOOP_HOME/bin/hdfs --daemon stop namenode
$HADOOP_HOME/bin/hdfs --daemon stop datanode
方法二:如果配置了 etc/hadoop/workers
且所有的节点都配置了ssh免密登陆,在任意一个节点上都可以启动,运行一次即可
$HADOOP_HOME/sbin/start-dfs.sh
# 关闭
$HADOOP_HOME/sbin/stop-dfs.sh
启动YARN
方法一:启动ResourceManager,在主角色的节点上运行,启动NodeManager,在每一个从角色上运行
# node1
$HADOOP_HOME/bin/yarn --daemon start resourcemanager
# node2 ...
$HADOOP_HOME/bin/yarn --daemon start nodemanager
#关闭
$HADOOP_HOME/bin/yarn --daemon stop resourcemanager
$HADOOP_HOME/bin/yarn --daemon stop nodemanager
方法二:如果配置了 etc/hadoop/workers
且所有的节点都配置了ssh免密登陆,在任意一个节点上运行都可以启动
$HADOOP_HOME/sbin/start-yarn.sh
#关闭
$HADOOP_HOME/sbin/stop-yarn.sh
也可以使用一键运行的脚本开启yarn和hdfs
$HADOOP_HOME/sbin/start-all.sh
$HADOOP_HOME/sbin/stop-all.sh
开启日志服务器(可选)
开启之前需要开启日志聚合功能,需要修改bin/yarn-site.xml
,添加如下内容
要根据自己的配置,修改服务器地址
<!-- 开启日志聚集 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 设置yarn历史服务器地址 -->
<property>
<name>yarn.log.server.url</name>
<value>http://node1:19888/jobhistory/logs</value>
</property>
<!-- 保存的时间7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
启动日志服务器
$HADOOP_HOME/bin/mapred --daemon start historyserver
#关闭
$HADOOP_HOME/bin/mapred --daemon stop historyserver
Hadoop Distributed File System, Hadoop的分布式文件系统
HDFS主要是解决大数据如何存储问题的。分布式意味着是HDFS是横跨在多台计算机上的存储系统。
HDFS是一种能够在普通硬件上运行的分布式文件系统,它是高度容错的,适应于具有大数据集的应用程序,它非 常适于存储大型数据 (比如 TB 和 PB)。
HDFS使用多台计算机存储文件, 并且提供统一的访问接口, 像是访问一个普通文件系统一样使用分布式文件系统
特点
设计目标
应用场景
主要特性
主从架构
分块存储
hdfs-default.xml
中:dfs.blocksize
副本机制
dfs.replication
控制,默认值是3,也就是会额外再复制2份,连同本身总共3份副本。元数据记录
在HDFS中,Namenode管理的元数据具有两种类型:
文件自身属性信息 文件名称、权限,修改时间,文件大小,复制因子,数据块大小。
文件块位置映射信息 记录文件块和DataNode之间的映射信息,即哪个块位于哪个节点上。
抽象统一的目录树结构(namespace)
HDFS支持传统的层次型文件组织结构。用户可以创建目录,然后将文件保存在这些目录里。文件系统名字空间的 层次结构和大多数现有的文件系统类似:用户可以创建、删除、移动或重命名文件。
Namenode负责维护文件系统的namespace名称空间,任何对文件系统名称空间或属性的修改都将被Namenode 记录下来。
HDFS会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件,形如:hdfs://namenode:port/dira/dir-b/dir-c/file.data
数据库存储
简介
command-line interface
,缩写:CLI
),是指用户通过键盘输入指令,计算机接收到指令后 ,予以执行一种人际交互方式。hadoop fs [generic options]
文件系统协议
HDFS Shell CLI
支持操作多种文件系统,包括本地文件系统(file:///)
、分布式文件系统(hdfs://nn:8020)
等fs.defaultFS
属性,以该属性值作为默认文件系统。区别
hadoop dfs
只能操作HDFS文件系统(包括与Local FS间的操作),不过已经Deprecatedhdfs dfs
只能操作HDFS文件系统相关(包括与Local FS间的操作),常用hadoop fs
可操作任意文件系统,不仅仅是hdfs文件系统,使用范围更广目前版本来看,官方最终推荐使用的是hadoop fs。当然hdfs dfs在市面上的使用也比较多。
参数说明
操作命令
hadoop fs -xxx [x] <path> ...
ls
:查询指定路径信息
hadoop fs -ls [-h] [-R] [<path> ...]
[-h]
:人性化显示文件size[-R]
:递归查看指定目录及其子目录put
:从本地上传文件
hadoop fs -put [-f] [-p] <localsrc> ... <dst>
[-f]
覆盖目标文件(已存在下)[-p]
保留访问和修改时间,所有权和权限localsrc
本地文件系统(客户端所在机器)dst
目标文件系统(HDFS)get
:下载文件到本地
hadoop fs -get [-f] [-p] <src> ... <localdst>
localdst
必须是目录-f
覆盖目标文件(已存在下)-p
保留访问和修改时间,所有权和权限cat:查看HDFS文件内容
hadoop fs -cat <src> ...
cp
hadoop fs -cp [-f] <src> ... <dst>
mkdir
:创建文件夹
[-p]
:递归创建文件夹rm [-r]
:删除文件/文件夹
apped
:追加文件
hadoop fs -appendToFile <localsrc> ... <dst>
将所有给定本地文件的内容追加到给定dst文件
dst如果文件不存在,将创建该文件
如果<localSrc>
为-
,则输入为从标准输入中读取
适合小文件合并
#追加内容到文件尾部 appendToFile
[root@node3 ~]# echo 1 >> 1.txt
[root@node3 ~]# echo 2 >> 2.txt
[root@node3 ~]# echo 3 >> 3.txt
[root@node3 ~]# hadoop fs -put 1.txt /
[root@node3 ~]# hadoop fs -cat /1.txt
1
[root@node3 ~]# hadoop fs -appendToFile 2.txt 3.txt /1.txt
[root@node3 ~]# hadoop fs -cat /1.txt
1
2
3
示例
# 完整命令
bin/hadoop fs -xxx scheme://authority/path
# 显示文件
hadoop fs -ls /
# 上传文件
hadoop fs -put readme.txt /
# 显示文件
hadoop fs -ls /
# Found 1 items
# -rw-r--r-- 1 root supergroup 0 2022-04-18 21:37 /readme.txt
# 查看文件内容
hadoop fs -cat /readme.txt
# 下载文件到本地
hadoop fs -get /readme.txt read.txt
# 创建文件夹(-p递归创建目录)
hadoop fs -mkdir /test
hadoop fs -mkdir -p /test2/cur/
统计文件数量
hadoop fs -ls / | grep / | wc -l
统计文件大小
hadoop fs -ls / | grep / | awk '{print $8,$5}'
/readme.txt 0
/test 0
架构图
NameNode
主角色
Fsimage
内存元数据镜像文件edits log(Journal)
编辑日志,记录用户的操作DataNode
从角色
SecondaryNameNode
NameNode职责
DataNode职责
写数据完整流程
pipeline管道
为什么DataNode之间采用pipeline线性传输,而不是一次给三个DataNode拓扑式传输呢?
ACK应答响应
默认3副本存储策略
BlockPlacementPolicyDefault
指定。实际流程
HDFS客户端创建对象实例DistributedFileSystem
, 该对象中封装了与HDFS文件系统操作的相关方法。
调用DistributedFileSystem
对象的create()
方法,通过RPC请求NameNode创建文件。
FSDataOutputStream
输出流对象给客户端用于写数据。客户端通过FSDataOutputStream输出流开始写入数据。
客户端写入数据时,将数据分成一个个数据包(packet 默认64k), 内部组件DataStreamer请求NameNode挑 选出适合存储数据副本的一组DataNode地址,默认是3副本存储
传输的反方向上,会通过ACK机制校验数据包传输是否成功
客户端完成数据写入后,在FSDataOutputStream
输出流上调用close()
方法关闭
DistributedFileSystem
联系NameNode告知其文件写入完成,等待NameNode确认
因为NameNode已经知道文件由哪些块组成(DataStream请求分配数据块),因此仅需等待最小复制块即可成功返回 。 最小复制是由参数dfs.namenode.replication.min
指定,默认是1.
流程图
HDFS客户端创建对象实例DistributedFileSystem, 调用该对象的open()方法来打开希望读取的文件
DistributedFileSystem使用RPC调用namenode来确定文件中前几个块的块位置(分批次读取)信息。 对于每个块,namenode返回具有该块所有副本的datanode位置地址列表,并且该地址列表是排序好的,与客户端的网络拓扑距离近的排序靠前
DistributedFileSystem将FSDataInputStream输入流返回到客户端以供其读取数据
客户端在FSDataInputStream输入流上调用read()方法。
然后,已存储DataNode地址的InputStream连接到文件 中第一个块的最近的DataNode。数据从DataNode流回客户端,结果客户端可以在流上重复调用read()
当该块结束时,FSDataInputStream将关闭与DataNode的连接,然后寻找下一个block块的最佳datanode位置。 这些操作对用户来说是透明的。所以用户感觉起来它一直在读取一个连续的流。
客户端从流中读取数据时,也会根据需要询问NameNode来检索下一批数据块的DataNode位置信息。
一旦客户端完成读取,就对FSDataInputStream调用close()方法
High Available
Federation
高可用(High Available)
HDFS的HA,指的是在一个集群中存在多个NameNode,分别运行在独立的物理节点上。在任何时间点,只有一个NameNode是处于Active状态,其它的是处于Standby状态。 Active NameNode(简写为Active NN)负责所有的客户端的操作,而Standby NameNode(简写为Standby NN)用来同步Active NameNode的状态信息,以提供快速的故障恢复能力。
为了保证Active NN与Standby NN节点状态同步,即元数据保持一致。除了DataNode需要向这些NameNode发送block位置信息外,还构建了一组独立的守护进程”JournalNodes”(简写为JN),用来同步Edits信息。当Active NN执行任何有关命名空间的修改,它需要持久化到一半以上的JNs上。而Standby NN负责观察JNs的变化,读取从Active NN发送过来的Edits信息,并更新自己内部的命名空间。一旦Active NN遇到错误,Standby NN需要保证从JNs中读出了全部的Edits,然后切换成Active状态,如果有多个Standby NN,还会涉及到选主的操作,选择一个切换为Active 状态。
需要注意一点,为了保证Active NN与Standby NN节点状态同步,即元数据保持一致
这里的元数据包含两块,一个是静态的,一个是动态的
静态的是fsimage和edits,其实fsimage是由edits文件合并生成的,所以只需要保证edits文件内容的一致性。这个就是需要保证多个NameNode中edits文件内容的事务性同步。这块的工作是由JournalNodes集群进行同步的
动态数据是指block和DataNode节点的信息,这个如何保证呢? 当DataNode启动的时候,上报数据信息的时候需要向每个NameNode都上报一份。 这样就可以保证多个NameNode的元数据信息都一样了,当一个NameNode down掉以后,立刻从Standby NN中选择一个进行接管,没有影响,因为每个NameNode 的元数据时刻都是同步的。
注意:使用HA的时候,不能启动SecondaryNameNode,会出错。 之前是SecondaryNameNode负责合并edits到fsimage文件 那么现在这个工作被standby NN负责了。
NameNode 切换可以自动切换,也可以手工切换,如果想要实现自动切换,需要使用到zookeeper集群。
使用zookeeper集群自动切换的原理是这样的
当多个NameNode 启动的时候会向zookeeper中注册一个临时节点,当NameNode挂掉的时候,这个临时节点也就消失了,这属于zookeeper的特性,这个时候,zookeeper就会有一个watcher监视器监视到,就知道这个节点down掉了,然后会选择一个节点转为Active,把down掉的节点转为Standby
高扩展(Federation)
HDFS Federation可以解决单一命名空间存在的问题,使用多个NameNode,每个NameNode负责一个命令空间
这种设计可提供以下特性:
如果真用到了Federation,一般也会和前面我们讲的HA结合起来使用,来看这个图
1. 如何对付大数据处理场景
2. 构建抽象编程模型
MapReduce借鉴了函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型。
map: 对一组数据元素进行某种重复式的处理
MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:
map: (k1; v1) → (k2; v2)
reduce: (k2; [v2]) → (k3; v3)
通过以上两个编程接口,大家可以看出MapReduce处理的数据类型是键值对。
3. 统一架构、隐藏底层细节
分布式计算概念
MapReduce
产生背景
特点
易于编程
MapReduce框架提供了用于二次开发的接口;简单地实现一些接口,就可以完成一个分布式程序。任务计算交给计算 框架去处理,将分布式程序部署到hadoop集群上运行,集群节点可以扩展到成百上千个等。
良好的扩展性
当计算机资源不能得到满足的时候,可以通过增加机器来扩展它的计算能力。基于MapReduce的分布式计算得特点可 以随节点数目增长保持近似于线性的增长,这个特点是MapReduce处理海量数据的关键,通过将计算节点增至几百或 者几千可以很容易地处理数百TB甚至PB级别的离线数据。
高容错性
Hadoop集群是分布式搭建和部署得,任何单一机器节点宕机了,它可以把上面的计算任务转移到另一个节点上运行, 不影响整个作业任务得完成,过程完全是由Hadoop内部完成的。
适合海量数据的离线处理
可以处理GB、TB和PB级别得数据量
局限性
MapReduce虽然有很多的优势,也有相对得局限性,局限性不代表不能做,而是在有些场景下实现的效果比较差,并 不适合用MapReduce来处理,主要表现在以下结果方面:
实时计算性能差
MapReduce主要应用于离线作业,无法作到秒级或者是亚秒级得数据响应。
不能进行流式计算
流式计算特点是数据是源源不断得计算,并且数据是动态的;而MapReduce作为一个离线计算框架,主要是针对静态 数据集得,数据是不能动态变化得。
实例进程
一个完整的MapReduce程序在分布式运行时有三类
阶段组成
数据类型
示例说明
/XXX/hadoop-XXX/share/hadoop/mapreduce/
hadoop-mapreduce-examples-3.3.0.jar
[hadoop jar|yarn jar] hadoop-mapreduce-examples-XXX.jar args…
1. 评估圆周率的值
蒙特卡洛方法
运行MapReduce程序评估一下圆周率的值,执行中可以去YARN页面上观察程序的执行的情况。
第一个参数:pi表示MapReduce程序执行圆周率计算任务
第二个参数:用于指定map阶段运行的任务task次数,并发度,这里是10
第三个参数:用于指定每个map任务取样的个数,这里是50。
/opt/hadoop-3.3.0/share/hadoop/mapreduce# hadoop jar hadoop-mapreduce-examples-3.3.0.jar pi 10 50
2. WordCount单词词频统计
WordCount中文叫做单词统计、词频统计,指的是统计指定文件中,每个单词出现的总次数
实现思路
map阶段的核心:把输入的数据经过切割,全部标记1,因此输出就是<单词,1>
shuffle阶段核心:经过MR程序内部自带默认的排序分组等功能,把key相同的单词会作为一组数据构成新的kv对
reduce阶段核心:处理shuffle完的一组数据,该组数据就是该单词所有的键值对。对所有的1进行累加求和,就是 单词的总次数
程序提交
自己随便写个文本文件1.txt
到HDFS文件系统的/input
目录下,如果没有这个目录,使用shell创建
hadoop fs -mkdir /input
hadoop fs -put 1.txt /input
准备好之后,执行官方MapReduce实例,对上述文件进行单词次数统计
hadoop jar hadoop-mapreduce-examples-3.3.0.jar wordcount /input /output
流程图
Map执行过程
第一阶段:把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划。
默认Split size = Block size(128M)
,每一个切片由一个MapTask处理。(getSplits)
第二阶段:对切片中的数据按照一定的规则读取解析返回键值对
默认是按行读取数据。key是每一行的起始位置偏移量,value是本行的文本内容。(TextInputFormat)
第三阶段:调用Mapper类中的map方法处理数据。
每读取解析出来的一个<key, value>
,调用一次map方法
第四阶段:按照一定的规则对Map输出的键值对进行分区partition。默认不分区,因为只有一个reducetask。 分区的数量就是reducetask运行的数量。 (分区方法默认是hash求余法)
第五阶段:Map输出数据写入内存缓冲区,达到比例溢出到磁盘上。溢出spill的时候根据key进行排序sort。 默认根据key字典序排序。
第六阶段:对所有溢出文件进行最终的merge合并,成为一个文件。
Reduce执行过程
shuffle
Map端Shuffle
一个map最后只会产生一个文件
Reducer端Shuffle
Shuffle机制弊端
需要开启YARN的日志聚合功能,把散落在NodeManager节点上的日志统一收集管理,方便查看日志
bin\mapred --daemion start historyserver
yarn logs -applicationId <ID>
在命令行中ctrl+c无法停止程序,因为程序已经提交到Hadoop集群运行 了
yarn application -kill <ID>
功能
概述
yarn.nodemanager.resource.memory-mb
:单节点可分配的物理内存总量,默认是8MB*1024,即8Gyarn.nodemanager.resource.cpu-vcores
:单节点可分配的虚拟CPU个数默认是8架构图
client
container 容器(资源的抽象):容器之间逻辑上隔离的
YARN三大组件
ResourceManager(RM)
NodeManager(NM)
ApplicationMaster (App Mstr) (AM)
核心交互流程
整体概述
当用户向 YARN 中提交一个应用程序后, YARN将分两个阶段运行该应用程序 。
MR提交YARN交互流程
在理想情况下,应用程序提出的请求将立即得到YARN批准。但是实际中,资源是有限的,并且在繁忙的群集上, 应用程序通常将需要等待其某些请求得到满足。YARN调度程序的工作是根据一些定义的策略为应用程序分配资源
在YARN中,负责给应用分配资源的就是Scheduler,它是ResourceManager的核心组件之一。
Scheduler完全专用于调度作业,它无法跟踪应用程序的状态。
一般而言,调度是一个难题,并且没有一个“最佳”策略,为此,YARN提供了多种调度器和可配置的策略供选择
调度器策略
优势
劣势
FIFO Schedule的多队列版本
资源队列划分
优势
层次化的队列设计(Hierarchical Queues)
容量保证(Capacity Guarantees)
安全(Security)
弹性分配(Elasticity)
如何理解公平共享
优势
修改hadoop文件中 etc/hadoop/capacity-scheduler.xml
下面增加了两个队列online和offline,将以下内容添加进去,而不是覆盖。
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default,online,offline</value>
<description>队列列表,多个队列之间使用逗号分割</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>70</value>
<description>default队列70%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.online.capacity</name>
<value>10</value>
<description>online队列10%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.offline.capacity</name>
<value>20</value>
<description>offline队列20%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
<value>70</value>
<description>Default队列可使用的资源上限.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.online.maximum-capacity</name>
<value>10</value>
<description>online队列可使用的资源上限.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.offline.maximum-capacity</name>
<value>20</value>
<description>offline队列可使用的资源上限.</description>
</property>
为了提高磁盘IO性能,Hadoop弃用了java中的序列化,自己编写了writable实现类
注意:
Hadoop序列化机制的特点
Java序列化的不足
源码
getSplits
: 对文件进行分区
createRecordReader
: 将InputSplit
中的数据解析成Record,即<k1, v1>
public abstract class InputFormat
/**
Logically split the set of input files for the job.
*
Each {@link InputSplit} is then assigned to an individual {@link Mapper}
for processing.
Note: The split is a logical split of the inputs and the
input files are not physically split into chunks. For e.g. a split could
be <input-file-path, start, offset> tuple. The InputFormat
also creates the {@link RecordReader} to read the {@link InputSplit}.
*
@param context job configuration.
@return an array of {@link InputSplit}s for the job.
*/
public abstract
List
) throws IOException, InterruptedException;
/**
Create a record reader for a given split. The framework will call
{@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
the split is used.
@param split the split to be read
@param context the information about the task
@return a new record reader
@throws IOException
@throws InterruptedException
*/
public abstract
RecordReader
TaskAttemptContext context
) throws IOException,
InterruptedException;
}
仅对FileInputFormat源码进行分析
源码注解(Hadoop3.3.0)
/**
* Generate the list of files and make them into FileSplits.
* @param job the job context
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
/*
getFormatMinSplitSize() = 1
getMinSplitSize(job) = 0
minSize = 1
*/
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
/*
没有默认值
getMaxSplitSize(job) = Long.MAX_VALUE
所以maxSize等于Long的最大值
*/
long maxSize = getMaxSplitSize(job);
// generate splits
// 创建List,总部内保存生成的InputSplit
List<InputSplit> splits = new ArrayList<InputSplit>();
// 获取输入文件列表
List<FileStatus> files = listStatus(job);
// ignoreDirs = false
boolean ignoreDirs = !getInputDirRecursive(job)
&& job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
// 迭代输入文件列表
for (FileStatus file: files) {
// 是否忽略子目录,默认不忽略
if (ignoreDirs && file.isDirectory()) {
continue;
}
// 获取 文件/目录 路径
Path path = file.getPath();
// 获取 文件/目录 长度
long length = file.getLen();
if (length != 0) {
// 保存文件的Block块所在的位置
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// 判断文件是否支持切割,默认为true
if (isSplitable(job, path)) {
// 获取文件的Block大小,默认128M
long blockSize = file.getBlockSize();
// 计算split的大小
/*
内部使用的公式是: Math.max(minSize, Math.min(maxSize, blockSize))
Math.max(1, Math.min(Long.MAX_VALUE, 128))
默认情况下split的大小和Block size相等
*/
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
// 还需要处理的文件剩余字节大小,其实就是这个文件的原始大小
long bytesRemaining = length;
/*
SPLIT_SLOP = 1.1
只要剩余文件大于1.1倍的分区size就继续切割
*/
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// 获取block的索引
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
/*
组装InputSplit
path: 路径
length-bytesRemaining 起始位置
splitSize 大小
blkLocations[blkIndex].getHosts() 和 blkLocations[blkIndex].getCachedHosts() 所在的主机
*/
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
// 不支持切割
if (LOG.isDebugEnabled()) {
// Log only if the file is big enough to be splitted
if (length > Math.min(file.getBlockSize(), minSize)) {
LOG.debug("File is not splittable so no parallelization "
+ "is possible: " + file.getPath());
}
}
// 整个作为一个InputSplit
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
InputSplit
都有一个RecordReader,作用是把InputSplit中的数据解析成Record,即<k1, v1>
行阅读器的初始化方法源码
// 初始化方法
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
// 获取传过来的InputSplit,将InputSplit转换成子类FileSplit
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
// MAX_LINE_LENGTH对应的参数默认没有配置,所以会取Integer.MAX_VALUE
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
// 获取InputSplit的起始位置
start = split.getStart();
// 获取InputSplit的结束位置
end = start + split.getLength();
// 获取InputSplit的路径
final Path file = split.getPath();
// open the file and seek to the start of the split
// 打开文件,并跳到InputSplit的起始位置
final FutureDataInputStreamBuilder builder =
file.getFileSystem(job).openFile(file);
FutureIOSupport.propagateOptions(builder, job,
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
fileIn = FutureIOSupport.awaitFuture(builder.build());
// 获取文件的压缩信息
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
// 如果文件是压缩文件,则执行if中的语句
if (null!=codec) {
//... 省略代码
} else {
// 跳转到文件的起始位置
fileIn.seek(start);
// 针对未压缩文件,创建一个阅读器读取一行行的数据
in = new UncompressedSplitLineReader(
fileIn, job, this.recordDelimiterBytes, split.getLength());
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
/*
注意:如果这个InputSplit不是第一个InputSplit,我们将会丢弃读取出来的第一行
因为我们总是通过next方法多读取一行
因此,如果一行数据被拆分到了两个InputSplit中,不会产生问题。
*/
// 如果start不等于0,表示不是第一个inputsplit,所以把start的值重置为第二行的起始位置
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
SequenceFile
<key, value>
对序列化到文件中代码实现
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.io.File;
/*
small files
*/
public class SmallFileSeq {
public static void main(String[] args) throws Exception {
write("/root/smallfiles", "/seqFile");
read("/seqFile");
}
/**
* 生成SequenceFile文件
* @param inputDir 本地文件
* @param outputFile hdfs文件
* @throws Exception
*/
private static void write(String inputDir,String outputFile) throws Exception {
// 创建一个配置对象
Configuration conf = new Configuration();
// 指定hdfs路径
conf.set("fs.defaultFS", "hdfs://node1:8020");
// 删除输出文件
FileSystem fileSystem = FileSystem.get(conf);
fileSystem.delete(new Path(outputFile), true);
// 三个元素:输出路径、key的类型、value的类型
SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
SequenceFile.Writer.file(new Path(outputFile)),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(Text.class)
};
// 创建一个writer实例
SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts);
// 指定需要压缩的文件的目录
File inputDirPath = new File(inputDir);
if (inputDirPath.isDirectory()) {
// 获取目录中的文件
File[] files = inputDirPath.listFiles();
assert files != null;
for (File file : files) {
// 获取文件的全部内存
String content = FileUtils.readFileToString(file, "UTF-8");
// 获取文件名
String fileName = file.getName();
Text key = new Text(fileName);
Text value = new Text(content);
// 写入数据
writer.append(key, value);
}
}
writer.close();
}
/**
* 读取SequenceFile文件
* @param inputFile
* @throws Exception
*/
private static void read(String inputFile) throws Exception {
// 创建一个配置对象
Configuration conf = new Configuration();
// 指定hdfs路径
conf.set("fs.defaultFS", "hdfs://node1:8020");
// 创建阅读器
SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile)));
Text key = new Text();
Text value = new Text();
while (reader.next(key, value)) {
System.out.println("文件名:" + key.toString() + ",");
System.out.println("文件内容:\n" + value.toString() + ".");
}
reader.close();
}
}
MapFile
SequenceFile文件是用来存储key-value数据的,但它并不保证这些存储的key-value是有序的,而MapFile文件则可以看做是存储有序key-value的SequenceFile文件。MapFile文件保证key-value的有序(基于key)是通过每一次写入key-value时的检查机制,这种检查机制其实很简单,就是保证当前正要写入的key-value与上一个刚写入的key-value符合设定的顺序,但是,这种有序是由用户来保证的,一旦写入的key-value不符合key的非递减顺序,则会直接报错而不是自动的去对输入的key-value排序
代码实例
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.io.File;
/*
small files
*/
public class SmallFilemap {
public static void main(String[] args) throws Exception {
write("/root/smallfiles", "/mapFile");
read("/mapFile");
}
/**
* 生成MapFile文件
* @param inputDir 本地目录
* @param outputDir hdfs目录
* @throws Exception
*/
private static void write(String inputDir,String outputDir) throws Exception {
// 创建一个配置对象
Configuration conf = new Configuration();
// 指定hdfs路径
conf.set("fs.defaultFS", "hdfs://node1:8020");
// 删除输出文件
FileSystem fileSystem = FileSystem.get(conf);
fileSystem.delete(new Path(outputDir), true);
// 两个元素:key的类型、value的类型
SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
MapFile.Writer.keyClass(Text.class),
MapFile.Writer.valueClass(Text.class)
};
// 创建一个writer实例
MapFile.Writer writer = new MapFile.Writer(conf, new Path(outputDir), opts);
// 指定需要压缩的文件的目录
File inputDirPath = new File(inputDir);
if (inputDirPath.isDirectory()) {
// 获取目录中的文件
File[] files = inputDirPath.listFiles();
for (File file : files) {
// 获取文件的全部内存
String content = FileUtils.readFileToString(file, "UTF-8");
// 获取文件名
String fileName = file.getName();
Text key = new Text(fileName);
Text value = new Text(content);
// 写入数据
writer.append(key, value);
}
}
writer.close();
}
/**
* 读取MapFile文件
* @param inputDir MapFile文件路径
* @throws Exception
*/
private static void read(String inputDir)throws Exception{
// 创建一个配置对象
Configuration conf = new Configuration();
// 指定hdfs路径
conf.set("fs.defaultFS", "hdfs://node1:8020");
//创建阅读器
MapFile.Reader reader = new MapFile.Reader(new Path(inputDir),conf);
Text key = new Text();
Text value = new Text();
//循环读取数据
while(reader.next(key,value)){
//输出文件名称
System.out.print("文件名:"+key.toString()+",");
//输出文件内容
System.out.println("文件内容:"+value.toString()+"");
}
reader.close();
}
}
MapReduce程序执行时,Reduce节点大部分执行完毕,但是有一个或者几个Reduce节点运行很慢,导致整个程序处理时间变得很长,具体表现为:Reduce阶段卡着不动
解决方法
手机扫一扫
移动阅读更方便
你可能感兴趣的文章