抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

码猿不正经

1. 谈谈Hadoop序列化和反序列化及自定义bean对象实现序列化?

https://blog.csdn.net/klionl/article/details/105395340

序列化概述

  • 什么是序列化和反序列化:

    • 序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)便于存储到磁盘(持久化)和网络传输

    • 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象

  • 为什么要序列化

    • 一般来说,“活的” 对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机
  • 为什么不用java的序列化:

    • Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后 ,会附带很多额外的信息 (各种校验信息 ,header,继承体系等),不便于在网络中高效传输。所以 ,hadoop自己开发了一套序列化机制(Writable),精简、高效。
  • Hadoop序列化特点:

    1. 紧凑: 高效使用存储空间。

    2. 快速: 读写数据的额外开销小。

    3. 可扩展: 随着通信协议的升级而可升级

    4. 互操作: 支持多语言的交互

自定义序列化接口(Writable)

​ 在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。
具体实现bean对象序列化步骤如下7步:

  1. 必须实现Writable接口
  2. 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
1
2
3
>public FlowBean() {
super();
>}
  1. 重写序列化方法
1
2
3
4
5
6
>@Override
>public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
>}
  1. 重写反序列化方法
1
2
3
4
5
6
>@Override
>public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
>}
  1. 注意反序列化的顺序和序列化的顺序完全一致
  2. 要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。
1
2
3
4
>@Override
>public string toString(){
return upFlow + "\t" + downFlow + "\t" + sumFlow;
>}
  1. 如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框架中的Shuffle过程要求对key必须能排序。
1
2
3
4
5
>@Override
>public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
>}

2.FileInputFormat切片机制

切片机制

  • 简单地按照文件的内容长度进行切片
  • 切片大小默认等于block大小(block默认128M)
  • 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

具体步骤

(1) 根据driver中设置的输入目录,找到数据存储的目录。listStatus()

(2) 开始遍历目录下的每一个文件,针对每个文件单独切片,不考虑数据整体。

(3) 遍历第一个文件xx.txt

​ a) 获取文件大小fs.getLen(xx.txt)

​ b) 默认情况下,切片大小=blocksize

​ c) 开始切片,例如有个300M的文件,形成第一个切片:word.bxt一D:128M 第2个切片:word.txt一128:256M 第3个切片:word.txt一256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片(如大于128m但小于140m))。

​ d) 将切片信息写到一个切片规划文件中Job.split

​ e) 数据切片只是在逻辑上对输入数据进行分片,并不会在磁盘上将其切分成分片进行存储。使用inputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。

​ f) 注意:block是HDFS物理上存储的数据,切片是对数据逻辑上的划分

(4) 提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask的个数。

3.自定义InputFormat流程

(1)自定义一个类继承FileInputFormat

(2)改写RecordReader ,实现一次读取一个完整文件封装为KV

4.如何决定一个job的map和reduce的数量?

1)map数量

splitSize=max{minSize,min{maxSize,blockSize}}

map数量由处理的数据分成的block数量决定default_num = total_size / split_size;

2)reduce数量

reduce的数量job.setNumReduceTasks(x);x 为reduce的数量。不设置的话默认为 1。

5.MapTask的个数由什么决定

  • 一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定。

6.MapTask工作机制

1638779143474_MapTask工作原理.jpg

  1. read阶段:Map Task通过用户编写的RecordReader ,从输入InputSplit中解析出一个个key/value
  2. map阶段:将解析出的key-value交给用户编写的map()函数进行处理,生成新的key-value
  3. collect阶段:将map的数据写到环形缓冲区(分区)中
  4. spill溢写阶段:环形缓冲区数据满80%后溢写磁盘,生成临时文件,溢写之前需要进行排序
  5. combine阶段:合并所有临时文件(而不是执行Combiner业务逻辑):归并排序,将一些多次产生的小文件进行合并,形成一个大文件

详细步骤

(1)Read阶段 :Map Task通过用户编写的RecordReader ,从输入InputSplit中解析出一个个key/value。

