Hbase 数据迁移的常见方式(四)

KinglyJn      2017-08-26

要使用Hadoop,需要将现有的各种类型的数据库或数据文件中的数据导入HBase。一般而言,有三种常见方式:使用HBase的API中的Put方法,使用HBase 的bulk load工具和使用定制的MapReduce Job方式。本文均有详细描述。

数据导入hbase的常见方法

数据的来源:

  • 日志
  • RDBMS


导入的常见方法:

  • 使用hbase put API(sqoop、kettle)
    • 使用HBase的API中的Put是最直接的方法,用法也很容易学习。但针对大部分情况,它并非都是最高效的方式。当需要将海量数据在规定时间内载入HBase中时,效率问题体现得尤为明显。待处理的数据量一般都是巨大的,这也许是为何我们选择了HBase而不是其他数据库的原因。在项目开始之前,你就该思考如何将所有能够很好的将数据转移进HBase,否则之后可能面临严重的性能问题。
  • 使用hbase内置的importtsv 或 自定义的MR job

    • importtsv 是从TSV文件直接加载内容至HBase的一个内置工具。它通过运行一个MapReduce Job,将数据从TSV文件中直接写入HBase的表或者写入一个HBase的自有格式数据文件。importtsv工具默认直接将数据加载到hbase数据库。
  • bulk load方式快速加载巨量数据(优雅常用)

    hbase支持bulk load的入库方式,它是利用hbase的数据信息按照特定的格式存储在hdfs这一原理,直接在hdfs中生成持久化的HFile数据格式文件,然后上传至合适的位置,即完成巨量数据快速入库的方法。配合mapreduce完成,高效便捷,而且不占用region资源,在大数据量写入时能极大地提高写入效率,并降低对hbase节点的写入压力。

    1. hbase自带的importtsv工具默认直接将数据加载到hbase数据库,我们也可以选择先生成HFile数据文件,importtsv源码参照 hbase-server-0.98.6-hadoop2.jar的org.apache.hadoop.hbase.mapreduce:ImportTsv类。
    2. 如果使用importtsv工具生成HFile数据文件,还需要使用hbase自带的MR工具completebulkload来将生成的hfile文件移动到hbase regions对应的hdfs路径,以完成hbase数据的加载。
    $ hadoop-2.5.0/bin/yarn jar hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar importtsv
    ...
    By default importtsv will load data directly into HBase. To instead generate
    HFiles of data to prepare for a bulk data load, pass the option:
      -Dimporttsv.bulk.output=/path/for/output
      Note: if you do not use this option, then the target table must already exist in HBase
        
    Other options that may be specified with -D include:
      -Dimporttsv.skip.bad.lines=false - fail if encountering an invalid line
      '-Dimporttsv.separator=|' - eg separate on pipes instead of tabs
      -Dimporttsv.timestamp=currentTimeAsLong - use the specified timestamp for the import
      -Dimporttsv.mapper.class=my.Mapper - A user-defined Mapper to use instead of org.apache.hadoop.hbase.mapreduce.TsvImporterMapper
      -Dmapred.job.name=jobName - use the specified mapreduce job name for the import
    For performance consider the following options:
      -Dmapred.map.tasks.speculative.execution=false
      -Dmapred.reduce.tasks.speculative.execution=false
    


使用hbase put API向hbase导入数据

下面我们将通过单个客户端向hbase导入MySQL数据。

数据合并最常见的应用场景就是从已经存在的关系型数据库将数据导入到HBase中。对于此类型任务,最简单直接的方式就是从一个单独的客户端获取数据,然后通过HBase的API中Put方法将数据存入HBase中。这种方式适合处理数据不是太多的情况。

本节描述的是使用Put方法将MySQL数据导入HBase中的方式。所有的操作均是在一个单独的客户端执行,并且不会使用到MapReduce。本节将会带领你通过HBase Shell创建HBase表格,通过Java来连接集群,并将数据导入HBase。

1) 数据准备工作

