Hbase 集成 MapReduce(三)

KinglyJn      2017-08-26

在hadoop环境运行hbase自带的MR任务

# 1. 运行 bin/hbase mapredcp 查看MR集成hbase所需要的jar包
bin/hbase mapredcp
  lib/high-scale-lib-1.1.1.jar
  lib/netty-3.6.6.Final.jar
  lib/zookeeper-3.4.6.jar
  lib/htrace-core-2.04.jar
  lib/hbase-client-0.98.6-hadoop2.jar
  lib/protobuf-java-2.5.0.jar
  lib/hbase-server-0.98.6-hadoop2.jar
  lib/guava-12.0.1.jar
  lib/hbase-common-0.98.6-hadoop2.jar
  lib/hbase-hadoop-compat-0.98.6-hadoop2.jar
  lib/hbase-protocol-0.98.6-hadoop2.jar

# 2. 每个hadoop MR节点设置hadoop的classpath(需要每个节点存在hbase的jar包支持)
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
$HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar

# 3. 查看hbase自带的MR-jar包的一些功能
$HADOOP_HOME/bin/yarn jar /opt/modules/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar 
An example program must be given as the first argument.
Valid program names are:
  CellCounter: Count cells in HBase table 		#统计hbase表中的cell的个数
  completebulkload: Complete a bulk data load.	#完成桶状数据的加载【csv/tsv-->hfile-->load】
  copytable: Export a table from local cluster to peer cluster	#备份表数据
  export: Write table data to HDFS.				#将hbase表的数据导出到HDFS
  import: Import data written by Export.		#将导出的数据导入到hbase表
  importtsv: Import data in TSV format.			#导入TSV格式的数据(字段和字段之间是tab制表符)
  rowcounter: Count rows in HBase table 		#统计hbase中有多少【行】数据
  verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.
  
# 4. 测试一个hbase-default-MR任务(查看user表中有多少行数据)
$HADOOP_HOME/bin/yarn jar /opt/modules/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar rowcounter 'user'


根据实际需求编写hbase的MR任务jar

参考

依赖包

<!-- hadoop -->
<dependency>
 	 <groupId>org.apache.hadoop</groupId>
  	<artifactId>hadoop-client</artifactId>
  	<version>2.5.0</version>
</dependency>

<!-- hbase -->
<dependency>
  	<groupId>org.apache.hbase</groupId>
  	<artifactId>hbase-server</artifactId>
  	<version>0.98.6-hadoop2</version>
</dependency>
<dependency>
  	<groupId>org.apache.hbase</groupId>
  	<artifactId>hbase-client</artifactId>
  	<version>0.98.6-hadoop2</version>
</dependency>

必要的配置文件:

core-site.xml
hdfs-site.xml
hbase-site.xml
log4j.properties

hbase-mapreduce-task java api 示例:

package com.hbase.test02_with_mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * User2BasicMapReduce
 * 将user表中info:name,info:age两列复制到basic表中(basic表事先必须存在!)
 * @author zhangqingli
 *
 */
public class User2BasicMapReduce extends Configured implements Tool {
	/**
	 * Mapper
	 */
	public static class ReadUserMapper extends TableMapper<Text, Put> {
		private Text mapOutputKey = new Text();

		@Override
		public void map(ImmutableBytesWritable key, Result value,
				Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
				throws IOException, InterruptedException {
			// get rowkey
			String rowKey = Bytes.toString(key.get());
			// set
			mapOutputKey.set(rowKey);

			Put put = new Put(key.get());
			for (Cell cell : value.rawCells()) {
				// add family:qualifier
				if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
					// add column
					if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
						put.add(cell);
					}
					if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
						put.add(cell);
					}
				}
			}
			//
			context.write(mapOutputKey, put);
		}
	}

	/**
	 * Reducer
	 */
	public static class WriteBasicReducer 
      		extends TableReducer<Text, Put, ImmutableBytesWritable> {
		@Override
		protected void reduce(Text key, Iterable<Put> values,
				Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
				throws IOException, InterruptedException {
			for (Put put : values) {
				context.write(null, put);
			}
		}
	}

	/**
	 * Driver
	 */
	@Override
	public int run(String[] args) throws Exception {
		// create job
		Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
		
		// set runnable job class
		job.setJarByClass(this.getClass());
		// set job
		Scan scan = new Scan();
		scan.setCaching(500); 		// default is 1, which will be bad for MapReduce jobs
		scan.setCacheBlocks(false); // don't set to true for MR jobs
		// set input & mapper
		TableMapReduceUtil.initTableMapperJob(
          		"user", // input table
				scan,   // Scan instance to control CF and attribute selection
				ReadUserMapper.class, 	// mapper class
				Text.class, 			// mapper output key
				Put.class, 				// mapper output value
				job);
		// set reducer & output
		TableMapReduceUtil.initTableReducerJob("basic", WriteBasicReducer.class, job);
		job.setNumReduceTasks(1); 
		
		boolean isSuccess = job.waitForCompletion(true);
		return isSuccess ? 0 : 1;
	}

	/**
	 * main
	 */
	public static void main(String[] args) throws Exception {
		// get conf
		Configuration conf = HBaseConfiguration.create();
		// submit job
		int status = ToolRunner.run(conf, new User2BasicMapReduce(), args);
		// exit program
		System.exit(status);
	}
}

上传jar包到某个nodemanager节点,运行MR任务:

export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`

$HADOOP_HOME/bin/yarn jar /opt/datas/zdemo-hbase.jar



Tags:


Share: