Big Data Learning 02: Complete Code

The detailed requirements will be covered in a later post.

SalaryMapper.java

package salaryHadoop;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
public class SalaryMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {

    // Reused output objects, so new ones are not allocated for every record
    Text kout = new Text();
    FloatWritable fsalary = new FloatWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line is one comma-separated record
        String line = value.toString();
        String[] items = line.split(",");
        // Grouping key: fields 1 and 7 joined with "-"; value: the salary in field 4
        String akey = items[1] + "-" + items[7];
        float salary = Float.parseFloat(items[4]);
        kout.set(akey.trim());
        fsalary.set(salary);
        context.write(kout, fsalary);
    }
}
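The mapper assumes comma-separated input in which the fields at index 1 and index 7 together form the grouping key and the field at index 4 holds the salary. Since the detailed requirements are deferred to a later post, the following sample line is purely hypothetical, just to illustrate the expected shape:

1001,Engineering,Alice,F,8500.0,2020,Full-time,Beijing

For that line the mapper would emit the key "Engineering-Beijing" with the value 8500.0. Note that a header row, or a line with missing fields, would make the array indexing or Float.parseFloat throw, so the input is assumed to be clean.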

SalaryReducer.java

package salaryHadoop;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SalaryReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {

    // Reused output object for the computed average
    FloatWritable valueout = new FloatWritable();

    @Override
    public void reduce(Text key, Iterable<FloatWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        float sumSalary = 0;
        // Sum every salary that arrived under this key and count the records
        for (FloatWritable dataInfo : values) {
            sumSalary += dataInfo.get();
            count++;
        }
        // Average = total salary / number of records
        float avgSalary = sumSalary / count;
        valueout.set(avgSalary);
        context.write(key, valueout);
    }
}
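The reducer averages all salaries that arrive under one key. Continuing the hypothetical example above, two records with key "Engineering-Beijing" and salaries 8000.0 and 9000.0 would produce one tab-separated output line: Engineering-Beijing followed by 8500.0. One caution: this reducer cannot double as a combiner, because averaging partial averages weights the partitions incorrectly; a combiner-safe variant would have to pass (sum, count) pairs instead of plain floats.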

Salary.java

package salaryHadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class Salary {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Wrap the Mapper and Reducer classes in a Job object,
        // obtained through the Job factory method
        Job job = Job.getInstance(new Configuration());
        // Set the entry point, i.e. the class the program starts from
        job.setJarByClass(Salary.class);
        // Set the Mapper class
        job.setMapperClass(SalaryMapper.class);
        // Set the Map output key type
        job.setMapOutputKeyClass(Text.class);
        // Set the Map output value type
        job.setMapOutputValueClass(FloatWritable.class);

        // Set the Reducer class
        job.setReducerClass(SalaryReducer.class);
        // Set the Reduce output key type
        job.setOutputKeyClass(Text.class);
        // Set the Reduce output value type
        job.setOutputValueClass(FloatWritable.class);

        // The input data to process; it must live on HDFS
        FileInputFormat.setInputPaths(job, args[0]);
        // The directory for the results, also on HDFS
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Wait for the MapReduce job to finish; true means print progress
        boolean result = job.waitForCompletion(true);
        // Exit with 0 on success, 1 on failure
        System.exit(result ? 0 : 1);
    }
}
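To run the job, package the three classes into a jar and submit it with the hadoop command. The jar name and HDFS paths below are illustrative; note that the output directory must not exist yet, or FileOutputFormat will fail the job at submission:

hadoop jar salary.jar salaryHadoop.Salary /input/salary.csv /output/salary
hdfs dfs -cat /output/salary/part-r-00000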