《大数据hadoop参考资料案例分析.doc》由会员分享,可在线阅读,更多相关《大数据hadoop参考资料案例分析.doc(69页珍藏版)》请在taowenge.com淘文阁网|工程机械CAD图纸|机械工程制图|CAD装配图下载|SolidWorks_CaTia_CAD_UG_PROE_设计图分享下载上搜索。
1、目录第10单元 练习案例210.1 案例1 通过采集的气象数据分析每年的最高温度210.2 案例2:数据去重问题510.3 案例3:数据排序910.4 案例4:平均成绩144.编写MAP代码1610.5 案例5:求最大值和最小值2010.6 案例6:TopN问题252.数据2510.7 案例7:分析非结构化文件3110.8 案例8:获取员工所在部门信息36第11单元 西普研究生学员分析4711.1 数据准备4711.1.1 整理所有学员基本信息4711.1.2 项目准备50第10单元 练习案例要点提示:通过几个实际案例,巩固前面学习的相关知识 10.1 案例1分析年气象数据最高温度1原始数据分
2、析0067011990999991950051507004888888889999999N9+00001+99999999999999999999990067011990999991950051512004888888889999999N9+00221+99999999999999999999990067011990999991950051518004888888889999999N9-00111+99999999999999999999990067011990999991949032412004888888889999999N9+01111+9999999999999999999999数据说
3、明: 第15-19个字符是year第45-50位是温度表示,+表示零上 -表示零下,且温度的值不能是9999,9999表示异常数据。第50位值只能是0、1、4、5、9几个数字。第一步:在eclipse中新建一个项目“TemperatureHight”。如下图所示第二步:在项目src目录下,右键点击,选择“新建”创建一个类文件名称为“MaxTemperatureMapper”并指定包名”com.simple.temperature”。第三步:在编写“MaxTemperatureMapper”类之前需要把hadoop相关的jar包导入,首先在项目根目录下创建一个文件夹并把指定位置中的包放入该文件中
4、。第四步:把lib下所有的jar包导入到环境变量,全选lib文件夹下的jar包文件,右键点击,选择“build path”-“add to build path”,添加后,发现在项目下多一个列表项“Referenced Libraries”第四步:让类“MaxTemperatureMapper”继承类Mapper同时指定需要的参数类型,根据业务逻辑修改map类的内容如下。package com.simple.temperature;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apac
5、he.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class MaxTemperatureMapper extendsMapper private static final int MISSING = 9999;Overrideprotected void map(LongWritable key, Text value,Mapper.Context context)throws IOException, InterruptedE
6、xception String line = value.toString();/读取一条记录String year = line.substring(15, 19);/获取温度数System.out.println(year=+year);int airTemperature;if (line.charAt(45) = +) /判断温度正负airTemperature = Integer.parseInt(line.substring(46, 50); else airTemperature = Integer.parseInt(line.substring(45, 50);String q
7、uality = line.substring(50, 51);System.out.println(quality: + quality);/判断温度是否异常if (airTemperature != MISSING & quality.matches(01459) context.write(new Text(year), new IntWritable(airTemperature);第五步:在项目src目录下指定的包名”com.simple.temperature”下右键点击,新建一个类名为“MaxTemperatureReducer “并继承。package com.simple.t
8、emperature;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class MaxTemperatureReducer extends ReducerOverrideprotected void reduce(Text key, Iterable values,Context context)throws IOException, Inte
9、rruptedException /把Integer.MIN_VALUE = 2147483847作为maxValue的初始值int maxValue = Integer.MIN_VALUE;/循环取出最大值for(IntWritable value: values)maxValue = Math.max(maxValue, value.get();context.write(key, new IntWritable(maxValue);第五步:在项目src目录下指定的包名”com.simple.temperature”下右键点击,新建一个测试主类名为“MaxTemperature “并指定主
10、方法。第五步:在项目src目录下指定的包名”com.simple.temperature”下右键点击,新建一个测试主类名为“MaxTemperature “并指定主方法。package com.simple.temperature;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce
11、.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class MaxTemperature public static void main(String args) throws Exception/获取作业对象Job job = Job.getInstance(new Configuration();/设置主类job.setJarByClass(MaxTemperature
12、.class);/设置job参数job.setMapperClass(MaxTemperatureMapper.class);job.setReducerClass(MaxTemperatureReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);/设置job输入输出FileInputFormat.addInputPath(job, new Path(file:/simple/source.txt);FileOutputFormat.setOutputPath(jo
13、b, new Path(file:/simple/output); System.exit(job.waitForCompletion(true) ? 0 : 1);第六步:测试结果10.2 案例2:数据去重问题1原始数据1)file1:2012-3-1 a2012-3-2 b2012-3-3 c2012-3-4 d2012-3-5 a2012-3-6 b2012-3-7 c2012-3-3 c 2)file2:2012-3-1 b2012-3-2 a2012-3-3 b2012-3-4 d2012-3-5 a2012-3-6 c2012-3-7 d2012-3-3 c 数据输出: 2012-
14、3-1 a 2012-3-1 b 2012-3-2 a 2012-3-2 b 2012-3-3 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-6 c 2012-3-7 c 2012-3-7 d 2说明数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。我们自然而然会想到将同一个数据的所有记录都交给一台reduce机器,无论这个数据出现多少次,只要在最终结果中输出一次就可以了。具体就是reduce的输入应该以数据作为key,而对value-list则没有要求。当reduce接收到一个时就直接将key复制到输出的
15、key中,并将value设置成空值。第一步:创建一个项目第二步:编写Mapper代码package com.simple.deduplication;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class DeduplicationMapper extends Mapper public void map(Object key, Text v
16、alue, Context context)throws IOException, InterruptedException /按行读取信息并作为mapper的输出键,mapper的输出值置为空文本即可Text line = value;context.write(line, new Text();第三步:编写Reducer代码package com.simple.deduplication;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer
17、;public class DeduplicationReducer extends Reducer public void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException /Reducer阶段直接按键输出即可,键直接可以实现去重context.write(key, new Text();第四步:测试类package com.simple.deduplication;import org.apache.hadoop.conf.Configuration;impo
18、rt org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TestDeduplication public static void main(String args) throws
19、Exception/获取作业对象Job job = Job.getInstance(new Configuration();/设置主类job.setJarByClass(TestDeduplication.class);/设置job参数job.setMapperClass(DeduplicationMapper.class);job.setReducerClass(DeduplicationReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);/设置job输入输出FileInpu
20、tFormat.addInputPath(job, new Path(file:/simple/source.txt);FileOutputFormat.setOutputPath(job, new Path(file:/simple/output); System.exit(job.waitForCompletion(true) ? 0 : 1);第五步:运行结果第六步:查看生成的结果10.3 案例3:数据排序数据排序是许多实际任务执行时要完成的第一项工作,比如学生成绩评比、数据建立索引等。这个实例和数据去重类似,都是先对原始数据进行初步处理,为进一步的数据操作打好基础。下面进入这个示例。1
21、.需求描述对输入文件中数据进行排序。输入文件中的每行内容均为一个数字,即一个数据。要求在输出中每行有两个间隔的数字,其中,第一个代表原始数据在原始数据集中的位次,第二个代表原始数据。2原始数据 1)file1:2326543215756652232)file2:595622650923)file3:26546 样例输出: 1 2 2 6 3 15 4 22 5 26 6 32 7 32 8 54 9 92 10 650 11 654 12 756 13 595614 652233. 设计思考这个实例仅仅要求对输入数据进行排序,熟悉MapReduce过程的读者会很快想到在MapReduce过程中
22、就有排序3(mapreduce框架自动会对key3进行排序,这要把要拍与的放在key3,mapreduce会自动对其进行排序要进行排序的话必须要设置reduc的数量只能有一个),是否可以利用这个默认的排序,而不需要自己再实现具体的排序呢?答案是肯定的。但是在使用之前首先需要了解它的默认排序规则。它是按照key值进行排序的,如果key为封装int的IntWritable类型,那么MapReduce按照数字大小对key排序,如果key为封装为String的Text类型,那么MapReduce按照字典顺序对字符串排序。了解了这个细节,我们就知道应该使用封装int的IntWritable型数据结构了。
23、也就是在map中将读入的数据转化成 IntWritable型,然后作为key值输出(value任意)。reduce拿到之后,将输入的 key作为value输出,并根据value-list中元素的个数决定输出的次数。输出的key(即代码中的linenum)是一个全局变量,它统计当前key的位次。需要注意的是这个程序中没有配置Combiner,也就是在MapReduce过程中不使用Combiner。这主要是因为使用map和reduce就已经能够完成任务了。1. map代码package com.simple.deduplication;import java.io.IOException;impo
24、rt org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class IntSortMapper extends Mapper Overrideprotected void map(LongWritable key, Text value,Mapper.Co
25、ntext context)throws IOException, InterruptedException /按行读取整数信息并作为mapper的输出键,mapper的输出值置为空即可context.write(new IntWritable(Integer.valueOf(value.toString(),NullWritable.get();2. reduce代码package com.simple.deduplication;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apa
26、che.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Reducer;public class IntSortReducer extends Reducer /使用num作为控制打印序列的编号变量IntWritable num = new IntWritable(1);public void reduce(IntWritable key, Iterable values, Context context)throws IOException, InterruptedException /Reducer阶段直接按键输出即可,键
27、直接可以实现去重context.write(num, key);/为排序后的每个整数值按次序指定编号num = new IntWritable(num.get()+1);6main代码package com.simple.deduplication;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.NullWritable;import org.apache
28、.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TestIntSort public static void main(String args) throws Exception/获取作业对象Job job = Job.getInstance(new Configuration();/设置主类job.setJarByClass(
29、TestIntSort.class);/设置job参数job.setMapperClass(IntSortMapper.class);job.setReducerClass(IntSortReducer.class);job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(IntWritable.class);/设置job输入输出/FileI
30、nputFormat.addInputPath(job, new Path(file:/simple/source.txt);FileInputFormat.setInputPaths(job, new Path(file:/simple/source.txt);FileOutputFormat.setOutputPath(job, new Path(file:/simple/output); System.exit(job.waitForCompletion(true) ? 0 : 1);10.4 案例4:平均成绩1.需求分析 对输入文件中数据进行就算学生平均成绩。输入文件中的每行内容均为一
31、个学生的姓名和他相应的成绩,如果有多门学科,则每门学科为一个文件。要求在输出中每行有两个间隔的数据,其中,第一个代表学生的姓名,第二个代表其平均成绩。2.原始数据1)math:张三 88李四 99王五 66赵六 772)china:张三 78李四 89王五 96赵六 673)english:张三 80李四 82王五 84赵六 86 样本输出: 张三 82 李四 90 王五 82 赵六 763.设计思考 Map处理的 是一个纯文本文件, 文件中存放的数据时每一行表示一个学生的姓名和他相应一科成绩。Mapper处理的数据是由InputFormat分解过的数据集,其中 InputFormat的作用是
32、将数据集切割成小数据集InputSplit,每一个InputSplit将由一个Mapper负责处理。此 外,InputFormat中还提供了一个RecordReader的实现,并将一个InputSplit解析成对提 供给了map函数。InputFormat的默认值是TextInputFormat,它针对文本文件,按行将文本切割成InputSlit,并用 LineRecordReader将InputSplit解析成对,key是行在文本中的位置,value是文件中的 一行。Map的结果会通过partion分发到Reducer,Reducer做完Reduce操作后,将通过以格式OutputForma
33、t输出。 Mapper最终处理的结果对,会送到Reducer中进行合并,合并的时候,有相同key的键/值对则送到同一个 Reducer上。 Reducer是所有用户定制Reducer类地基础,它的输入是key和这个key对应的所有value的一个迭代器,同时还有 Reducer的上下文。Reduce的结果由Reducer.Context的write方法输出到文件中。4.编写map代码package com.simple.avg.score;import java.io.IOException;import java.util.StringTokenizer;import org.apache.
34、hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class AvgScoreMapper extendsMapper Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException /读取一行数
35、据String val = value.toString();/把读取的数据以换行作为分割符StringTokenizer stringTokenizer = new StringTokenizer(val, n);while (stringTokenizer.hasMoreElements() StringTokenizer tmp = new StringTokenizer(stringTokenizer.nextToken();/对读取的一行的名称和成绩进行切分并写入到context对象中String username = tmp.nextToken();String score = t
36、mp.nextToken();context.write(new Text(username),new IntWritable(Integer.valueOf(score);5.编写reduce代码package com.simple.avg.score;import java.io.IOException;import java.util.Iterator;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;pub
37、lic class AvgScoreReducer extends Reducer Overrideprotected void reduce(Text key, Iterable values,Context context)throws IOException, InterruptedException /获取对键值集合遍历对象Iterator iterator = values.iterator();int count = 0;int sum =0;/循环获取相同键的所有值并计算和while(iterator.hasNext()int v = iterator.next().get();
38、sum += v;count+;int avg = sum/count;context.write(key, new IntWritable(avg);6.编写Main Job函数package com.simple.avg.score;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred
39、uce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class AvgScoreJob public static void main(String args) throws Exception/获取作业对象Job job = Job.getInstance(new Configuration();/设置主类job.setJarByClass(AvgScoreJob.cl
40、ass);/设置job参数job.setMapperClass(AvgScoreMapper.class);job.setReducerClass(AvgScoreReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);/设置job输入输出/FileInputFormat.addInputPath(job
41、, new Path(file:/simple/source.txt);FileInputFormat.setInputPaths(job, new Path(file:/simple/source.txt);FileOutputFormat.setOutputPath(job, new Path(file:/simple/output1); System.exit(job.waitForCompletion(true) ? 0 : 1);程序正确执行完毕:执行结果:10.5 案例5:求最大值和最小值1数据准备1)文件1:a.txt 102103910920011390282)文件2:b.txt 523083810005 10210391092001139028523083810005结果预测:Max 10005Min 22、编码实现Mappackage com.simple.max.min.value;import java.io.IOException;import java.util.StringTokenizer;import org.apache.