<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>0.98.6-hadoop2</version>
</dependency>
必要的配置文件(直接拷贝hadoop和hbase环境的配置文件)
注:实际上如果只是hbase,只需要hdfs-site.xml 和 log4j.properties即可
hbase-site.xml配置中,client端只需要zk的配置即可,为简便起见,直接复制hbase-site.xml到类路径下
core-site.xml
hdfs-site.xml
hbase-site.xml
log4j.properties
测试代码示例
package mainf;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
/**
* CRUD
* 参数和返回值一般采用 静态常量(封装到HbaseTableContent) 和 Map进行封装
* @author zhangqingli
*
*/
public class HbaseOperation {
public static final Configuration CONF = HBaseConfiguration.create();
/**
* getHTableByTableaName
*/
public HTable getHTableByTableaName(String tableName) throws IOException {
return new HTable(CONF, tableName);
}
/**
* create table
*/
@Test
public void testCreate() throws Exception {
String tableName = "basic";
HBaseAdmin hadmin = new HBaseAdmin(CONF);
if (hadmin.tableExists(tableName)) {
System.out.println("表已经存在");
hadmin.close();
return;
} else {
HTableDescriptor tableDesc =
new HTableDescriptor(TableName.valueOf(tableName));
tableDesc.addFamily(new HColumnDescriptor("info"));
hadmin.createTable(tableDesc);
System.out.println("创建表成功");
hadmin.close();
}
}
/**
* delete table
*/
@Test
public void testDrop() throws Exception {
String tableName = "basic";
HBaseAdmin hadmin = new HBaseAdmin(CONF);
if (!hadmin.tableExists(tableName)) {
System.out.println("表不存在");
hadmin.close();
return;
} else {
if (!hadmin.isTableDisabled(tableName)) {
hadmin.disableTable(tableName);
}
hadmin.deleteTable(tableName);
System.out.println("表删除成功");
hadmin.close();
}
}
/**
* get
*/
@Test
public void testGet() throws IOException {
//get table
HTable htable = getHTableByTableaName("test:company");
Get get = new Get(Bytes.toBytes("1004"));
//get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
//get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));
Result result = htable.get(get);
for (Cell cell : result.rawCells()) { //主类类型,类型不正确将取不到值!
String famlyColumn = Bytes.toString(CellUtil.cloneFamily(cell));
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println(famlyColumn + ":" + column + " -> " + value);
}
//close table
htable.close();
}
/**
* scan
*/
@Test
public void testScan() throws IOException {
HTable htable = getHTableByTableaName("user");
Scan scan = new Scan();
/*
* 设置Scan的扫描顺序,默认是正向扫描(false)(version>=0.98)
*/
//scan.setReversed(false);
/*
* range
*/
//scan.setStartRow(Bytes.toBytes("1001")); //包头不包尾
//scan.setStopRow(Bytes.toBytes("1003"));
/*
* setTimeRange
*/
//scan.setTimeRange(new Date().getTime(), new Date().getTime());
/*
* setTimestamp
*/
//scan.setTimeStamp(new Date().getTime());
/*
* addColumn
*/
//scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
//scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));
/*
* filter(一般filter查询会比较慢,常用的filter只有PrefixFilter、PageFilter)
*/
//scan.setFilter(new PrefixFilter(Bytes.toBytes("100")));
/*
* 设置是否缓存块,默认true缓存数据块
* 我们分内存,缓存和磁盘,三种方式,一般数据的读取为 内存->缓存->磁盘
* 当MR的时候为非热点数据,因此不需要缓存
*/
//scan.setCacheBlocks(true);
/*
* 设置每次从服务器端读取的行数,默认值为-1无限制,
* 当设置了setCaching(n)后,我们的server会从regin server上读取出n条数据。
* 那么client端读取数据的时候会直接从server的缓存中返回,但是如果每次你只需
* 要读取100条记录,但是设置了setCaching(1000),那么每次都会从region server
* 多余的拿出900条记录,这样会让应用的server内存吃不消了比较好的解决方案就是
* 设置setCaching(n)为实际需要的记录数。
*/
//scan.setCaching(-1);
/*
* 设置获取记录的列个数,默认无限制,也就是返回所有的列
*/
//scan.setBatch(1);
/*
* 激活或者禁用raw模式。如果raw模式被激活,Scan将返回 所有已经被打上删除标记
* 但尚未被真正删除 的数据。该功能仅用于激活了KEEP_DELETED_ROWS的列族,即列
* 族开启了 hcd.setKeepDeletedCells(true),Scan激活raw模式后,就不能指定
* 任意的列,否则会报错
*/
//scan.setRaw(false);
ResultScanner resultScanner = htable.getScanner(scan);
for (Result result : resultScanner) {
System.out.println(Bytes.toString(result.getRow()));
System.out.println(result);
System.out.println("-----------");
}
IOUtils.closeStream(htable);
}
/**
* put
*/
@Test
public void testPut() throws IOException {
HTable htable = getHTableByTableaName("user");
Put put = new Put(Bytes.toBytes("1004"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("zhaoliu"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(25));
htable.put(put);
htable.close();
}
/**
* delete
*/
@Test
public void testDelete() throws IOException {
HTable htable = getHTableByTableaName("user");
Delete delete = new Delete(Bytes.toBytes("1002"));
//删除最新版本cell
delete.deleteColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));
//删除所有版本cell
//delete.deleteColumns(Bytes.toBytes("info"), Bytes.toBytes("age"));
//删除一行记录的所有列,注意javaapi没有deleteAll
//delete.deleteFamily(Bytes.toBytes("info"));
htable.delete(delete);
htable.close();
}
}
package com.hbase.test02_with_mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* User2BasicMapReduce
* @author zhangqingli
*
*/
public class User2BasicMapReduce extends Configured implements Tool {
/**
* Mapper
*/
public static class ReadUserMapper extends TableMapper<Text, Put> {
private Text mapOutputKey = new Text();
@Override
public void map(ImmutableBytesWritable key, Result value,
Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
throws IOException, InterruptedException {
// get rowkey
String rowKey = Bytes.toString(key.get());
// set
mapOutputKey.set(rowKey);
Put put = new Put(key.get());
for (Cell cell : value.rawCells()) {
// add family:qualifier
if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
// add column
if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
put.add(cell);
}
if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
put.add(cell);
}
}
}
//
context.write(mapOutputKey, put);
}
}
/**
* Reducer
*/
public static class WriteBasicReducer
extends TableReducer<Text, Put, ImmutableBytesWritable> {
@Override
protected void reduce(Text key, Iterable<Put> values,
Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
throws IOException, InterruptedException {
for (Put put : values) {
context.write(null, put);
}
}
}
/**
* Driver
*/
@Override
public int run(String[] args) throws Exception {
// create job
Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
// set runnable job class
job.setJarByClass(this.getClass());
// set job
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set input & mapper
TableMapReduceUtil.initTableMapperJob("user", // input table
scan, // Scan instance to control CF and attribute selection
ReadUserMapper.class, // mapper class
Text.class, // mapper output key
Put.class, // mapper output value
job);
// set reducer & output
TableMapReduceUtil.initTableReducerJob("basic", WriteBasicReducer.class, job);
job.setNumReduceTasks(1);
boolean isSuccess = job.waitForCompletion(true);
return isSuccess ? 0 : 1;
}
/**
* main
*/
public static void main(String[] args) throws Exception {
// get conf
Configuration conf = HBaseConfiguration.create();
// submit job
int status = ToolRunner.run(conf, new User2BasicMapReduce(), args);
// exit program
System.exit(status);
}
}
package com.hbase.test03_etl_with_put_api;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
/**
* CompanyEtlWithPutTool
* @author zhangqingli
*
*/
public class CompanyEtlWithPutTool {
private static volatile CompanyEtlWithPutTool INSTANCE;
public static CompanyEtlWithPutTool getInstance() {
if (INSTANCE==null) {
synchronized (CompanyEtlWithPutTool.class) {
if (INSTANCE==null) {
INSTANCE = new CompanyEtlWithPutTool();
}
}
}
return INSTANCE;
}
/**
* connect hbase
*/
public HTable getHTable(String tablename) throws Exception {
Configuration conf = HBaseConfiguration.create();
return new HTable(conf, tablename);
}
public void releaseHTable(HTable htable) {
try {
if (htable != null) {
htable.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* connect mysql
*/
public Connection getConn() throws Exception {
String userName = "root";
String password = "23wesdxc";
String url = "jdbc:mysql://dbserver:3306/gsxt";
Class.forName("com.mysql.jdbc.Driver").newInstance();
return DriverManager.getConnection(url, userName, password);
}
public void releaseConn(Connection conn, Statement st, ResultSet rs) {
try {
if (conn != null) {
conn.close();
}
if (st != null) {
st.close();
}
if (rs != null) {
rs.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* extract data from mysql to hbase
*/
public void etl() {
Connection conn = null;
PreparedStatement pst = null;
ResultSet rs = null;
String sql = "select * from company";
HTable htable = null;
try {
conn = getConn();
htable = getHTable("test:company");
pst = conn.prepareStatement(sql);
rs = pst.executeQuery();
while (rs.next()) {
long id = rs.getLong("id");
String city = rs.getString("city");
String email = rs.getString("eMail");
String name = rs.getString("name");
int num = rs.getInt("num");
int priority = rs.getInt("priority");
int state = rs.getInt("state");
int total_num = rs.getInt("total_num");
String webAddress = rs.getString("webAddress");
System.out.println(city + " : " + name);
String rowKey = id+"";
Put put = new Put(Bytes.toBytes(rowKey));
byte[] family1 = Bytes.toBytes("info");
if (city!=null) {
put.add(family1, Bytes.toBytes("city"), Bytes.toBytes(city));
}
if (email!=null) {
put.add(family1, Bytes.toBytes("eMail"), Bytes.toBytes(email));
}
if (name!=null) {
put.add(family1, Bytes.toBytes("name"), Bytes.toBytes(name));
}
put.add(family1, Bytes.toBytes("num"), Bytes.toBytes(num));
put.add(family1, Bytes.toBytes("priority"), Bytes.toBytes(priority));
put.add(family1, Bytes.toBytes("state"), Bytes.toBytes(state));
put.add(family1, Bytes.toBytes("total_num"), Bytes.toBytes(total_num));
if (webAddress!=null) {
put.add(family1, Bytes.toBytes("webAddress"),
Bytes.toBytes(webAddress));
}
htable.put(put);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
releaseConn(conn, pst, rs);
releaseHTable(htable);
}
}
public static void main(String[] args) {
getInstance().etl();
}
}
依赖包
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.3</version>
</dependency>
必要的配置文件(直接拷贝hadoop和hbase环境的配置文件)
注:实际上如果只是hbase,只需要hdfs-site.xml 和 log4j.properties即可
hbase-site.xml配置中,client端只需要zk的配置即可,为简便起见,直接复制hbase-site.xml到类路径下
hbase-site.xml
log4j.properties
类型 | 特点 | 场合 | 优缺点分析 |
Native Java API | 最常规和高效的访问方式 | 适合MapReduce作业并行批处理HBase表数据 | |
Hbase Shell | HBase的命令行工具,最简单的访问方式 | 适合HBase管理使用 | |
Thrift GateWay | 利用Thrift序列化技术,支持c++,PHP,Python等多种语言 | 适合其他异构系统在线访问HBase表数据 | |
REST Gateway | 解除了语言限制 | 支持REST风格的Http API访问Hbase | |
Pig | 使用Pig Latin流式编程语言来处理HBase中的数据 | 适合做数据统计 | |
Hive | 简单 | 可以以类似SQL方式访问Hbase | |
Phoenix | 构建在Apache Hbase上的一个SQL中间层,可以让开发者在HBase上执行SQL查询。 | 对于简单的低延迟查询,量级为毫秒;对于百万级别的行数来说,量级为秒;对于10万到100万行的简单查询,胜过Hive.对于使用了Hbase API,协同处理器及自定义过滤器的Impala与OpenTSDB来说,进行相似的查询,Phoenix的速度更快一些 | |
Kundera | Kundera是一个JPA 2.0兼容的NoSQL数据存储的对象映射框架。Kundera基于现有类库构建,封装出简易的API | 支持交叉数据存储持久性,意味着用户可以在不同的数据存储使用单一方法存储和获取相关实体。 能够会很好的管理事务,同时支持EntityTransaction和Java Transction API; 兼容JPA 2.0,严格使用JPA注解对象映射到数据库存储表。 目前只是的NoSQL服务包括:Cassandra,HBase,MongoDB,Redis,OracleNoSQL,Neo4j; | |
Lealone | 淘宝出品, 支持高性能的分布式事务,使用一个非常新颖的,基于局部时间戳的多版本冲突有效性检测的事务模型,是对H2关系数据库的改进和扩展,是一个100%纯Java的将BigTable和RDBMS融合的数据库。 | | |
hbase-sql | 在Hbase提供的API中,使用Scan来查询数据,在hbase-sql实现过程中将SQL语句转换成Scan,然后进行具体查询,主要处理流程: SQL语句--SQL解析器--SQL语法结点--Scan-hbase-ResultScanner--List 感觉下来还是解析完SQL,然后整表扫描,拿到一个结果集 | | |
Interactive Query | Intel研发的基于HBase的SQL引擎层,是非开源的,是Intel Hadoop发行版的一个模块,使用封装的HBase查询引擎层来解析HiveQL,该引擎拥有更高的性能。 | | |
Impala | Cloudera发布实施查询开源项目Impala,经多款产品测试表明,比原来基于MapReduce的Hive SQL查询速度提升了3-90倍。Impala是Google Dremel的模仿,但是在SQL的性能上更好。 | Impala采用与Hive相同的元数据,SQL语法,ODBC驱动程序和用户接口,这样在使用CDH产品时,批处理和实时查询的平台是统一的。 | 一些兼容性问题 |