hadoop 学习笔记二
阅读原文时间:2023年07月11日阅读:2
NameNode的持久化(persistent)(day4,1)

类似于:Redis redis中的持久化文件是相互独立的当两个持久化文件同时存在时默认使用的是aof ,但是namenode 的持久化文件是相互配合的。

checkpint (初始化保存点) 格式化hdfs集群时(bin/hdfs -namenode format) -->生成一个新的FsImage 每一次从新启动hdfs集群时,hdfs 会把editsLog中的数据整合到Fsimage中,

又因为持久化的过程中会在checkpoint 后将数据保存到editsLog中,但是如果服务器一直启动那么ditsLog就会一直写,会很大,为了减少editsLog中文件过大采取的其中一种方法就是,再整合的时候

会将整合的内容产生一个新的FsImage 然后nameNode中的数据也会开始写在一个新的EditsLog文件中,那么旧版本的EditsLog和FsImage并不会被删除掉。

NameNode在运行时,把重要的元数据放置在内存中,如果内存出现问题,则元数据丢失,为了保证元数据安全,NameNode有对应的持久化机制,把元数据持久化到硬盘存储。
而持久化的数据放哪,类似于redis将持久化的文件放在rdb aof , 而namenode 将持久化的文件放在EditsLog (可编辑日志 ,二进制)和FsImage(文件系统镜像,二进制)中
EditsLog:可编辑日志,文件中存放的是二进制 保存的是(checkpoint)之后的用户所有的写操作日志
FsImage:文件系统镜像,文件中存放的也是二进制,某一个时间点(checkpoint)的namenode镜像数据。

对于editsLog 和FSImage这两个可以和redid 对比着记,redis是 rdb aof 各自分离的,但是FSImage 和EditsLog 确实相互合作的editsLog 中记录了系统在运行过程中的素有操作信息,FSImage中存储了

整个HDFS文件系统的所有目录和文件的信息。对于文件来说包括了数据块描述信息、修改时间、访问时间等;对于目录来说包括修改时间、访问权限控制信息(目录所属用户,所在组)等。 主要是记录edits操作产生的日志信息。如果edits什么也没有做,它也会不断的产生信息.

FSimage是NameNode的元数据存储快照

1. FSImage和EditsLog存储位置

#fsImage默认存储位置 /opt/install/hadoop-2.5.2/data/tmp/dfs/name
dfs.namenode.name.dir
#editslog默认存储位置
dfs.namenode.edits.dir

2.定制FSImage和EditsLog的存储位置

hdfs-site.xml
dfs.namenode.name.dir file:///xxx/xxxx
dfs.namenode.edits.dir file:///xxx/xxxx

3. 安全模式(safe mode)

每一次启动namenode时,hdfs都需要进行FSImage和EditsLog的整合,在这个过程中,不允许用户做写操作,把这个过程称之为安全模式(safe mode),默认30秒

*NameNode重启的时候先进行FSImage和EditsLog的合并 然后,NameNode从FSImage和Edits文件中读取数据,加载到内存中。*

1. safe mode相关命令

bin/hdfs dfsadmin -safemode [enter leave get]

2. HDFS集群启动时,完成流程

HDFS集群启动 过程 (安全模式)
1,整合 FSImage和EditsLog 生成新的EditsLog 和 FSImage,由新EditsLog接收用户写操作命令
2, DataNode都需NameNode主动汇报健康情况(心跳)3秒
3, 汇报块列表 通过校验和 检查块是否可用,并定期1小时汇报。

我很喜欢这篇博客讲的很符合我理解的方式:(http://www.sohu.com/a/300240640_315839
1.NameNode: NameNode是一个中心服务器,负责管理文件系统的名字空间以及客户端的访问,比如文件的打卡、关闭、重命名文件或者目录。它负责确定数据块到具体的存储节点的映射。在其同意调度下进行数据块的创建、删除、复制。

2.DataNode: DataNode是HDFS的实际存储节点,负责管理它所在节点的存储;客户端的读写请求。并且定期上报心跳和块的存储位置。

3.Block: HDFS上文件,从其内部看,一个文件其实是被分成一个或者多个数据块存储的,这些数据块存储在一组DataNode上。

4.Edits: 在HDFS发起的创建、删除等操作其实是一个事物,事物在NameNode上以Edit对象存储在edits文件中,持久化在NameNode的本地磁盘上

5.FSimage: FSimage是NameNode的元数据存储快照,持久化在NameNode的本地磁盘上。

1.客户端操作HDFS上的文件时,向NameNode获取文件的元数据信息,NameNode返回文件的块存储位置,Client选择块存储位置最近的节点进行块操作。通常优先级是本机>本机柜>其他机柜的节点。数据块的分布式通常是在同一机架的两个节点存储两份,为了避免单个机架的故障导致数据块丢失,会选择在另外一个机架上的节点存储一份。如果把数据存储在三个不同的机架上,由于不同机架之间通过交换机进行数据交换,网络速度会比单机架慢,因此复制数据也会慢,此外,还会增加机架之间的交换机的压力

2.DataNode除了负责客户端的读写操作外,还需要定期的向NameNode的active和standby做心跳汇报,如果有DataNode的心跳异常,被确定为死的节点,NameNode将会对存储在该节点的数据进行复制,保证数据块的数据块的副本数。DataNode除了心跳还会将本节点的数据块上报给NameNode的active和standby

SecondaryNameNode【了解】

产生的背景:

1. 尽管已经对namenode做了持久化但是如果namenode宕机了虽然数据没有丢是但是服务却停止了

2. 如何保证即使namenode宕机服务也不停止就是要做集群所以此时SecondNamenode 诞生,

3.  如果namenode宕机了又有了secondNamenode 但是又如何切换呢?

4. secondaryname又是如何同步namenode中的数据呢?

5. 如果当服务启动的时候开始使用EdittsLog进行写操作当hdfs这时才进行FSImage和EditsLog的整合但是如果服务是第一次启动,并且服务一旦启动就不会停那么FsImage

就会一直是空的,而且如果editsLog中充满着很多无意义的操作  又是怎么解决这个问题。

6. 如果服务是频繁的开启那么日志文件EditsLog会有很多,又怎么处理的这个问题

7. 如果namenode整合FsImage和EditsLog那么就会降低效率

SecondaryNameNode是NameNode的助手

作用:1. 定期拉去FsImage和EditsLog中的数据(默认是1小时 或者在1小时内执行了事务1百万次)

他会将FsImage和EditLog拉去出来合并后生成一个新的FsImage,整合完成后再将这个新的FsImage替换掉老版本的FsImage周而复始这个过程

hadoop1的时候是:如果出现一个极限情况,内存出现问题了,硬盘也出现问题了,那么就可以部分还原nameNode的数据因为SecondaryNameNode的数据默认是1小时拉取数据

hadoop2的时候已经不用这种方法了,可以搭建集群了。

解决方案:

将在SecondaryNameNode 合并后的FSImage文件替换掉NameNode中的FsImage文件

自定义SecondaryNameNode 拉取数据的周期
hdfs-site.xml
dfs.namenode.checkpoint.period 3600秒

dfs.namenode.checkpoint.txns 1000000

secondaryNameNode启动方式:sbin/start-dfs.sh 或者
sbin/hadoop-daemon.sh start secondarynamenode

定制secondaryNameNode 启动的节点不想让SecondaryNameNode和NameNode一起启动
hdfs-site.xml
dfs.namenode.secondary.http-address 0.0.0.0:50090
dfs.namenode.secondary.https-address 0.0.0.0:50091

HANameNode集群(Hadoop NameNode 集群)

NameNode 设置的是双主  一个的状态是Active  一个的状态是Standy

如果active这个主结点宕机后会切换到standy 这个主节点注意:在切换的过程中一定会有卡顿的但是短时间的卡顿客户端是感受不到的。

还有一个问题两个主结点一定是搭建在两条服务器上的那么他们的IP  端口号不相同,而我们的core-site.xml中是要指定主结点的IP 和端口号的

这个时候怎么切换呢,到目前为止有两种做法一种是IP漂移 一旦我发现NAmeNode不可用了那么就将Standy的IP改变成Active的IP

第二种就是虚拟IP  我们访问第三方的IP 由第三方的IP来决定如果active宕机后路由到哪去。但是这种方式还有一个问题就是我哪知道主机有没有问题

也是采用将两台主结点发送心跳,如果不发送心跳了就代表出现了问题。但是还有一个问题就是如果这个虚拟IP出现了问题呢,这就使用了zookeeper

这个虚拟IP就是用了zookeeper 因为zookeeper 也搭建了集群。

===============================================================================================================

HANameNode 的与案例分析

在切换主备的时候要注意数据的同步两个nameNode 要拥有相同的数据

这里standBynamenode起到了两个作用一个是secondarynamenode 定期整合fsimage 和editsLog的内容和备选主节点 的作用

脑裂:就是指:如果nameNode 由于网络延迟没能在有效期内发送心跳包,那么就有一种可能存在两个nameNode 这就是脑裂,这时不管你是真宕机还是假宕机那么我就是让你宕机

使用ssh 免密登录到你的服务器上执行脚本杀掉你。 这就是解决脑裂的方法

zookeeper 是如何选主的。

HANameNode集群的搭建

1. zookeeper集群
1.1 解压缩
tar -zxvf zookeeper-xxx-tar.gz -C /opt/install 1.2 创建数据文件夹
** mdkir zookeeper_home(zookeeper的安装目录)/data* 1.3 conf目录修改zookeeper的配置文件 ** 修改zoo_sample.cfg 为 zoo.cfg mv zoo_sample.cfg zoo.cfg 编辑内容 dataDir*=/opt/install/zookeeper-3.4.5/data

   server.0=hadoop2.baizhiedu.com:2888:3888  
   server.1=hadoop3.baizhiedu.com:2888:3888  
   server.2=hadoop4.baizhiedu.com:2888:3888**  

1.4 zookeeper_home/data
在data下建立一个文件myid
第一台服务器:myid文件 输入 0 对应 hadoop2.baizhiedu.com 代表的是hadoop2这个服务 第二台服务器:myid文件 输入 1 对应 hadoop3.baizhiedu.com 代表的是hadoop3这个服务 第三台服务器:myid文件 输入 2 对应 hadoop4.baizhiedu.com 代表的死hadoop4这个服务 1.5 scp -r 命令 同步集群中所有节点 并 修改对应的myid文件
 scp -r zookeeper-3.4.5/ root@hadoop3.zhulifei.com:/opt/install/
 scp -r zookeeper-3.4.5/ root@hadoop4.zhulifei.com:/opt/install/
zookeeper 选取主节点的原理:
假设有三台服务:当第一台启动的时候自认为是主节点这时剩下的不给他投票,第二台启动说编号比第一台大给自己投一票第一台认可也投一票
第三台服务启动,编号最大给自己投一票但是现在没有服务为他投票所以第一次充当namenode的往往是中间那一台。

1.6 主节点 ssh 其他节点
1.7 启动zk服务
** bin****/zkServer.sh start | stop | restart
bin/zkServer.sh status 查看集群状态 【必须集群完整启动完成】

   bin/zkCli.sh \[leader\]   访问必须要在leader这个节点执行  
     zookeeper是一个树状结构** 2\. HA-HDFS集群  

** 清空 data**/tmp/ 目录下的内容 2.1 core-site.xml
fs.defaultFS hdfs://ns //ns 这个名字可以随便起 hadoop.tmp.dir /opt/install/hadoop-2.5.2/data/tmp ha.zookeeper.quorum hadoop2.baizhiedu.com:2181,hadoop3.baizhiedu.com:2181,hadoop4.baizhiedu.com:2181
2.2 hdfs-site.xml
** dfs.permissions.enabled false

  <!--指定hdfs的nameservice为ns,需要和core-site.xml中的保持一致 -->  
  <property>  
      <name>dfs.nameservices</name>  
      <value>ns</value>  
  </property>  
  <!-- ns下面有两个NameNode,分别是nn1,nn2 -->  
  <property>  
      <name>dfs.ha.namenodes.ns</name>  
      <value>nn1,nn2</value>  
  </property>  


dfs.namenode.rpc-address.ns.nn1 hadoop2.baizhiedu.com:8020

dfs.namenode.http-address.ns.nn1 hadoop2.baizhiedu.com:50070


dfs.namenode.rpc-address.ns.nn2 hadoop3.baizhiedu.com:8020

dfs.namenode.http-address.ns.nn2 hadoop3.baizhiedu.com:50070

<!-- 指定NameNode的元数据在JournalNode上的存放位置 -->  
<property>  
    <name>dfs.namenode.shared.edits.dir</name>  
    <value>qjournal://hadoop2.baizhiedu.com:8485;hadoop3.baizhiedu.com:8485;hadoop4.baizhiedu.com:8485/ns</value>  
</property>  


dfs.journalnode.edits.dir /opt/install/hadoop-2.5.2/journal

dfs.ha.automatic-failover.enabled true

dfs.client.failover.proxy.provider.ns org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

dfs.ha.fencing.methods sshfence

dfs.ha.fencing.ssh.private-key-files /root/.ssh/id_rsa **
配置hadoop2这台后不要忘记了还有两台也要配置
scp -r etc/hadoop/ root@hadoop3.zhulifei.com:/opt/install/hadoop-2.5.2/etc/ 
scp -r etc/hadoop/ root@hadoop4.zhulifei.com:/opt/install/hadoop-2.5.2/etc/

2.3 yarn-env.sh添加如下内容
export JAVA_HOME=/usr/java/jdk1.7.0_71

** 首先启动各个节点的Zookeeper,在各个节点上执行以下命令:
bin****/zkServer.sh start

在某一个namenode节点执行如下命令,创建命名空间  
bin/hdfs zkfc -formatZK  (主节点hadoop2)

在每个journalnode节点用如下命令启动journalnode  
sbin/hadoop-daemon.sh start journalnode    (journalnode同步EditsLog 三个hadoop 都要启动部署在zk中的namenode)

在主namenode节点格式化namenode和journalnode目录  
bin/hdfs namenode -format ns

在主namenode节点启动namenode进程  
sbin/hadoop-daemon.sh start namenode

在备namenode节点执行第一行命令,这个是把备namenode节点的目录格式化并把元数据从主namenode节点copy过来,并且这个命令不会把journalnode目录再格式化了!然后用第二个命令启动备namenode进程!

bin/hdfs namenode -bootstrapStandby     (hadoop3)  
sbin/hadoop-daemon.sh start namenode

在两个namenode节点都执行以下命令  
sbin/hadoop-daemon.sh start zkfc

在所有datanode节点都执行以下命令启动datanode  
sbin/hadoop-daemon.sh start datanode

日常启停命令  
sbin/start-dfs.sh  
sbin/stop-dfs.sh**

Hadoop源码编译:(daty4,5)

apache官方网站上提供的二进制文件,是基于32为操作系统进行编译的,不适合与64位操作系统,自己编译 

1.准备工作

我在一台纯净版的虚拟机上安装的

2. 把他们解压

安装jdk

1. 配置JDK环境变量

vi .bash_profile
/etc/profile

  1. source  /etc/profile   或者 source .bash_profile 来生效一下

使用javac -version  或者java -version来检测是否安装成功

2. 安装需要的依赖工具

安装Linux 系统包
yum install wget
yum install autoconf automake libtool cmake
yum install ncurses-devel
yum install openssl-devel
yum install lzo-devel zlib-devel gcc gcc-c++

3. 下载: protobuf-2.5.0.tar.gz

解压:tar -zxvf protobuf-2.5.0.tar.gz

编译安装:

进入安装目录,进行配置,执行命令   ./configure

安装命令:

     make  
     make check  
     make install

4. 安装Maven

解压:maven

修改环境变量  etc/profile

在maven 的setting.xml 的配置文件中加入镜像操作

   <mirror>  
        <id>alimaven</id>  
        <mirrorOf>central</mirrorOf>  
        <name>aliyun maven</name>  
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>  
    </mirror>

通过mvn 验证maven是否安装成功

5. 安装   findbugs

下载: findbugs-1.3.9.tar.gz

解压: tar –zxvf findbugs-1.3.9.tar.gz

设置环境变量(/etc/profile):

加在文件末尾就行

export FINDBUGS_HOME=/opt/modules/findbugs-1.3.9
export PATH=$PATH:$FINDBUGS_HOME/bin
执行命令:source /etc/profile
验证:findbugs -version

在maven目录下执行:export MAVEN_OPTS="-Xms256m -Xmx512m"    设置maven的内存

在hadoop的源码包中执行:mvn package -DskipTests -Pdist,native -Dtar 执行在Hadoop2.5.0的源码包中\

获得编译好的hadoop   :

hadoop_src_home/hadoop-dist/target

执行成功的截图,如果你没有下载成功那么久多执行几次:mvn package -DskipTests -Pdist,native** *-Dtar*

如果好几次都没有成功那么就把maven重新装一遍 或者换一个低版本的maven

把自己编译的lib包 替换hadoop二进制 lib 包     设置HADOOP_HOME环境变量

当你整个过程结束后在你的/root/.m2  目录下会产生一个repository 里面都是你下载好的依赖

在你的hadoop源码包中会有一个target/hadoop-2.5.2.tar.gz

总结出现的问题:

1. maven版本过高

  1. 错误
    [ERROR] Failed to execute goal org.apache.maven.plugins:maven-antrun-plugin:1.7:run (dist) on project hadoop-hdfs-httpfs:
    An Ant BuildException has occured: exec returned: 2
    [ERROR] around Ant part …… @ 10:134 in /home/pory/workplace/hadoop-2.4.1-src/hadoop-hdfs-project/hadoop-hdfs-httpfs/target/antrun/build-main.xml
    [ERROR] -> [Help 1]

这是因为/home/pory/workplace/hadoop-2.4.1-src/hadoop-hdfs-project/hadoop-hdfs-httpfs/downloads目录下的文件没有下载完全,
可以手动下一份匹配版本的文件放在下面,在http://archive.apache.org/dist/tomcat/tomcat-6/v6.0.36/bin/

然后重新执行命令

编译完成后使用:

首先就是要上传hadoop的安装包,解压,

hadoop32位和64位最大的区别就是native这个文件夹的内容不同其他都一样

所以如果想要切换只需要改变native下的内容就可以了。

需要应用64为的hadoop替换32位hadoop /opt/install/hadoop-2.5.2/lib/native 的内容

替换一定在linux系统中直接替换。将我们上面编译好的文件内容来一次替换。

接下来就是修改hadoop的配置文件了

vim   hadoop-env.sh

1.1 hadoop-env.sh
exportJAVA_HOME=/usr/java/jdk1.7.0_71
1.2 core-site.xml


fs.defaultFS hdfs://hadoop.baidu.com:8020

命令:mkdir -p data/tmp hadoop.tmp.dir /opt/install/hadoop-2.5.2/data/tmp 1.3 hdfs-site.xml dfs.replication 1 1.4 mapred-site.xml mapreduce.framework.name yarn 1.5 yarn-site.xml yarn.nodemanager.aux-services mapreduce\_shuffle 1.6slaves 配置自己是告诉虚拟机我本机即做nameNOde 也做dataNode hadoop2.baizhiedu.com **然后就是namenode的格式化【第一次搭建hdfs集群时需要使用】**    目的作用:格式化hdfs系统,并且生成存储数据块的目录     **bin/hdfs  namenode  -format** **6.启动hadoop守护进程**       **启动:** **sbin/hadoop-daemon.sh   start    namenode** **sbin/hadoop-daemon.sh   start    datanode** **sbin/yarn-daemon.sh    start    resourcemanager** **sbin/yarn-daemon.sh   start  nodemanager** **停止:** **sbin/hadoop-daemon.sh  stop  namenode** **sbin/hadoop-daemon.sh   stop  datanode** **sbin/yarn-daemon.sh  stop   resourcemanager** **sbin/yarn-daemon.sh  stop   nodemanager** **7.测试验证安装成果** **ps -ef|grep  java** **jps查看相关4个进程** **通过网络进行访问测试** **浏览器http://hadoop2.zhulifei.com:50070   hdfs** **http://hadoop2.zhulifei.com:8088  yarn** ##### HDFS相关内容的总 **1.概念(原理)** ![](https://article.cdnof.com/2307/5faaf943-92de-4a70-88be-699755c376f8.png) **2. java 编码** 1\. HDFS shell 命令 2. Java API Configuration FileSystem IOUtils Path ##### MapReduce基本概念 1\. 什么是mapreduce ,谁能能够和mapreduce起到相同地 作用 spark **#后续 Spark完成类似的工作** MapReduce是Hadoop体系下的一种计算模型(计算框架),主要功能是用于操作,处理HDFS上的大数据级数据。 ![](https://article.cdnof.com/2307/4662a1f8-56b7-48a5-8688-246b50c517c7.png) ResourcesManager 是主  NodeManager是从 ResourcesManager 负责资源调度 按需分配,NodeManager 上报资源。 只要一台节点是DateNode 那么他一定是NodeManager ResourcesManager 还负责监控任务(map  reduce都是任务) 2.mapreduce的构建思想(结合上图) 因为数据运算是要加载进内存中进行运算而作为TB级的数量级大数据在大多数情况下是不被普及的尽管有TB级的内存但是仍旧不广泛使用 所以需要小数量级的内存才能进行运算,而每一个datenode 都由自己独立的cpu 所以就很容易想到局部运算在进行汇总的思想 ,所以在mapreduce中,map就是用于局部运算而 reduce就是进行汇总计算。然而下一个问题又出现了,每一个datanode的服务器型号或者大小可能不同,cpu的内核和线程也可能不同为了不造成资源的浪费,或者能够打破物理硬件上的局限性 便采用了逻辑上的整体资源分配数据在实际上的物理上的地址可能不是连续的但是在逻辑上是连续的,而所谓这个资源总体分配的总工程师就是resourcemanager ,他不仅可以负责资源的调度 而且还能顺便负责监控任务的作用,他为什么能够担负起监控map reduce 任务的作用呢,实际上主要是因为作为资源的总工程师是要知道资源的基本信息的,而这些基本信息是有每一个nodeManager 来进行上报的,这样在map 或者reduce在进行计算的时候如果出现了问题 ,nodemanager 上报,和resourcemanager就能够及时的知道。 3\. 启动yarn ![](https://article.cdnof.com/2307/0bce48dd-2681-4fdc-b170-9571b87505d0.png) 1\. 配置相关的配置文件 etc/hadoop yarn-site.xml mapred-site.xml 2. 启动yarn 2.1 伪分布式 sbin/yarn-daemon.sh start resourcemanager sbin/yarn-daemon.sh start nodemanager 2.2 集群方式 mapred-site.xml mapreduce.framework.name yarn yarn-site.xml yarn.nodemanager.aux-services mapreduce\_shuffle

【分布式环境新加】
yarn.resourcemanager.hostname hadoop12.baizhiedu.com
slaves
datanode同时又是nodemanager
同步集群的每一个节点
 scp -r hadoop root@hadoop12.zhulifei.com:/opt/install/hadoop-2.5.2/etc/
 scp -r hadoop root@hadoop13.zhulifei.com:/opt/install/hadoop-2.5.2/etc/

   正常启动hdfs  
        namenode格式化  
        sbin/start-dfs.sh  
   集群方式的yarn启动  
         建议 namenode 不要和 resourcemanager放置在同一个点  
         # ssh相关的机器,避免yes  
         在集群环境下,yarn启动的命令,需要在resourcemanager所在的节点执行  
         sbin/start-yarn.sh  
         sbin/stop-yarn.sh  
   验证:  
         jps看进程  
         http://hadoop12.baizhiedu.com:8088

Mapreduce 程序开发步骤:

Mapreduce第一个程序分析

MapReduce 第一个程序开发

1. 准备工作上传文件:

2. java代码需要的pom.xml

<dependency>  
  <groupId>org.apache.hadoop</groupId>  
  <artifactId>hadoop-common</artifactId>  
  <version>2.5.2</version>  
</dependency>

<dependency>  
  <groupId>org.apache.hadoop</groupId>  
  <artifactId>hadoop-hdfs</artifactId>  
  <version>2.5.2</version>  
</dependency>

<dependency>  
  <groupId>org.apache.hadoop</groupId>  
  <artifactId>hadoop-client</artifactId>  
  <version>2.5.2</version>  
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->  
<dependency>  
  <groupId>org.apache.hadoop</groupId>  
  <artifactId>hadoop-mapreduce-client-core</artifactId>  
  <version>2.5.2</version>  
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-common -->  
<dependency>  
  <groupId>org.apache.hadoop</groupId>  
  <artifactId>hadoop-yarn-common</artifactId>  
  <version>2.5.2</version>  
</dependency>

编码:

# MapReduce编程中 一定会涉及到序列化的操作 ,基本进行了封装
String ---- Text
int ---- IntWritable
Long -----LongWritable

打成jar,上传到yarn集群(resourcemanager)

bin/yarn jar hadoop-mr-baizhiedu.jar

hadoop实现一个job作业的的java代码:

/**
* Hadoop 自身提供一套可优化网络序列化传输的基本类型
* Integer IntWritable
* Long LongWritable
* String Text
*/
public class MyMapreduce {

/**
* LongWritable 偏移量 long,表示该行在文件中的位置,而不是行号
* Text map阶段的输入数据 一行文本信息 字符串类型 String
* Text map阶段的数据字符串类型 String
* IntWritable map阶段输出的value类型,对应java中的int型,表示行号
*/

// 构建map类
public static class Mymaper extends Mapper{
/*
key = k1 //行首字母偏移量 表示该行在文件中的位置,而不是行号
value = line 行数据
context 上下文对象
*/

@Override  
public void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {  

// suns xiaohei
String line = v1.toString();
String[] words = line.split("\t"); //分词
for (String word : words) {
// suns 1
// xiaohei 1
Text k2 = new Text(word);
IntWritable v2 = new IntWritable(1);
//写出
context.write(k2,v2);
}
}
}
/**
* Text 数据类型:字符串类型 String
* IntWritable reduce阶段的输入类型 int
* Text reduce阶段的输出数据类型 String类型
* IntWritable 输出词频个数 Int型
* reduce函数的输入类型必须匹配map函数的输出类型。
*/
public static class MyReduce extends Reducer {
/**
* key 输入的 键
* value 输入的 值
* context 上下文对象,用于输出键值对
*/
@Override
public void reduce(Text k2, Iterable v2s, Context context) throws IOException,
InterruptedException {
int result = 0;
for (IntWritable v2 : v2s) {
result+=v2.get();
}
Text k3 = k2;
IntWritable v3 = new IntWritable(result);
context.write(k3,v3);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "MyFirstJob");
// 作业以jar包形式运行
job.setJarByClass(MyMapreduce.class);

// InputFormat
Path path = new Path("/src/data");
TextInputFormat.addInputPath(job,path);

// map
job.setMapperClass(Mymaper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// reduce
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 输出目录一定不能存在,有MR动态创建
Path out = new Path("/dest2");
FileSystem fileSystem = FileSystem.get(conf);
fileSystem.delete(out,true);
TextOutputFormat.setOutputPath(job,out);

// 运行job作业
job.waitForCompletion(true);
}
}

代码

当然,这个只是普通的java代码,如果在这运行那么我们在linux上搭建的环境也就没有意义了。

所以要把项目打成jar包放在linux上的jdk中

还要下载一个插件

Maven自定义骨架

程序员根据自己的需求,定义Maven Archetype(骨架),后续选择自定义的骨架,就可以把我们需要的pom,其他配置文件,代码的骨架,自动生成,简化开发与测试

1. 创建一个模板module

1. 引入相关jar的坐标
2. 创建Java代码

在本项目的根下:mvn --settings F:\apache-maven-3.3.9\conf\settings.xml archetype:create-from-project

2.复制骨架的坐标(便于后续的安装)

com.baizhiedu
hadoop-test-archetype
1.0-SNAPSHOT

3.安装骨架

cd target\generated-sources\archetype
mvn clean install

4.创建项目并引入骨架

需要指定骨架的坐标,来源第二步。

下面是一个简易图:

MapReduce程序的调试

1.建议MR代码中通过Log4进行调试

xxxxxxxxxx Logger logger = Logger(xxx.class);
logger.info()
通过上述操作 输出的结果,只能查看job的信息,而Map,Reduce的信息看不到。需要开启Yarn 历史日志 ,日志归档

2. yarn集群中如何开启历史日志,日志归档

1. 配置文件
** mapred**-site.xml

指定历史服务器所在的位置及端口

mapreduce.jobhistory.address hadoop11.zhulifei.com:10020
指定历史服务器所在的外部浏览器交互端口号及机器位置
mapreduce.jobhistory.webapp.address hadoop11.zhulifei.com:19888 yarn-site.xml 日志聚合 yarn.log-aggregation-enable true

yarn.log-aggregation.retain-seconds 604800
** 要进行同步和其他两台服务器同步到集群的每一个节点域名映射不用改**

  1. 启动进程
    sbin/mr-jobhistory-daemon.sh start historyserver 每个节点都要启动
    sbin/mr-jobhistory-daemon.sh stop historyserver

日志聚合:

实战:

应用shell脚本 解决

首先关闭日志聚合:

1. vim  etc/hadoop/yarn-env.sh  在 192.168.253.11 这台机器上进行操作

export YARN_LOG_DIR=~/logs/yarn
export YARN_PID_DIR=~/data/yarn

2 . 创建脚本

if [ $# -le 0 ]
then
echo 缺少参数
exit 1
fi

logtype=out

if [ $# -ge 1 ]
then
logtype=${2}
fi

for n in `cat /opt/install/hadoop-2.5.2/etc/hadoop/slaves`
do
echo ===========查看节点 $n============
ssh $n "cat ~/logs/yarn/userlogs/${1}/container_*/*${logtype}|grep com.baizhiedu"
done

3. 运行脚本:

1. 修改脚本权限

  1. ./scanMRLog.sh application_1558968514803_0001
关闭日志聚合也就是JobHistoryServer 关闭这个,然后执行这个

然后启动idea中的程序,可以查看你的日志文件中的信息

执行./scanMRLog.sh application_1558968514803_0001

再执行上面的命令。

MapReduce的实战案例

启动节点:

启动:

sbin/hadoop-daemon.sh start namenode
sbin/hadoop-daemon.sh start datanode
sbin/yarn-daemon.sh start resourcemanager
sbin/yarn-daemon.sh start nodemanager

停止:
sbin/hadoop-daemon.sh stop namenode
sbin/hadoop-daemon.sh stop datanode
sbin/yarn-daemon.sh stop resourcemanager
sbin/yarn-daemon.sh stop nodemanager

启动secondarynamenode
./sbin/hadoop-daemon.sh start secondarynamenode
或者 sbin/start-dfs.sh

java 代码:

执行后:可以看到日志中的读取的数据

查看统计结果

MapReduce自定义数据类型

MapReduce中,Map与Reduce会进行跨JVM,跨服务器的通信,所以需要MapReduce中的数据类型进行序列化

Writable
Compareable

WriableCompareable (既能排序又能序列化)

IntWritable
LongWritable
FloatWritable
Text

NullWritable

#自定义hadoop的数据类型?
程序员自定义的类型 实现Writable Comparable(compareto方法 int 0 1 -1 )
直接实现WritableCompareble

write(DataOutput out)
readFields(DataInput in)
compareto()

equals
hashcode
toString

注意:自定义类型中 toString方法 返回的内容,将会位置输出文件的格式

代码:

package com.qlh.day01;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class MyWritable implements WritableComparable {
private Integer id;
private String name;

public MyWritable() {  
}

public MyWritable(Integer id, String name) {  
    this.id = id;  
    this.name = name;  
}

public Integer getId() {  
    return id;  
}

public void setId(Integer id) {  
    this.id = id;  
}

public String getName() {  
    return name;  
}

public void setName(String name) {  
    this.name = name;  
}

@Override  
public boolean equals(Object o) {  
    if (this == o) return true;  
    if (o == null || getClass() != o.getClass()) return false;  
    MyWritable that = (MyWritable) o;  
    return Objects.equals(id, that.id) &&  
            Objects.equals(name, that.name);  
}

@Override  
public int hashCode() {  
    return Objects.hash(id, name);  
}

@Override  
public int compareTo(MyWritable o) {  
    int result = id.compareTo(o.getId());  
    if (result ==0){  
        return name.compareTo(o.getName());  
    }  
    return id.compareTo(o.getId());  
}

@Override  
public void write(DataOutput out) throws IOException {  
  out.write(id);  
  out.writeUTF(name);  
}

@Override  
public void readFields(DataInput in) throws IOException {  
     id = in.readInt();  
     name = in.readUTF();  
}  

}

java代码

场景二:字符串是空串的应用场景

package com.lqh;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapper1 extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
System.out.println(line);
String[] split = line.split(" ");

// 将对象封装到Text中
Text k2 = new Text(split[0]);
NullWritable v2 = NullWritable.get();
context.write(k2,v2);

}  

}

MyMapper

package com.lqh;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* 接收来自map中的key value
*/

public class MyReduce1 extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
// 遍历集合
Text k3 = key;
NullWritable v3 = NullWritable.get();
context.write(k3,v3);

}  

}

