# 1. Run bin/hbase mapredcp to list the jars needed for MapReduce-HBase integration
bin/hbase mapredcp
lib/high-scale-lib-1.1.1.jar
lib/netty-3.6.6.Final.jar
lib/zookeeper-3.4.6.jar
lib/htrace-core-2.04.jar
lib/hbase-client-0.98.6-hadoop2.jar
lib/protobuf-java-2.5.0.jar
lib/hbase-server-0.98.6-hadoop2.jar
lib/guava-12.0.1.jar
lib/hbase-common-0.98.6-hadoop2.jar
lib/hbase-hadoop-compat-0.98.6-hadoop2.jar
lib/hbase-protocol-0.98.6-hadoop2.jar
# 2. Set HADOOP_CLASSPATH on every Hadoop MR node (each node needs the HBase jars available locally)
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
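# a quick sanity check that the classpath resolved (output should match the jar list from step 1):
echo $HADOOP_CLASSPATH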
# 3. List the MR tools bundled with the hbase-server jar
$HADOOP_HOME/bin/yarn jar /opt/modules/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar
An example program must be given as the first argument.
Valid program names are:
CellCounter: Count cells in HBase table  # count the cells in an HBase table
completebulkload: Complete a bulk data load.  # finish a bulk load [csv/tsv --> HFile --> load]
copytable: Export a table from local cluster to peer cluster  # copy/back up a table's data
export: Write table data to HDFS.  # export an HBase table's data to HDFS
import: Import data written by Export.  # load exported data back into an HBase table
importtsv: Import data in TSV format.  # import TSV data (fields separated by tab characters)
rowcounter: Count rows in HBase table  # count how many ROWS a table has
verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.
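For reference, hedged example invocations of a few of these tools; the student table, the user_bak table, the HDFS paths, and the column mapping below are made-up placeholders, with argument order following the tools' own usage strings:
# load a TSV file straight into a table (columns: rowkey, info:name, info:age)
$HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
    -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age \
    student /user/hbase/input/student.tsv
# or write HFiles first and bulk-load them (the csv/tsv --> HFile --> load path)
$HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
    -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age \
    -Dimporttsv.bulk.output=/user/hbase/hfile-out \
    student /user/hbase/input/student.tsv
$HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar completebulkload \
    /user/hbase/hfile-out student
# export a table to HDFS, then import that dump into another (pre-created) table
$HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar export user /user/hbase/export/user
$HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar import user_bak /user/hbase/export/user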
# 4. Test a bundled HBase MR job (count how many rows the user table has)
$HADOOP_HOME/bin/yarn jar /opt/modules/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar rowcounter 'user'
Dependencies (pom.xml):
<!-- hadoop -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.5.0</version>
</dependency>
<!-- hbase -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>0.98.6-hadoop2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.98.6-hadoop2</version>
</dependency>
Required configuration files (they must be on the job's classpath):
core-site.xml
hdfs-site.xml
hbase-site.xml
log4j.properties
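A hedged way to pull these into the project, assuming a standard Maven layout and the install paths exported in step 2:
cp $HADOOP_HOME/etc/hadoop/core-site.xml src/main/resources/
cp $HADOOP_HOME/etc/hadoop/hdfs-site.xml src/main/resources/
cp $HBASE_HOME/conf/hbase-site.xml src/main/resources/
cp $HBASE_HOME/conf/log4j.properties src/main/resources/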
HBase MapReduce task Java API example:
package com.hbase.test02_with_mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* User2BasicMapReduce
* Copies the info:name and info:age columns of the user table into the basic table (the basic table must exist beforehand!)
* @author zhangqingli
*
*/
public class User2BasicMapReduce extends Configured implements Tool {
/**
* Mapper
*/
public static class ReadUserMapper extends TableMapper<Text, Put> {
private Text mapOutputKey = new Text();
@Override
public void map(ImmutableBytesWritable key, Result value,
Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
throws IOException, InterruptedException {
// get rowkey
String rowKey = Bytes.toString(key.get());
// set
mapOutputKey.set(rowKey);
Put put = new Put(key.get());
for (Cell cell : value.rawCells()) {
// keep only cells from the info column family
if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
// keep the name column
if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
put.add(cell);
}
if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
put.add(cell);
}
}
}
// emit <rowkey, put>
context.write(mapOutputKey, put);
}
}
/**
* Reducer
*/
public static class WriteBasicReducer
extends TableReducer<Text, Put, ImmutableBytesWritable> {
@Override
protected void reduce(Text key, Iterable<Put> values,
Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
throws IOException, InterruptedException {
for (Put put : values) {
context.write(null, put); // TableOutputFormat ignores the key; the Put's own rowkey selects the target row
}
}
}
/**
* Driver
*/
@Override
public int run(String[] args) throws Exception {
// create job
Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
// set runnable job class
job.setJarByClass(this.getClass());
// set job
Scan scan = new Scan();
scan.setCaching(500); // default is 1, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set input & mapper
TableMapReduceUtil.initTableMapperJob(
"user", // input table
scan, // Scan instance to control CF and attribute selection
ReadUserMapper.class, // mapper class
Text.class, // mapper output key
Put.class, // mapper output value
job);
// set reducer & output
TableMapReduceUtil.initTableReducerJob("basic", WriteBasicReducer.class, job);
job.setNumReduceTasks(1);
boolean isSuccess = job.waitForCompletion(true);
return isSuccess ? 0 : 1;
}
/**
* main
*/
public static void main(String[] args) throws Exception {
// get conf
Configuration conf = HBaseConfiguration.create();
// submit job
int status = ToolRunner.run(conf, new User2BasicMapReduce(), args);
// exit program
System.exit(status);
}
}
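Build the jar first; a minimal sketch assuming a standard Maven project whose artifact is the zdemo-hbase.jar used below:
mvn clean package -DskipTests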
Upload the jar to one of the NodeManager nodes and run the MR job:
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
$HADOOP_HOME/bin/yarn jar /opt/datas/zdemo-hbase.jar
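If the jar's MANIFEST does not declare a Main-Class, pass the fully qualified driver class explicitly; the result can then be checked from the hbase shell:
$HADOOP_HOME/bin/yarn jar /opt/datas/zdemo-hbase.jar com.hbase.test02_with_mapreduce.User2BasicMapReduce
# verify: info:name and info:age should now appear in the basic table
echo "scan 'basic'" | $HBASE_HOME/bin/hbase shell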