-- 数据源:mysql 数据结构(测试数据55480条,耗时90s)
CREATE TABLE `company` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `city` varchar(255) DEFAULT NULL,
  `eMail` varchar(255) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  `num` int(11) DEFAULT '0',
  `priority` int(11) DEFAULT '0',
  `state` int(11) DEFAULT '0',
  `total_num` int(11) DEFAULT '0',
  `webAddress` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `UK_ccs28shnfm7mtptg0u7gjbm0a` (`city`,`name`)
) ENGINE=InnoDB AUTO_INCREMENT=77580 DEFAULT CHARSET=utf8

-- 数据为null的情况下一条记录的空字段左右两边各有一个\t制表符(如:Beijing 和 北京丽源公司香精厂 之间有两个\t)
1	Beijing		北京丽源公司香精厂	2	0	7	0	
2	Beijing		北京市汇金金属配件制品有限公司	1	0	1	1	
4	Beijing		北京石油管理干部学院	1	0	7	1	
5	Beijing	lihongmei813@126.com	北京市西三旗高新建材城经营开发公司	1	0	7	1	
6	Beijing	lbian@yahoo.cn	北京中关村生命科学园发展有限责任公司	1	0	1	1	www.lifesciencepark.com.cn
7	Beijing		北京鑫丽泽家具有限公司	1	0	1	1	


-- hbase 数据库表
create namespace 'test'
create 'test:company', {NAME=>'info', VERSIONS=>'1'}

2) 编写java api:

package com.hbase.test03_etl_with_put_api;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * CompanyEtlWithPutTool
 * @author zhangqingli
 *
 */
public class CompanyEtlWithPutTool {
	private static volatile CompanyEtlWithPutTool INSTANCE;
	public static CompanyEtlWithPutTool getInstance() {
		if (INSTANCE==null) {
			synchronized (CompanyEtlWithPutTool.class) {
				if (INSTANCE==null) {
					INSTANCE = new CompanyEtlWithPutTool();
				}
			}
		}
		return INSTANCE;
	}
	