MyReduce

package com.lqh;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class MyReduceSubmit {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取文件的位置
String inPath = "/hw/data1";
// 输出文件的位置
String outPath = "/zhulina3";
//
Configuration configuration = new Configuration();
// 创建一个Job作业
Job job = Job.getInstance(configuration, MyReduceSubmit.class.getName());
// 指定作业以jar包形式运行
job.setJarByClass(MyReduceSubmit.class);

// InputFormat TextInputFormat 是inputFormat的子类
// 指定路径
Path path = new Path(inPath);
TextInputFormat.addInputPath(job,path);

// Map
job.setMapperClass(MyMapper1.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);

// shuffer

// Reduce
job.setReducerClass(MyReduce1.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

// 输出目录一定不能存在有 MapReduce动态创建
Path path1 = new Path(outPath);
FileSystem fileSystem = FileSystem.get(configuration);
fileSystem.delete(path1,true);

// OutPutFormat
TextOutputFormat.setOutputPath(job,path1);
// 运行job作业
job.waitForCompletion(true);

}  

}

MayReduceSubmit

http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
mr com.lqh 1.0-SNAPSHOT ../mr/pom.xml
4.0.0

<artifactId>hadoop-hr</artifactId>

<name>hadoop-hr</name>  
<!-- FIXME change it to the project's website -->  
<url>http://www.example.com</url>

<properties>  
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
    <maven.compiler.source>1.7</maven.compiler.source>  
    <maven.compiler.target>1.7</maven.compiler.target>  
    <baizhi-mainClass>com.lqh.MyReduceSubmit</baizhi-mainClass>  
    <target-host>192.168.253.11</target-host>  
    <target-position>/opt/install/hadoop-2.5.2</target-position>

</properties>

<dependencies>  
    <dependency>  
        <groupId>log4j</groupId>  
        <artifactId>log4j</artifactId>  
        <version>1.2.17</version>  
    </dependency>  
    <dependency>  
        <groupId>junit</groupId>  
        <artifactId>junit</artifactId>  
        <version>4.11</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.hadoop</groupId>  
        <artifactId>hadoop-common</artifactId>  
        <version>2.5.2</version>  
    </dependency>

    <dependency>  
        <groupId>org.apache.hadoop</groupId>  
        <artifactId>hadoop-hdfs</artifactId>  
        <version>2.5.2</version>  
    </dependency>

    <dependency>  
        <groupId>org.apache.hadoop</groupId>  
        <artifactId>hadoop-client</artifactId>  
        <version>2.5.2</version>  
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->  
    <dependency>  
        <groupId>org.apache.hadoop</groupId>  
        <artifactId>hadoop-mapreduce-client-core</artifactId>  
        <version>2.5.2</version>  
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-common -->  
    <dependency>  
        <groupId>org.apache.hadoop</groupId>  
        <artifactId>hadoop-yarn-common</artifactId>  
        <version>2.5.2</version>  
    </dependency>  
