本文共 5202 字,大约阅读时间需要 17 分钟。
目录
这个文本文件,其中第六个字段表示开奖结果数值,现在以15为分界点,将15以上的结果保存到一个文件,15以下的结果保存到一个文件。
注意:cdh版本已经不支持本地运行,所以我们用 apache版本
cloudera https://repository.cloudera.com/artifactory/cloudera-repos/ org.apache.hadoop hadoop-common 2.7.4 org.apache.hadoop hadoop-hdfs 2.7.4 org.apache.hadoop hadoop-client 2.7.4 org.apache.hadoop hadoop-mapreduce-client-core 2.7.4 org.apache.maven.plugins maven-compiler-plugin 3.0 org.apache.maven.plugins maven-shade-plugin 2.4.3 package shade true
package com.czxy.partitioner;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class PartitionerMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 数据不需要任何操作 context.write(value,NullWritable.get()); }}
package com.czxy.partitioner;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class PartitionerReduce extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // 数据不需要任何操作 context.write(key,NullWritable.get()); }}
package com.czxy.partitioner;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;/** *这里的 key value 的输入类型 对应 map输出类型 */public class MyPartitioner extends Partitioner{ @Override public int getPartition(Text text, NullWritable nullWritable, int i) { // 类型转换 String s = text.toString(); //字符串切割 获取第5位 String res = s.split("\t")[5]; System.out.println(res); //字符串转为integer 判断大于15放在一个分区中 负责放在另一个分区中 if (Integer.parseInt(res)>15){ return 1; } return 0; }}
注意:你的reduceTask设置几个就会产生几个文件,你的Partitioner如果没有设置返回值那么多余的文文都是空的
package com.czxy.partitioner;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class PartitionerDriver extends Configured implements Tool { @Override public int run(String[] args) throws Exception { // 获取job Job job = Job.getInstance(new Configuration()); // 设置支持jar执行 job.setJarByClass(PartitionerDriver.class); // 设置执行的napper job.setMapperClass(PartitionerMapper.class); // 设置map输出的key类型 job.setMapOutputKeyClass(Text.class); // 设置map输出value类型 job.setMapOutputValueClass(NullWritable.class); // 设置执行的reduce job.setReducerClass(PartitionerReduce.class); // 设置reduce输出key的类型 job.setOutputKeyClass(Text.class); // 设置reduce输出value的类型 job.setOutputValueClass(NullWritable.class); // 设置文件输入 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path("./data/partitioner/")); // 设置文件输出 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path("./outPut/partitioner/")); // 设置 Task 数量 job.setPartitionerClass(MyPartitioner.class); job.setNumReduceTasks(2); // 设置启动类 boolean b = job.waitForCompletion(true); return b ? 0 : 1; } public static void main(String[] args) throws Exception { ToolRunner.run(new PartitionerDriver(), args); }}
(提取码6npi)
执行结果:
文件1:part-r-00000
文件2:part-r-00001
转载地址:http://kakzi.baihongyu.com/