How to Use Hadoop to Analyze Data
Bulkload: Workflow and Practice

The bulkload approach needs two Jobs working together:

(1) The first Job still runs the original business-processing logic, but its results are not written into HBase through HTableOutputFormat; instead they are first written to an intermediate directory on HDFS (for example, middata).

(2) The second Job takes the first Job's output (middata) as its input and formats it into HFiles, HBase's underlying storage files.

(3) BulkLoad is invoked to import the HFiles generated by the second Job into the corresponding HBase table.
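To make the hand-off between the two Jobs concrete, here is what the data looks like at each stage of the word-count example used in the sample code that follows (the specific words and counts are illustrative only):

(1) Raw input on HDFS: a text file containing, say, the line hello world hello.

(2) The intermediate directory (middata) after the first Job: TextOutputFormat writes tab-separated lines, i.e. hello<TAB>2 and world<TAB>1.

(3) After the second Job: HFiles in which each word is one row, e.g. row key hello with column cf:count holding the byte-encoded value 2. The bulkload step then moves these files directly into the table's region directories instead of going through the normal write path.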
The corresponding sample code is given below:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GeneratePutHFileAndBulkLoadToHBase {

    // Mapper of the first Job: an ordinary word-count map.
    public static class WordCountMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text wordText = new Text();
        private IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] wordArray = line.split(" ");
            for (String word : wordArray) {
                wordText.set(word);
                context.write(wordText, one);
            }
        }
    }

    // Reducer of the first Job: sums the counts for each word.
    public static class WordCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> valueList, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : valueList) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    // Mapper of the second Job: turns each "word<TAB>count" line into a Put.
    public static class ConvertWordCountOutToHFileMapper
            extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String wordCountStr = value.toString();
            String[] wordCountArray = wordCountStr.split("\t");
            String word = wordCountArray[0];
            int count = Integer.valueOf(wordCountArray[1]);

            // Build the HBase row key.
            byte[] rowKey = Bytes.toBytes(word);
            ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(rowKey);
            byte[] family = Bytes.toBytes("cf");
            byte[] qualifier = Bytes.toBytes("count");
            byte[] hbaseValue = Bytes.toBytes(count);
            // A Put can commit several columns under one column family; if there
            // is only a single column, the KeyValue form can be used instead:
            // KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue);
            Put put = new Put(rowKey);
            put.add(family, qualifier, hbaseValue);
            context.write(rowKeyWritable, put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration hadoopConfiguration = new Configuration();
        String[] dfsArgs = new GenericOptionsParser(hadoopConfiguration, args).getRemainingArgs();

        // The first Job is plain MapReduce, writing to the intermediate directory.
        Job job = new Job(hadoopConfiguration, "wordCountJob");
        job.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
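        // --------------------------------------------------------------------
        // The original sample breaks off at this point. What follows is a
        // minimal sketch, an assumption rather than the original author's code,
        // of how the rest of main() typically continues with the HBase
        // 0.9x-era API that the imports above target. The argument layout
        // (dfsArgs[0] = raw input, dfsArgs[1] = intermediate directory such as
        // "middata", dfsArgs[2] = HFile output directory) and the table name
        // "word_count" are illustrative choices, not from the original.
        // --------------------------------------------------------------------
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(dfsArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(dfsArgs[1]));
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }

        // Second Job: read the intermediate output and generate HFiles.
        Configuration hbaseConfiguration = HBaseConfiguration.create();
        Job convertJob = new Job(hbaseConfiguration, "convertWordCountOutToHFileJob");
        convertJob.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);
        convertJob.setMapperClass(ConvertWordCountOutToHFileMapper.class);
        convertJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
        convertJob.setMapOutputValueClass(Put.class);
        convertJob.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(convertJob, new Path(dfsArgs[1]));
        FileOutputFormat.setOutputPath(convertJob, new Path(dfsArgs[2]));

        // The target table (with column family "cf") must already exist.
        // configureIncrementalLoad sets HFileOutputFormat as the output format,
        // installs the total-order partitioner, and, because the map output
        // value class is Put, wires in PutSortReducer automatically.
        HTable wordCountTable = new HTable(hbaseConfiguration, "word_count");
        HFileOutputFormat.configureIncrementalLoad(convertJob, wordCountTable);
        if (!convertJob.waitForCompletion(true)) {
            System.exit(1);
        }

        // Step (3): move the generated HFiles into the table's regions.
        LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(hbaseConfiguration);
        bulkLoader.doBulkLoad(new Path(dfsArgs[2]), wordCountTable);
        wordCountTable.close();
    }
}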