	/**
	 * connect hbase
	 */
	public HTable getHTable(String tablename) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		return new HTable(conf, tablename);
	}
	public void releaseHTable(HTable htable) {
		try {
			if (htable != null) {
				htable.close();
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * connect mysql
	 */
	public Connection getConn() throws Exception {
		String userName = "root";
		String password = "23wesdxc";
		String url = "jdbc:mysql://dbserver:3306/gsxt";
		Class.forName("com.mysql.jdbc.Driver").newInstance();
		return DriverManager.getConnection(url, userName, password);
	}
	public void releaseConn(Connection conn, Statement st, ResultSet rs) {
		try {
			if (conn != null) {
				conn.close();
			}
			if (st != null) {
				st.close();
			}
			if (rs != null) {
				rs.close();
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * extract data from mysql to hbase
	 */
	public void etl() {
		Connection conn = null;
		PreparedStatement pst = null;
		ResultSet rs = null;
		String sql = "select * from company";
		HTable htable = null;
		
		try {
			conn = getConn();
			htable = getHTable("test:company");
			
			pst = conn.prepareStatement(sql);
			rs = pst.executeQuery();
			while (rs.next()) {
				long id = rs.getLong("id");
				String city = rs.getString("city");
				String email = rs.getString("eMail");
				String name = rs.getString("name");
				int num = rs.getInt("num");
				int priority = rs.getInt("priority");
				int state = rs.getInt("state");
				int total_num = rs.getInt("total_num");
				String webAddress = rs.getString("webAddress");
				
				System.out.println(city + " : " + name);
				String rowKey = id+"";
				Put put = new Put(Bytes.toBytes(rowKey));
				byte[] family1 = Bytes.toBytes("info");
				
				if (city!=null) {
					put.add(family1, Bytes.toBytes("city"), Bytes.toBytes(city));
				}
				if (email!=null) {
					put.add(family1, Bytes.toBytes("eMail"), Bytes.toBytes(email));
				}
				if (name!=null) {
					put.add(family1, Bytes.toBytes("name"), Bytes.toBytes(name));
				}
				put.add(family1, Bytes.toBytes("num"), Bytes.toBytes(num));
				put.add(family1, Bytes.toBytes("priority"), Bytes.toBytes(priority));
				put.add(family1, Bytes.toBytes("state"), Bytes.toBytes(state));
				put.add(family1, Bytes.toBytes("total_num"), Bytes.toBytes(total_num));
				if (webAddress!=null) {
					put.add(family1, Bytes.toBytes("webAddress"),
                            Bytes.toBytes(webAddress));
				}
				htable.put(put);
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			releaseConn(conn, pst, rs);
			releaseHTable(htable);
		}
	}

	public static void main(String[] args) {
		getInstance().etl();
	}
}

3) 打包执行,在hbase shell客户端验证导入是否成功

> count 'test:company'
55480 row(s) in 2.1610 seconds


通过hbase-importtsv MR任务导入数据

1) 准备数据

-- 数据源:mysql 数据结构(测试数据55480条,整个过程耗时16s)
-- 这个参数是根据RFC4180文档设置的
-- 该文档全称Common Format and MIME Type for Comma-Separated Values (CSV) Files
-- 其中详细描述了CSV格式,其要点包括:
-- (1)字段之间以制表符分隔,数据行之间以\r\n分隔;
-- (2)字符串以半角双引号包围,字符串本身的双引号用两个双引号表示。
-- (3)如果遇到访问权限的问题,可以查看和更改 /etc/apparmor.d/usr.sbin.mysqld 文件
select * from gsxt.company   
into outfile '/var/lib/mysql/company2.tsv'   
fields terminated by '\t' optionally enclosed by '"' escaped by '"'   
lines terminated by '\r\n';

load data infile '/var/lib/mysql/company2.tsv'  
into table gsxt.company 
fields terminated by '\t'  optionally enclosed by '"' escaped by '"'  
lines terminated by '\r\n';


-- hbase 数据库表
-- create namespace 'test'
create 'test:company2', {NAME=>'info', VERSIONS=>'1'}

2) 运行hbase自带的importtsv-MR任务向hbase导入数据

# 设置环境变量HADOOP_CLASSPATH
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`

# 将准备的tsv数据文件上传到hdfs(我上传到了/user/datas/company2目录)
$ hadoop-2.5.0/bin/hdfs -put /opt/datas/company2.tsv /user/ubuntu/datas/company2 

# 运行hbase自带的importtsv-MR任务向hbase导入数据
$ hadoop-2.5.0/bin/yarn jar hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\ info:city,info:email,info:name,info:num,info:priority,info:state,info:total_num,info:webAddress \
test:company2 \
/user/ubuntu/datas/company2

# 在hbase shell客户端验证导入是否成功
count 'test:company'
55480 row(s) in 2.1610 seconds


bulk load方式快速加载巨量数据

# 0. 在hdfs /user/datas/company3 (自定义)目录准备待导入的tsv数据文件
$ hadoop-2.5.0/bin/hdfs dfs -mkdir -p /user/ubuntu/datas/company3
$ hadoop-2.5.0/bin/hdfs dfs -put /opt/datas/company3.tsv /user/ubuntu/datas/company3 

# 1. 设置HADOOP_CLASSPATH环境变量
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`


# 2. 使用importtsv工具生成HFile文件(注意参数的顺序,注意设置的字段个数及顺序需要对应)
$ hadoop-2.5.0/bin/yarn jar hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
['-Dimporttsv.separator=,'] #经过测试这个参数的存在可能会使导出的条数不对,使用默认的\t分隔符
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:city,info:email,info:name,info:num,info:priority,info:state,info:total_num,info:webAddress \ 					  #这里使用HBASE_ROW_KEY充当第一列(即id列)
-Dimporttsv.bulk.output=/user/ubuntu/hbase/hfileoutput \
test:company3 \
/user/ubuntu/datas/company3


# 3. 如果使用importtsv工具生成HFile数据文件,还需要使用hbase自带的MR工具completebulkload来将生成的hfile文件移动到hbase regions对应的hdfs路径,以完成hbase数据的加载。
$ hadoop-2.5.0/bin/yarn jar hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar completebulkload \
/user/ubuntu/hbase/hfileoutput \
test:company3

# 4. 在hbase shell客户端验证导入是否成功
count 'test:company'
55480 row(s) in 2.1610 seconds

使用importtsv工具生成的HFile文件:

使用completebulkload工具将生成的hfile文件移动到hbase regions对应的hdfs路径,以完成hbase数据的加载,这个时候importtsv工具生成的HFile文件被【移动】到company3表regions对应的hdfs文件路径,数据加载完成。


Tags:


Share: