本例子采用hadoop1.1.2版本,附件中有例子的数据文件

采用气象数据作为处理数据

1、MultipleOutputs例子,具体解释在代码中有注释

package StationPatitioner;import java.io.IOException;import java.util.Iterator;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reducer;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.mapred.lib.MultipleOutputs;import org.apache.hadoop.mapred.lib.NullOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/** * hadoop Version 1.1.2 * MultipleOutputs例子 * @author 巧克力黑 * */public class PatitionByStationUsingMultipleOutputs extends Configured implements Tool {	enum Counter 	{		LINESKIP,	//出错的行	}	static class StationMapper extends MapReduceBase implements Mapper
{ private NcdcRecordParser parser = new NcdcRecordParser(); @Override public void map(LongWritable key, Text value, OutputCollector
 output, Reporter reporter) throws IOException { try { parser.parse(value); output.collect(new Text(parser.getStationid()), value); } catch (Exception e) { reporter.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1 } } } static class MultipleOutputReducer extends MapReduceBase implements Reducer
{ private MultipleOutputs multipleOutputs; @Override public void configure(JobConf jobconf) { multipleOutputs = new MultipleOutputs(jobconf);//初始化一个MultipleOutputs } @Override public void reduce(Text key, Iterator
 values, OutputCollector
 output, Reporter reporter) throws IOException { //得到OutputCollector OutputCollector collector = multipleOutputs.getCollector("station", key.toString().replace("-", ""), reporter); while(values.hasNext()){ collector.collect(NullWritable.get(), values.next());//MultipleOutputs用OutputCollector输出数据 } } @Override public void close() throws IOException { multipleOutputs.close(); } } @Override public int run(String[] as) throws Exception { System.setProperty("HADOOP_USER_NAME", "root");//windows下用户与linux用户不一直,采用此方法避免报Permission相关错误 JobConf conf = new JobConf(); conf.setMapperClass(StationMapper.class); conf.setReducerClass(MultipleOutputReducer.class); conf.setMapOutputKeyClass(Text.class); conf.setOutputKeyClass(NullWritable.class); conf.setOutputFormat(NullOutputFormat.class);     FileInputFormat.setInputPaths(conf, new Path("hdfs://ubuntu:9000/sample1.txt"));//input路径     FileOutputFormat.setOutputPath(conf, new Path("hdfs://ubuntu:9000/temperature"));//output路径 MultipleOutputs.addMultiNamedOutput(conf, "station", TextOutputFormat.class, NullWritable.class, Text.class); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception{ int exitCode = ToolRunner.run(new PatitionByStationUsingMultipleOutputs(), args); System.exit(exitCode); } }

2、解析气象数据的类

package StationPatitioner;import org.apache.hadoop.io.Text;public class NcdcRecordParser {	private static final int MISSING_TEMPERATURE = 9999;	private String year;	private int airTemperature;	private String quality;	private String stationid;	public void parse(String record) {		stationid = record.substring(0, 5);		year = record.substring(15, 19);		String airTemperatureString;		// Remove leading plus sign as parseInt doesn't like them		if (record.charAt(87) == '+') {			airTemperatureString = record.substring(88, 92);		} else {			airTemperatureString = record.substring(87, 92);		}		airTemperature = Integer.parseInt(airTemperatureString);		quality = record.substring(92, 93);	}		public String getStationid(){		return stationid;	}	public void parse(Text record) {		parse(record.toString());	}	public boolean isValidTemperature() {		return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");	}	public String getYear() {		return year;	}	public int getAirTemperature() {		return airTemperature;	}}