1,map类
public static class MyMapper extends Mapper{ List putList = new LinkedList (); HTableInterface bvuser; public MyMapper() { } //这个方法每个map任务只调用一次 @Override protected void setup(final Context context) throws IOException, InterruptedException { bvuser = new HTable(context.getConfiguration(),"bvuser"); BufferedReader br = new BufferedReader(new InputStreamReader(FileSystem.get(context.getConfiguration()).open(new Path("hdfs文件路径")))); String line = null; while((line = br.readLine()) != null){ } IOUtils.closeQuietly(br); } //hdfs的每行数据会调用一次 @Override public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { Counter counter = context.getCounter("group", "mapcount"); counter.increment(1); String line = value.toString(); if(line==null || line.isEmpty()){ return; } Put put = new Put(Bytes.toBytes("3136949-" + userId)); put.add(Bytes.toBytes("info"), Bytes.toBytes("billTypes"), System.currentTimeMillis(), Bytes.toBytes(StringUtils.join(typeidSet, ","))); putList.add(put); if(putList.size()==1000){ bvuser.put(putList); putList.clear(); } } //每个任务只掉用一次 @Override protected void cleanup(Context context) throws IOException, InterruptedException { if(!putList.isEmpty()){ bvuser.put(putList); putList.clear(); } bvuser.close(); }}
2,Main方法
Configuration conf = HBaseConfiguration.create();final Job job = Job.getInstance(conf, "data-save-job");//格式化文件 文本形式 相当于字符串job.setInputFormatClass(TextInputFormat.class);job.setJarByClass(TextFile2HbaseJob.class);//设置map类job.setMapperClass(MyMapper.class);//设置map的输出结果job.setMapOutputKeyClass(NullWritable.class);job.setMapOutputValueClass(Text.class);FileOutputFormat.setOutputPath(job, new Path(output));FileSystem fs = FileSystem.get(conf);FileStatus[] files = fs.listStatus(new Path(input), new PathFilter() { @Override public boolean accept(Path path) { return path.getName().startsWith("part-m"); }});if (files.length == 0) { throw new IllegalStateException("no file");}//整个目录下的文件都加进去 /export/part-m-0000000,part-m-0000000格式for (FileStatus file : files) { FileInputFormat.addInputPath(job, file.getPath());}//设置reduce数0job.setNumReduceTasks(0);