(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理 ,并产生一系列新的key/value。

(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部 ,它会将生成的key/value分区(调用Partitioner) ,并写入一个环形内存缓冲区中。

(4)Spill阶段 :即“溢写” ,当环形缓冲区满后 ,MapReduce会将数据写到本地磁盘上 ,生成一个临时文件。需要注意的是 ,将数据写入本地磁盘之前 ,先要对数据进行一次本地排序 ,并在必要时对数据进行合并、压缩等操作。

1
2
3
4
5
6
7
>溢写阶段详情:
>步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号partition进行排序,然后按照
>key进行排序。这样 ,经过排序后 ,数据以分区为单位聚集在一起 ,且同一分区内所有数据按照key有序。
>步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示
>当前溢写次数)中。如果用户设置了Combiner ,则写入文件之前 ,对每个分区中的数据进行一次聚集操作。
>步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中 ,其中每个分区的元信息包括在临时文件中的偏
>移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB ,则将内存索引写到文件output/spillN.out.index中。

(5)Combine阶段 :当所有数据处理完成后 ,MapTask对所有临时文件进行一次合并 ,以确保最终只会生成一个

数据文件。

​ 当所有数据处理完后 ,MapTask会将所有临时文件合并成一个大文件 ,并保存到文件output/file.out中 ,同时生成相应的索引文件output/file.out.index。

​ 在进行文件合并过程中 ,MapTask以分区为单位进行合并。对于某个分区 ,它将采用多轮递归合并的方式。每轮合 并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

​ 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

7.ReduceTask工作机制

(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

(2)Merge阶段:在远程拷数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

(3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

(4)Reduce阶段:reduce()函数将计算结果写到HDFS上。

8.请描述mapReduce有几种排序及排序发生的阶段

排序的分类

  • 部分排序

    ​ MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。

  • 全排序

    ​ 用Hadoop产生一个全局排序文件最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。

    ​ 替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路时使用一个分区来描述输出的全局排序。例如:可以为待分析文件创建3个分区 ,在第一分区中 ,记录的单词首字母a-g ,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。

  • 辅助排序(GroupingComparator分组)

    ​ Mapreduce框架在记录到达reduce之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中 ,这些值的排序也不固定 ,因为它们来自不同的map任务且这些map任务在不同轮次中完成时间各不相同。一般来说 ,大多数MapReduce程序会避免让reduce函数依赖于值的排序。但是 ,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。

  • 二次排序

    ​ 在自定义排序过程中 ,如果compareTo中的判断条件为两个即为二次排序。

排序发生的阶段

map阶段和reduce阶段的shuffle阶段都会发生一次

  • 一个是在Map side,发生在spill后,partition前
  • 一个是在Reduce side, 发生在copy后,reduce前

自定义排序WritableComparable

  • bean对象实现WritableComparable接口重写compareTo方法 ,就可以实现排序

    1
    2
    3
    4
    5
    @Override
    public int compareTo(FlowBean o) {
    // 倒序排列 ,从大到小
    .sumFlow > o.getSumFlow() ? -1 : 1;
    }

9.请描述mapReduce中shuffle阶段的工作流程,如何优化shuffle阶段?

  • 工作流程:分区 ,排序 ,溢写 ,拷贝到对应reduce机器上

  • 优化:增加combiner ,压缩溢写的文件。

10.请描述MapReduce中combiner的作用是什么,一般使用情景,哪些情况不需要,及和reduce的区别?

  • Combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。
  • Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟reducer的输入kv类型要对应起来。
  • Combiner和reducer的区别在于运行的位置:
    • Combiner是在每一个maptask所在的节点运行;
    • Reducer是接收全局所有Mapper的输出结果。

11.如果没有定义partitioner ,那数据在被送达reducer前是如何被分区的?

如果没有自定义的 partitioning,则默认的 partition 算法。

  • 先对每一条数据的键(key)进行哈希运算

  • 然后,将得到的哈希值对 Reducer 的总数进行取模运算 ,

  • 取模运算的结果决定了每个键值对应该发送到哪个 Reducer,也就是分区号

12.有可能使 Hadoop 任务输出到多个目录中么?如果可以 ,怎么做?

可以输出到多个目录中 ,采用自定义OutputFormat

  • 实现步骤:

​ (1)自定义outputformat

​ (2)改写recordwriter ,具体改写输出数据的方法write()

评论