</dependencies>

<build>  
    <extensions>  
        <!--这个插件的作用是将windows打成的jar包scp到linux指定目录下-->  
        <extension>  
            <groupId>org.apache.maven.wagon</groupId>  
            <artifactId>wagon-ssh</artifactId>  
            <version>2.8</version>  
        </extension>  
    </extensions>  
    <plugins>  
        <!--打jar包-->  
        <plugin>  
            <groupId>org.apache.maven.plugins</groupId>  
            <artifactId>maven-jar-plugin</artifactId>  
            <version>2.3.2</version>  
            <!--打完jar包后产生main函数-->  
            <configuration>  
                <outputDirectory>${basedir}</outputDirectory>  
                <archive>  
                    <manifest>  
                        <mainClass>${baizhi-mainClass}</mainClass>  
                    </manifest>  
                </archive>  
            </configuration>  
        </plugin>

        <plugin>

            <groupId>org.codehaus.mojo</groupId>  
            <artifactId>wagon-maven-plugin</artifactId>  
            <version>1.0</version>  
            <configuration>  
                <fromFile>${project.build.finalName}.jar</fromFile>  
                <url>scp://root:123456@${target-host}${target-position}</url>  
                <!--maven helpe就是将多个命令绑定在一起同时执行-->  
                <commands>  
                    <!--杀死原来的进程-->  
                    <command>pkill -f ${project.build.finalName}.jar</command>  
                    <!--重新启动test.jar,程序的输出结果写到nohup.out文件中  
                    nohup  linux特有的不再控制台输出  
                    -->  
                    <!--因为存在服务器的重启之类的操作,wagon也支持使用shell命令,可以有多个command标签哦,根据先后顺序执行-->  
                    <command>nohup ${target-position}/bin/yarn jar ${target-position}/${project.build.finalName}.jar >/root/nohup.out 2>&amp;1 &amp;</command>  
                </commands>  
                <!-- 显示运行命令的输出结果 -->  
                <displayCommandOutputs>true</displayCommandOutputs>  
            </configuration>  
        </plugin>  
    </plugins>  
