https://hbase.apache.org/book.html#_preface
https://blogs.apache.org/hbase/
https://research.google.com/archive/bigtable.html
什么是Hbase?
hadoop 数据库:分布式、可伸缩、大数据存储。
最开始引入 hbase-client,服务有使用【google/protobuf/wrappers.proto】,有很多包冲突,所以直接使用了 habase-shade-client:
**hbase.zookeeper.quorum
**zookeeper server 地址,逗号分割。本地模式和伪集群模式下,默认为 127.0.0.1
**hbase.zookeeper.property.clientPort
**zookeeper server 端口,默认 2181
**hbase.client.retries.number
**hbase client 所有操作的重试上限,默认 15。client 首先等待 hbase.client.pause 执行第一次重试,之后每隔 10s 再次执行。
**hbase.rpc.timeout
**hbase client 一次 rpc 操作的超时时间(超时基于ping检查),默认60000ms,触发则抛出 TimeoutException 异常。
**hbase.client.operation.timeout
**hbase client 一次操作的总的时间限制, 默认 1200000ms,触发则直接抛出 SocketTimeoutException 异常。
示例:
@Configuration
public class HBaseConfig {
@Value("${hbase.zookeeper.quorum}")
private String hbaseZkQuorum;
@Value("${hbase.zookeeper.property.clientPort:2181}")
private String hbaseZkPort;
@Value("${hbase.client.retries.number:2}")
private String hbaseClientRetry;
@Value("${hbase.rpc.timeout:2000}")
private String hbaseRpcTimeout;
@Value("${hbase.client.operation.timeout:3000}")
private String hbaseClientOperationTimeout;
@Bean
public Connection hbaseConnection() throws IOException {
org.apache.hadoop.conf.Configuration hbaseConfig = HBaseConfiguration.create();
hbaseConfig.set("hbase.zookeeper.property.clientPort", hbaseZkPort);
hbaseConfig.set("hbase.zookeeper.quorum", hbaseZkQuorum);
hbaseConfig.set("hbase.client.retries.number", hbaseClientRetry);
hbaseConfig.set("hbase.client.operation.timeout", hbaseClientOperationTimeout);
hbaseConfig.set("hbase.rpc.timeout", hbaseRpcTimeout);
return ConnectionFactory.createConnection(hbaseConfig);
}
@Bean
public HbaseSimpleTemplate hbaseSimpleTemplate(@Qualifier("hbaseConnection") Connection hbaseConnection) {
return new HbaseSimpleTemplate(hbaseConnection);
}
}
集群 connection 封装了底层和实际 hbase server 及 zookeeper 的连接。由 ConnectionFactory 创建并由发起端维护其整个生命周期。
承载了服务发现(hbase master 及 region server)及本地缓存维护(存储及更新)逻辑。所以基于此链接实例化而来的 Table 和 Admin 共享此信息。
Connection 创建是一个很重的操作。
Connection 实现是 thread-safe 的。
所以通常的操作时,一次创建,到处使用。
这里我们通过 @Bean 注解,将 connection 实例交由 spring 管理,维护其从创建,使用到销毁的整个生命周期。
Hbase Connection 数据操作封装:
row->column->all cells
row->column->cells
rows->column->cells
public class HbaseSimpleTemplate {
private Connection hbaseConnection;
public HbaseSimpleTemplate(Connection hbaseConnection) {
this.hbaseConnection = hbaseConnection;
}
/**
* 结果映射map
*
* @param result
* @return
*/
private Map
if (result == null || result.isEmpty()) {
return new HashMap<>();
}
return result.listCells().stream().collect(
Collectors.toMap(cell -> Bytes.toString(CellUtil.cloneQualifier(cell)), cell -> Bytes.toString(CellUtil.cloneValue(cell))));
}
/**
* 查询
* @param tableName
* @param rowName
* @param familyName
* @return
* @throws IOException
*/
public Map
Map
return resultMap.values().stream().findFirst().orElse(new HashMap<>());
}
/**
*
* @param tableName
* @param rowName
* @param familyName
* @param qualifiers
* @return
* @throws IOException
*/
public Map
Map
return resultMap.values().stream().findFirst().orElse(new HashMap<>());
}
/**
* 批量查询
*
* @param tableName
* @param rowNames
* @param familyName
* @return
* @throws IOException
*/
public Map
Map
List
rowNames.forEach(rowName -> {
Get get = new Get(rowName.getBytes());
if (CollectionUtils.isNotEmpty(qualifiers)) {
qualifiers.forEach(qualifier -> get.addColumn(familyName.getBytes(), qualifier.getBytes()));
} else {
get.addFamily(familyName.getBytes());
}
gets.add(get);
});
Arrays.stream(hbaseConnection.getTable(TableName.valueOf(tableName)).get(gets))
.forEach(result -> {
Map
String id = MapUtils.getString(kvMap, "id");
if (StringUtils.isNotBlank(id)) {
resultMap.put(id, kvMap);
}
});
return resultMap;
}
/**
* 写入 qualifier
*
* @param tableName
* @param rowName
* @param familyName
* @param qualifier
* @param value
* @return
* @throws IOException
*/
public boolean put(String tableName, String rowName, String familyName, String qualifier, String value) throws IOException {
Map
qv.put(qualifier, value);
put(tableName, rowName, familyName, qv);
return true;
}
/**
* 写入 qualifiers
*
* @param tableName
* @param rowName
* @param familyName
* @param qualifierValues
* @return
* @throws IOException
*/
public boolean put(String tableName, String rowName, String familyName, Map
if (MapUtils.isEmpty(qualifierValues)) {
return false;
}
List
qualifierValues.forEach((qualifier, value) -> puts.add(new Put(rowName.getBytes()).addColumn(familyName.getBytes(), qualifier.getBytes(), value.getBytes())));
hbaseConnection.getTable(TableName.valueOf(tableName)).put(puts);
return true;
}
/**
* 删除
*
* @param tableName
* @param rowName
* @param familyName
* @return
* @throws IOException
*/
public boolean del(String tableName, String rowName, String familyName) throws IOException {
Delete delete = new Delete(rowName.getBytes());
delete.addFamily(familyName.getBytes());
hbaseConnection.getTable(TableName.valueOf(tableName)).delete(delete);
return true;
}
/**
* 删除 qualifier
*
* @param tableName
* @param rowName
* @param familyName
* @param qualifiers
* @return
* @throws IOException
*/
public boolean delQualifiers(String tableName, String rowName, String familyName, List
Delete delete = new Delete(rowName.getBytes());
qualifiers.forEach(qualifier -> delete.addColumn(familyName.getBytes(), qualifier.getBytes()));
hbaseConnection.getTable(TableName.valueOf(tableName)).delete(delete);
return true;
}
}
getTable:
获取 Table 实现用以访问表数据。
Table 非 thread-safe 的并且其创建很轻量,所以线程内使用需要单独创建(不需要且不应该缓存和池化)。
手机扫一扫
移动阅读更方便
你可能感兴趣的文章