7.Shuffle详解

7.Shuffle详解学的也是思想 实际生产不用

大家好,欢迎来到IT知识分享网。

 1.分区规则

7.Shuffle详解

ps.”&”指的是按位与运算,可以强制转换为正数 

ps.”%”,假设reduceTask的个数为3,则余数为0,1,2正好指代了三个分区

       以上代码的含义就是对key的hash值强制取正之后,对reduce的个数取余,这样的话,如果key相同,则hash值相同,则余数相同,则会放到同一分区。

      但是如果某一种key过多,则会导致分区不均匀,此问题称为数据倾斜

2.自定义分区

自定义分区用于解决数据倾斜问题

案例:

         数据:一堆手机号

         需求:手机号136、137、138、139开头都分别放到一个独立的4个分区中,其他开头的放到一个分区中,然后对总流量进行倒序排序

重点:(1)全局排序是不能分区的(2)把流量当作key来排序

 3.1 bean阶段(自定义排序规则:继承WritableComparable接口)

public class FlowBean implements WritableComparable<FlowBean>{ private Integer upFlow; private Integer downFlow; private Integer sumFlow; }

生成set/get阶段:

 public Integer getUpFlow() { return upFlow; } public void setUpFlow(Integer upFlow) { this.upFlow = upFlow; } public Integer getDownFlow() { return downFlow; } public void setDownFlow(Integer downFlow) { this.downFlow = downFlow; } public Integer getSumFlow() { return sumFlow; } public void setSumFlow(Integer sumFlow) { this.sumFlow = sumFlow; }

重写toString方法:

public String toString() { return upFlow + "\t" + downFlow + "\t" +sumFlow; }

序列化方法与反序列化方法:

// 序列化 public void write(DataOutput out) throws IOException { out.writeInt(upFlow); out.writeInt(downFlow); out.writeInt(sumFlow); } // 反序列化 public void readFields(DataInput in) throws IOException { upFlow = in.readInt(); downFlow = in.readInt(); sumFlow = in.readInt(); } 

 计算总流量

public void setSumFlow() { this.sumFlow = this.upFlow + this.downFlow; }
自定义排序规则:倒叙输出:当this.getSumFlow()大于o.getSumFlow()时候,返回负数 
public int compareTo(FlowBean o) { return -this.getSumFlow().compareTo(o.getSumFlow()); }

3.2 自定义分区规则:需要继承Hadoop的提供的Partitioner对象

public class PhonePartitioner extends Partitioner<FlowBean,Text> { public int getPartition(FlowBean flowBean,Text text,int numPartitions) { int phonePartitions; // 获取手机号 String phoneNum = text.toString(); if(phoneNum.startsWith("136")){ phonePartitions = 0; }else if(phoneNum.startsWith("137")){ phonePartitions = 1; }else if(phoneNum.startsWith("138")){ phonePartitions = 2; }else if(phoneNum.startsWith("139")){ phonePartitions = 3; }else { phonePartitions =4; } return phonePartitions; } }

3.3 map阶段:要求输出时以流量做为k,以手机号为v

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> { private Text outv = new Text(); private FlowBean outk = new FlowBean(); protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 获取当前行数据 String line = value.toString(); // 切割数据 String[] phoneDatas = line.split("\t"); // 获取输出数据的key(手机号) outv.set(phoneDatas[1]); // 获取输出数据的value outk.setUpFlow(Integer.parseInt(phoneDatas[phoneDatas.length-3])); outk.setDownFlow(Integer.parseInt(phoneDatas[phoneDatas.length-2])); outk.setSumFlow(); // 将数据输出 context.write(outk, outv); } }

3.4 recue阶段:reduce做的事情就是把数据翻转着写出去

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> { private FlowBean outv = new FlowBean(); protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 遍历输出 for (Text value : values) { context.write(value,key); } } }

3.5 driver阶段:将以上代码整合起来

7.Shuffle详解

 3.Combiner

定义:就是数据合并,将map的输出结果预聚合,减小网络传输量,比如:

                                 (map,1)(map,1)  (map,1) => (map,3)

Combiner和reducer的差别:

Combiner是对每一个mapper进行汇总,reducer是对所有的mapper进行汇总

使用前提:不能影响最终的业务逻辑,combiner输出的kv应该和reducer输出的kv对应

假如有3,5,7,2,6求平均值,使用combiner做局部合并和使用reducer做全局合并:

7.Shuffle详解

 位置:mapper中,处于分区快排之后,溢写之前

 使用案例:对wordCount使用Combiner

1.自定义Combiner类:重写reducer方法

public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { // 1 汇总操作 int count = 0; for(IntWritable v :values){ count += v.get(); } // 2 写出 context.write(key, new IntWritable(count)); } } 

2.在driver中添加Combiner类:

job.setCombinerClass(WordcountCombiner.class);

 4.shuffle流程总结 

7.Shuffle详解

 ps1.map缓冲区:

环形缓冲区,右边写数据,左边写元数据。但是环形缓冲区是不能碰头的,否则一端的数据会覆盖另一端的数据,所以达到80%就开始溢写

ps2.reduce缓冲区:

如果缓冲区足够大,整个归排就直接在内存中执行,否则就溢写到磁盘进行,最后在发给reduc

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/117457.html

(0)
上一篇 2025-11-20 21:33
下一篇 2025-11-20 22:00

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信