</build>  

pom

注意一些细节:

在这个reduce中reduce只是起到了一个传递值的作用,并没有任何的业务逻辑

所以这个reduce 就会成为这个业务需求中的业务冗余。

MapReduce作业中Map,Reduce的一些细节问题

1.MapReduce中是可以没有Reduce

如果MapReduce中,只是对数据进行清洗,而不负责统计,去重的话,就没有Reduce

job.setNumReduceTasks(0); 在代码中指定这句代码
// job.setReducerClass(MyReducer.class);
// job.setOutputKeyClass(Text.class);
// job.setOutputValueClass(NullWritable.class);

2.MapReduce中有多少个Map?

1. 文本文件处理中,Map的数量由block决定。

3.MapReduce中有多少个Reduce?

1. Reduce个数可以设置的
默认情况 reduce的个数是 1
    mapreduce.job.reduces 1

   job.setNumReduceTasks(?);

job.setNumReduceTasks(2); 可以指定数量

1. 为什么要设置多个Reduce?
提高MR的运行效率 map中的数量是有block决定的但是多个map对应一个reduce处理数据降低效率,所以要增加reduce的数量
2. 多个Reduce的输出结果是多个文件,可以再次进行Map的处理,进行汇总 3
. Partion分区

由分区决定Map输出结果,交给那个Reduce处理。默认有HashPartitioner实现
k2.hashCode()%reduceNum(2) = 0,1

4. map选择reduce处理的自定义分区算法

/*123 R 大于3 在另一个Reduce */
123 在第一个reduce 中 4 在第二个reduce中

1. 自定义Partitioner public class MyPartitioner extends Partitioner {
@Override
public int getPartition(k2 key, v2 value, int numPartitions) {

    String k = key.toString();

    try{  
        int key\_i = Integer.parseInt(k);  
        if(key\_i<=3){  
            return 0;  
        }else{  
            return 1;  
        }  
    }catch(Exception e){  
        return -1;  
    }  
}

}
2. job作业设置
job.setPartitionerClass(MyPartitioner.class);

5.MapReduce中的计数器 Counter

自己定义计数器 书写在Map或者Reduce
context.getCounter("group-name","counter-name").increment(1);
public enum MyCounter {
MY_COUNTER
}
context.getCounter(MyCounter.MY_COUNTER).increment(1);

6. Combiner

Combiner是Map端的Reduce,提前Map端作合并,从而减少传输的数据,提高效率。
默认情况是Combiner关闭
job.setCombinerClass(MyReducer.class);

需求:网站经营数据的分析 (pv,uv)值