# hbase_learning
**Repository Path**: sun81911/hbase_learning
## Basic Information
- **Project Name**: hbase_learning
- **Description**: HBase知识点学习
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 2
- **Created**: 2020-06-12
- **Last Updated**: 2020-12-19
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## HBase 简介
### HBase 定义
HBase 是一种分布式、可扩展、支持海量数据存储的 NoSQL 数据库
### HBase 数据模型
逻辑上,HBase 的数据模型同关系型数据库很类似,数据存储在一张表中,有行有列,但从 HBase 的底层物理存储结构来看,HBase 更像是一个 Multi-Dimensional Map
#### HBase 逻辑结构

#### HBase 物理存储结构

#### 数据模型
**Name Space**
命名空间,类似于关系型数据库的 DatabBase 概念,每个命名空间下有多个表
HBase有两个自带的命名空间,分别是 `hbase` 和 `default`
hbase 中存放的是 HBase 内置的表,default 表是用户默认使用的命名空间
**Region**
类似于关系型数据库的表概念
不同的是,HBase 定义表时只需要声明「列族」即可,不需要声明具体的列
这意味着,往 HBase 写入数据时,字段可以动态、按需指定。因此和关系型数据库相比,HBase 能够轻松应对字段变更的场景
**Row**
HBase 表中的每行数据都由一个 `RowKey` 和多个 `Column` 组成,数据是按照 RowKey 的「字典顺序」存储的,并且查询数据时**只能根据 RowKey 进行检索**
**Column**
HBase 中的每个列都由 `Column Family` 「列族」和 `Column Qualifier` 「列限定符」进行限定
例如 `info: name`,`info: age`,建表时只需指明「列族」,而列限定符无需预先定义
**Time Stamp**
用于标识数据的不同版本 version,每条数据写入时,如果不指定时间戳,系统会自动为其加上该字段,其值为写入 HBase 的时间
**Cell**
由 `{rowkey, column Family: column Qualifier, time Stamp}` 唯一确定的单元
cell 中的数据是没有类型的,全部是「字节码」形式存储
### HBase 基本架构
**图示分析**

**架构角色**
1. **Region Server**
Region Server 为 Region 的管理者,其实现类为 HRegionServer,主要作用为
1. 对于数据的操作 `get`, `put`, `delete`
2. 对于 Region 的操作 `splitRegion`, `compactRegion`
2. **Master**
Master 是所有 Region Server 的管理者,其实现类为 HMaster,主要作用为
1. 对于表的操作 `create`, `delete`, `alter`
2. 对于 RegionServer的操作,分配 regions 到每个RegionServer,监控每个 RegionServer 的状态,负载均衡和故障转移
3. **Zookeeper**
HBase 通过 Zookeeper 来做 Master 的高可用、RegionServer 的监控、元数据的入口以及集群配置的维护等工作
4. **HDFS**
HDFS 为 HBase 提供最终的底层数据存储服务,同时为 HBase 提供高可用的支持
## HBase 入门
### HBase 安装部署
#### Zookeeper 正常部署
首先保证 Zookeeper 集群的正常部署,并启动之
```shell
[root@hadoop198 zookeeper-3.4.10]# bin/zkServer.sh start
[root@hadoop197 zookeeper-3.4.10]# bin/zkServer.sh start
[root@hadoop196 zookeeper-3.4.10]# bin/zkServer.sh start
```
#### Hadoop 正常部署
Hadoop 集群的正常部署并启动
```shell
[root@hadoop198 hadoop-2.7.2]# sbin/start-dfs.sh
[root@hadoop196 hadoop-2.7.2]# sbin/start-yarn.sh
```
#### HBase 解压
解压 Hbase 到指定目录
```shell
tar -zxvf hbase-1.3.1-bin.tar.gz -C /opt/module
```
#### HBase 配置文件
修改 HBase 对应的配置文件
**hbase-env.sh**
```shell
# The java implementation to use. Java 1.7+ required.
export JAVA_HOME=/opt/module/jdk1.8.0_144
```
```shell
# Configure PermSize. Only needed in JDK7. You can safely remove it for JDK8+
#export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m"
#export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m"
```
```shell
# Tell HBase whether it should manage it's own instance of Zookeeper or not.
export HBASE_MANAGES_ZK=false
```
**hbase-site.xml**
```xml
hbase.rootdir
hdfs://hadoop198:9000/hbase
hbase.cluster.distributed
true
hbase.master.port
16000
hbase.zookeeper.quorum
hadoop198,hadoop197,hadoop196
hbase.zookeeper.property.dataDir
/opt/module/zookeeper-3.4.10/zkData
```
**regionservers**
```
hadoop198
hadoop197
hadoop196
```
**软连接 Hadoop 配置文件到 HBase**
```shell
ln -s /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml /opt/module/hbase-1.3.1/conf/core-site.xml
```
```shell
ln -s /opt/module/hadoop-2.7.2/etc/hadoop/hdfs-site.xml /opt/module/hbase-1.3.1/conf/hdfs-site.xml
```
#### HBase 分发
```shell
xsync hbase-1.3.1/
```
#### HBase 服务启动
**单节点启动**
```shell
bin/hbase-daemon.sh start master
```
```shell
bin/hbase-daemon.sh start regionserver
```
**注意**:如果集群之间的节点时间不同步,会导致 regionserver 无法启动,抛出 `ClockOutOfSyncException` 异常
可以设置集群时间同步服务,或者将 `hbase.master.maxclockskew` 设置更大的值
```xml
hbase.master.maxclockskew
180000
Time difference of regionserver from master
```
**集群启动**
```shell
bin/start-hbase.sh
```
停止服务
```shell
bin/stop-hbase.sh
```
#### 查看 HBase 页面
启动成功后,可以通过 `host:port` 的方式来访问 HBase 管理页面,例如:http://hadoop198:16010
### HBase Shell 操作
#### 基本操作
进入 HBase 客户端命令行
```shell
bin/hbase shell
```
查看帮助命令
```shell
hbase(main):001:0> help
```
查看当前数据库中有哪些表
```shell
hbase(main):002:0> list
```
#### 表的操作
##### DDL
**创建表**
```shell
hbase(main):003:0> create 'student','info'
```
`student`为表名,`info`为列族,创建表时至少包含一个列族
**查看表结构**
```shell
hbase(main):002:0> describe 'student'
```
```
Table student is ENABLED
student
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRE
SSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
```
**变更表信息**
将 info 列族中的数据存放 3 个版本
```shell
hbase(main):008:0> alter 'student',{NAME=>'info',VERSIONS=>3}
```
**删除表**
首先需要先让该表为 `disable` 状态
```shell
hbase(main):003:0> disable 'student'
```
然后才能 `drop` 这个表
```shell
hbase(main):004:0> drop 'student'
```
**查看命名空间**
```shell
hbase(main):010:0> list_namespace
```
```
NAMESPACE
default
hbase
```
**创建命名空间**
```shell
hbase(main):011:0> create_namespace 'bigdata'
```
**创建不同命名空间的数据表**
```shell
hbase(main):014:0> create 'bigdata:student','info'
```
**删除命名空间**
删除命名空间之前,必须保证命名空间内无数据表
```shell
hbase(main):016:0> disable 'bigdata:student'
```
```shell
hbase(main):017:0> drop 'bigdata:student'
```
```shell
hbase(main):018:0> drop_namespace 'bigdata'
```
##### DML
**插入数据到表**
基本语法
```shell
put 'ns1:t1', 'r1', 'c1', 'value'
```
- `ns1`:命名空间
- `t1`:表名
- `r1`:Row Key
- `c1`:列
- `value`:值
```shell
hbase(main):023:0> put 'student','1001','info:sex','male'
```
```shell
hbase(main):025:0> put 'student','1002','info:sex','female'
```
```shell
hbase(main):028:0> put 'student','1002','info:name','Janna'
```
**扫描查看表数据**
```shell
scan 'student'
```
```
ROW COLUMN+CELL
1001 column=info:sex, timestamp=1591093425060, value=male
1002 column=info:name, timestamp=1591094207725, value=Janna
1002 column=info:sex, timestamp=1591093579256, value=female
```
```shell
hbase(main):027:0> scan 'student',{STARTROW => '1001', STOPROW => '1002'}
```
```
ROW COLUMN+CELL
1001 column=info:sex, timestamp=1591093425060, value=male
```
**注意**:`STARTROW` 与 `STOPROW` 之间的关系是「左闭右开」
`ROWKEY` 按照字节序进行排序,例如 `1001` < `10010` < `1002`
查看指定`ROWKEY`或指定`COLUMNFAMILY:COLUMN`的数据
```shell
hbase(main):031:0> get 'student','1002'
```
```
COLUMN CELL
info:name timestamp=1591094207725, value=Janna
info:sex timestamp=1591093579256, value=female
```
```shell
hbase(main):033:0> get 'student','1002','info:name'
```
```
COLUMN CELL
info:name timestamp=1591094207725, value=Janna
```
**更新指定字段的数据**
```shell
hbase(main):049:0> put 'student','1002','info:name','Nick'
```
**扫描全量数据**
```shell
hbase(main):056:0> scan 'student', {RAW => true, VERSIONS => 10}
```
```
ROW COLUMN+CELL
1001 column=info:sex, timestamp=1591093425060, value=male
1002 column=info:name, timestamp=1591095520023, value=Nick
1002 column=info:name, timestamp=1591094207725, value=Janna
1002 column=info:sex, timestamp=1591093579256, value=female
```
**删除数据**
删除某 RowKey 的某一列数据
```shell
hbase(main):058:0> delete 'student','1002','info:sex'
```
删除某 RowKey 的全部数据
```shell
hbase(main):065:0> deleteall 'student','1001'
```
**清空表数据**
```shell
hbase(main):070:0> truncate 'student'
```
**细节**:清空表的操作顺序为先 `disable`,然后再 `truncate`
**刷写数据到磁盘**
```shell
hbase(main):071:0> flush 'student'
```
**查看hfile文件内容**
```shell
bin/hbase hfile -v -p -m -f hdfs://hadoop198:9000/hbase/data/default/student/6bc535099538d8cf4c996d6de41a557f/info/ee8a9e5ccaa7447ea3af8291231726a5
```
## HBase 原理
### 架构原理
**图示分析**

**部分组件分析**
1. StoreFile
保存实际数据的物理文件,StoreFile 以 HFile 的形式存储在 HDFS 上。每个 Store 会有一个或多个 StoreFile(HFile),数据在每个 StoreFile 中都是有序的
2. MemStore
写缓存,由于 HFile 中的数据要求是有序的,所以数据是先存储在 MemStore 中,排好序后,等到达刷写时机才会刷写到 HFile,每次刷写都会形成一个新的 HFile
3. WAL (HLog)
由于数据要经 MemStore 排序后才能刷写到 HFile,但把数据保存在内存中会有很高的概率导致数据丢失,为了解决这个问题,数据会先写在一个叫做 Write-Ahead logfile 的文件中,然后再写入 MemStore 中。所以在系统出现故障的时候,数据可以通过这个日志文件重建
### 写流程
**图示分析**

**流程分析**
1. Client 先访问 zookeeper,获取 `hbase:meta` 表位于哪个 Region Server
2. 访问对应的 Region Server,获取 `hbase:meta` 表,根据读请求的 `namespace:table/rowkey`,查询出目标数据位于哪个 Region Server 中的哪个 Region 中,并将该 table 的 region 信息以及 meta 表的位置信息缓存在客户端的 `meta cache`,方便下次访问
3. 与目标 Region Server 进行通讯
4. 将数据顺序写入(追加)到 WAL 中
5. 将数据写入对应的 MemStore,数据会在 MemStore 进行排序
6. 向客户端发送 ack
7. 等达到 MemStore 的刷写时机后,将数据刷写到 HFile 中
### MemStore Flush
**图示分析**

**刷写时机**
1. 当某个 memstroe 的大小达到了 `hbase.hregion.memstore.flush.size`(默认值 128M),其所在 region 的所有 memstore 都会刷写
当 memstore 的大小达到了
```
hbase.hregion.memstore.flush.size × hbase.hregion.memstore.block.multiplier(默认值4)
```
会阻止继续往该 memstore 写数据
2. 当 region server 中 memstore 的总大小达到
```
java_heapsize × hbase.regionserver.global.memstore.size(默认值0.4) × hbase.regionserver.global.memstore.size.lower.limit(默认值0.95)
```
region 会按照其所有 memstore 的大小顺序(由大到小)依次进行刷写,直到 region server 中所有 memstore 的总大小减小到上述值以下
当 region server 中 memstore 的总大小达到
```
java_heapsize × hbase.regionserver.global.memstore.size(默认值0.4)
```
会阻止继续往所有的 memstore 写数据
3. 到达自动刷写的时间,也会触发 memstore flush,自动刷新的时间间隔由该属性进行配置 `hbase.regionserver.optionalcacheflushinterval(默认1小时)`
4. 当 WAL 文件的数量超过 `hbase.regionserver.max.logs`,region 会按照时间顺序依次进行刷写,直到 WAL 文件数量减小到 `hbase.regionserver.max.log` 以下(该属性名已经废弃,现无需手动设置,最大值为 32)
### 读流程
**图示分析**

**流程分析**
1. Client 先访问 zookeeper,获取 `hbase:meta` 表位于哪个 Region Server
2. 访问对应的 Region Server,获取 hbase:meta 表,根据读请求的 `namespace:table/rowkey`,查询出目标数据位于哪个 Region Server 中的哪个 Region 中,并将该 table 的 region 信息以及 meta 表的位置信息缓存在客户端的 `meta cache`,方便下次访问
3. 与目标 Region Server 进行通讯
4. 分别在 Block Cache(读缓存),MemStore 和 Store File(HFile)中查询目标数据,并将查到的所有数据进行合并
此处所有数据是指同一条数据的不同版本(time stamp)或者不同的类型(Put/Delete)
5. 将从文件中查询到的数据块(Block,HFile 数据存储单元,默认大小为 64KB)缓存到Block Cache
6. 将合并后的最终结果返回给客户端
### StoreFile Compaction
**图示分析**

**概念分析**
由于memstore每次刷写都会生成一个新的 HFile,且同一个字段的不同版本(timestamp)和不同类型(put/delete)有可能会分布在不同的 HFile 中,因此查询时需要遍历所有的 HFile
为了减少 HFile 的个数,以及清理掉过期和删除的数据,会进行 StoreFile Compaction
Compaction 分为两种,分别是 Minor Compaction 和 Major Compaction
1. Minor Compaction会将临近的若干个较小的 HFile 合并成一个较大的 HFile,但不会清理过期和删除的数据
2. Major Compaction 会将一个 Store 下的所有的 HFile 合并成一个大 HFile,并且会清理掉过期和删除的数据
### Region Split
**图示分析**

**概念分析**
默认情况下,每个 Table 起初只有一个 Region,随着数据的不断写入,Region 会自动进行拆分。刚拆分时,两个子 Region 都位于当前的 Region Server,但处于负载均衡的考虑,HMaster 有可能会将某个 Region 转移给其他的 Region Server
Region Split 时机是当一个 region 中的某个 Store 下所有 StoreFile 的总大小超过 `min(r^2 * hbase.hregion.memstore.flush.size, hbase.hregion.max.filesize)`
该 Region 就会进行拆分,其中 `r` 为当前 Region Server 中属于该 Table 的个数
## HBase API
### 环境准备
**相关依赖**
```xml
junit
junit
4.12
org.apache.hbase
hbase-server
1.3.1
org.apache.hbase
hbase-client
1.3.1
```
### API 测试
#### 初始化配置信息
```java
Connection conn;
// 初始化配置信息
@Before
public void init() throws IOException {
// 1 使用HBaseConfiguration的单例方法实例化,获取配置文件信息
// 过时
// HBaseConfiguration conf = new HBaseConfiguration();
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hadoop198,hadoop197,hadoop196");
conf.set("hbase.zookeeper.property.clientPort", "2181");
// 创建连接
conn = ConnectionFactory.createConnection(conf);
}
```
#### 关闭资源
```java
// 关闭连接
@After
public void destroy() {
if (conn != null) {
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
```
#### DDL
##### 判断表是否存在
```java
// 2 获取管理员对象
// 过时
// HBaseAdmin admin = new HBaseAdmin(conf);
Admin admin = conn.getAdmin();
// 3 判断表是否存在
// 过时
// boolean res = admin.tableExists("student");
boolean res = admin.tableExists(TableName.valueOf("student"));
System.out.println(res);
admin.close();
```
##### 创建表
```java
// 获取admin对象
Admin admin = conn.getAdmin();
// 1 创建表描述器
HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("stu2"));
// 2 创建列族描述器
HColumnDescriptor family = new HColumnDescriptor("info");
// 3 添加列族信息
descriptor.addFamily(family);
// 4 创建表
admin.createTable(descriptor);
// 关闭资源
admin.close();
```
##### 删除表
```java
// 获取admin对象
Admin admin = conn.getAdmin();
// 1 下线表
admin.disableTable(TableName.valueOf("stu2"));
// 2 删除表
admin.deleteTable(TableName.valueOf("stu2"));
// 关闭资源
admin.close();
```
##### 创建命名空间
```java
Admin admin = null;
try {
// 获取admin对象
admin = conn.getAdmin();
// 1 传入命名空间名,创建命名空间描述器
NamespaceDescriptor nd = NamespaceDescriptor.create("bigdata").build();
// 2 创建命名空间
admin.createNamespace(nd);
} catch (NamespaceExistException e) {
// 3 抓取命名空间存在异常,以判断命名空间是否存在
System.out.println("命名空间已存在");
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭资源
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
```
#### DML
##### 插入/修改数据
```java
// 1 获取表对象
Table table = conn.getTable(TableName.valueOf("student"));
// 2 传入rowkey,创建put对象
Put put = new Put(Bytes.toBytes("1002"));
// 3 给put对象赋值
// byte[] family: 列族, byte[] qualifier: 列名, byte[] value: 值
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("rogre2"));
// 批量添加
// 方法一:相同rowkey,直接put多个column
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("gender"), Bytes.toBytes("male"));
// 方法二:不同rowkey,实例化多个put对象构建list
Put put2 = new Put(Bytes.toBytes("1003"));
put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("roger3"));
// 构建list集合
List list = new ArrayList();
list.add(put);
list.add(put2);
// 4 插入数据
// 单条插入
// table.put(put);
// 批量插入
table.put(list);
// 5 关闭资源
table.close();
```
##### 获取数据 get
```java
// 1 获取表对象
Table table = conn.getTable(TableName.valueOf("student"));
// 2 指定rowkey,创建get对象
Get get = new Get(Bytes.toBytes("1002"));
// 2.1 指定获取的列族
get.addFamily(Bytes.toBytes("info"));
// 2.2 指定获取的列名
get.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"));
// 2.3 设置获取数据的版本数
get.setMaxVersions();
// 3 获取数据
Result result = table.get(get);
// 4 解析result
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
// 获取列族数据
byte[] b1 = CellUtil.cloneFamily(cell);
String family = Bytes.toString(b1);
// 获取列名数据
byte[] b2 = CellUtil.cloneQualifier(cell);
String qualifier = Bytes.toString(b2);
// 获取值数据
byte[] b3 = CellUtil.cloneValue(cell);
String value = Bytes.toString(b3);
// 打印数据
System.out.println("===> family: " + family + ", qualifier: " + qualifier + ", value: " + value);
}
// 5 关闭连接
table.close();
```
##### 获取数据 scan
```java
// 1 获取表对象
Table table = conn.getTable(TableName.valueOf("student"));
// 2 创建scan对象
// 2.1 全表扫描
// Scan scan = new Scan();
// 2.2 指定过滤条件
Scan scan = new Scan(Bytes.toBytes("1001"), Bytes.toBytes("1003"));
// 3 获取scanner,扫描表
ResultScanner scanner = table.getScanner(scan);
// 4 解析扫描结果
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
// 获取列族数据
byte[] b1 = CellUtil.cloneFamily(cell);
String family = Bytes.toString(b1);
// 获取列名数据
byte[] b2 = CellUtil.cloneQualifier(cell);
String qualifier = Bytes.toString(b2);
// 获取值数据
byte[] b3 = CellUtil.cloneValue(cell);
String value = Bytes.toString(b3);
// 获取rowkey数据
byte[] b4 = CellUtil.cloneRow(cell);
String rowKey = Bytes.toString(b4);
// 打印数据
System.out.println("===> rowKey: " + rowKey + ", family: " + family + ", qualifier: " + qualifier + ", value: " + value);
}
}
// 5 关闭资源
table.close();
```
##### 删除数据
```java
// 1 获取表对象
Table table = conn.getTable(TableName.valueOf("student"));
// 2 指定删除的rowkey,构建delete对象, type=DeleteFamily
Delete delete = new Delete(Bytes.toBytes("1001"));
// 2.1 指定删除列所有版本数据, type=DeleteColumn
delete.addColumns(Bytes.toBytes("info"), Bytes.toBytes("name"));
// 2.2 删除指定列单个版本,由于flush可能导致前后两次执行得到结果不一致,尽量避免, type=Delete
// delete.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
// 指定删除的时间戳
delete.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), 1591337821300L);
// 2.3 删除指定的列族
delete.addFamily(Bytes.toBytes("info"));
// 2.4 批量删除
Delete d2 = new Delete(Bytes.toBytes("1002"));
// 构造delete集合
List list = new ArrayList();
list.add(delete);
list.add(d2);
// 3 数据删除
table.delete(delete);
// 3.1 批量数据删除
table.delete(list);
// 4 关闭资源
table.close();
```
### HBase - MapReduce
通过 HBase 的相关 JavaAPI,可以实现伴随 HBase 操作的 MapReduce 过程
比如使用MapReduce 将数据从本地文件系统导入到 HBase 的表中;从 HBase 中读取一些原始数据后使用 MapReduce 做数据分析
#### 官方 HBase-MapReduce
**查看 HBase 的 MapReduce 任务的执行**
```shell
bin/hbase mapredcp
```
**环境变量的导入**
配置 `/etc/profile`
```shell
export HBASE_HOME=/opt/module/hbase-1.3.1
export HADOOP_HOME=/opt/module/hadoop-2.7.2
```
使配置生效
```shell
source /etc/profile
```
配置 `hadoop-env.sh`
```shell
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase-1.3.1/lib/*
```
**注意**:需要在
```shell
# Extra Java CLASSPATH elements. Automatically insert capacity-scheduler.
for f in $HADOOP_HOME/contrib/capacity-scheduler/*.jar; do
if [ "$HADOOP_CLASSPATH" ]; then
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f
else
export HADOOP_CLASSPATH=$f
fi
done
```
之后
**同步配置**
```shell
xsync /etc/profile # 同步系统配置
xsync hadoop-env.sh # 同步hadoop环境配置
```
**启动 Hadoop 集群**
```shell
start-dfs.sh
start-yarn.sh
```
**启动 Hbase 服务**
```shell
start-hbase.sh
```
**运行官方 MapReduce 任务**
**统计 student 表中有多少行数据**
```shell
yarn jar lib/hbase-server-1.3.1.jar rowcounter student
```
**使用 MapReduce 将本地数据导入到 HBase**
在本地创建一个 tsv 格式的文件 `fruit.tsv`
```
1001 Apple Red
1002 Pear Yellow
1003 Pineapple Yellow
```
在 HDFS 中创建 input_fruit 文件夹并上传 fruit.tsv 文件
```shell
hdfs dfs -mkdir /input_fruit/
```
```shell
hdfs dfs -put fruit.tsv /input_fruit/
```
创建 Hbase 表
```shell
hbase(main):003:0> create 'fruit','info'
```
执行 MapReduce 到 HBase 的 fruit 表中
```shell
/opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit hdfs://hadoop198:9000/input_fruit
```
#### 自定义 HBase-MapReduce
##### 示例一
**实现将 HDFS 中的数据写入到 Hbase 表中**
FruitMapper
```java
public class FruitMapper extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(key, value);
}
}
```
FruitReducer
```java
// 继承TableReducer类
public class FruitReducer extends TableReducer {
@Override
protected void reduce(LongWritable key, Iterable values, Context context) throws IOException, InterruptedException {
// 1 遍历values
for (Text value : values) {
// 示例:1001 Apple Red
// 2 获取每行数据
String line = value.toString();
String[] fields = line.split("\t");
// 3 构建put对象
String rowKey = fields[0];
Put put = new Put(Bytes.toBytes(rowKey));
// 4 赋值put对象
String name = fields[1];
String color = fields[2];
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color));
// 5 写出
context.write(NullWritable.get(), put);
}
}
}
```
FruitDriver
```java
// 实现Tool接口
public class FruitDriver implements Tool {
// 定义configuration
Configuration conf;
public int run(String[] args) throws Exception {
// 1 获取job对象
Job job = Job.getInstance(conf);
// 2 设置驱动类路径
job.setJarByClass(FruitDriver.class);
// 3 设置mapper和reducer
job.setMapperClass(FruitMapper.class);
// String table, Class extends TableReducer> reducer, Job job
// 表名,reducer类,job
TableMapReduceUtil.initTableReducerJob(args[1], FruitReducer.class, job);
// 4 设置mapper输出格式
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
// 5 设置输入参数
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 6 提交任务
boolean res = job.waitForCompletion(true);
return res ? 0 : 1;
}
public void setConf(Configuration configuration) {
this.conf = configuration;
}
public Configuration getConf() {
return conf;
}
public static void main(String[] args) {
try {
// 实例化configuration对象
Configuration conf = new Configuration();
// 运行run方法
int run = ToolRunner.run(conf, new FruitDriver(), args);
// 退出
System.exit(run);
} catch (Exception e) {
e.printStackTrace();
}
}
}
```
打包在集群上运行
```shell
yarn jar hbase-importdata.jar com.roger.mr.FruitDriver /input_fruit/fruit.tsv fruit2
```
**注意**:运行任务前,如果待数据导入的表不存在,则需要提前创建
##### 示例二
**实现将 fruit 表中的一部分数据,通过 MR 迁入到 fruit3 表中**
Fruit2Mapper
```java
// 继承TableMapper类
public class Fruit2Mapper extends TableMapper {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
// 构建put对象,key本身就是rowkey
Put put = new Put(key.get());
// 1 获取数据
Cell[] cells = value.rawCells();
for (Cell cell : cells) {
// 2 判断当前列是否为name列
if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
// 3 提取name列数据
put.add(cell);
}
}
// 4 写出
context.write(key, put);
}
}
```
Fruit2Reducer
```java
// 继承TableReducer类
public class Fruit2Reducer extends TableReducer {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException {
// 遍历写出
for (Put put : values) {
context.write(NullWritable.get(), put);
}
}
}
```
Fruit2Driver
```java
public class Fruit2Driver implements Tool {
Configuration conf;
public int run(String[] args) throws Exception {
// 1 获取job对象
Job job = Job.getInstance(conf);
// 2 设置驱动类路径
job.setJarByClass(Fruit2Driver.class);
// 3 设置mapper和reducer
// 表名,扫描条件,mapper类,输出key类型,输出value类型,job
TableMapReduceUtil.initTableMapperJob("fruit",
new Scan(),
Fruit2Mapper.class,
ImmutableBytesWritable.class,
Put.class,
job);
// 表名,reducer类,job
TableMapReduceUtil.initTableReducerJob("fruit3",
Fruit2Reducer.class,
job);
// 4 提交任务
boolean res = job.waitForCompletion(true);
return res ? 0 : 1;
}
public void setConf(Configuration configuration) {
this.conf = configuration;
}
public Configuration getConf() {
return conf;
}
public static void main(String[] args) {
try {
// 实例化HBaseConfiguration对象
Configuration conf = HBaseConfiguration.create();
// 运行run方法
int run = ToolRunner.run(conf, new Fruit2Driver(), args);
// 退出
System.exit(run);
} catch (Exception e) {
e.printStackTrace();
}
}
}
```
此时将 `/opt/module/hbase-1.3.1/conf/hbase-site.xml` 文件拷贝到 `resources` 目录下,在本地运行 `main` 方法操作 HBase
**注意**:运行任务前,如果待数据导入的表不存在,则需要提前创建
### HBase - Hive
#### HBase 与 Hive 对比分析
**Hive**
1. 数据仓库
Hive 的本质其实就相当于将 HDFS 中已经存储的文件在 Mysql 中做了一个双射关系,以方便使用 HQL 去管理查询
2. 用于数据分析、清洗
Hive 适用于离线的数据分析和清洗,延迟较高
3. 基于 HDFS、MapReduce
Hive 存储的数据依旧在 DataNode 上,编写的 HQL 语句终将是转换为 MapReduce 代码执行
**HBase**
1. 数据库
是一种面向「列族」存储的非关系型数据库
2. 用于存储结构化和非结构化的数据
适用于单表非关系型数据的存储,不适合做关联查询,类似 JOIN 等操作
3. 基于 HDFS
数据持久化存储的体现形式是 HFile,存放于 DataNode 中,被 ResionServer 以 region 的形式进行管理
4. 延迟较低,接入在线业务使用
面对大量的企业数据,HBase 可以单表大量数据的存储,同时提供了高效的数据访问速度
#### 集成使用
**环境准备**
因为后续可能会在操作 Hive 的同时对 HBase 也会产生影响,所以 Hive 需要持有操作HBase 的 Jar,故拷贝 Hive 所依赖的 Jar 包或者使用软连接
```shell
ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar $HIVE_HOME/lib/hbase-common-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
```
同时在 hive-site.xml 中修改 zookeeper 的属性
```xml
hive.zookeeper.quorum
hadoop198,hadoop197,hadoop196
The list of ZooKeeper servers to talk to. This is only needed for read/write locks.
hive.zookeeper.client.port
2181
The port of ZooKeeper servers to talk to. This is only needed for read/write locks.
```
由于兼容性的问题,需要重新编译 `/opt/module/hive-1.2.1/lib/hive-hbase-handler-1.2.1.jar` 包替换
**案例一**
建立 Hive 表,关联 HBase 表,插入数据到 Hive 表的同时能够影响 HBase 表
在 Hive 中创建表同时关联 HBase
```sql
hive (default)> CREATE TABLE hive_hbase_emp_table(empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
```
完成之后分别进入 Hive 和 HBase 查看,都生成了对应的表
通过 insert 命令将中间表中的数据导入到 Hive 关联 Hbase 的那张表中
```sql
hive (default)> insert into table hive_hbase_emp_table select * from emp;
```
查看 Hive 以及关联的 HBase 表中是否已经成功的同步插入了数据
```sql
hive (default)> select * from hive_hbase_emp_table;
```
```shell
hbase(main):017:0> scan 'hbase_emp_table'
```
**案例二**
在 HBase 中已经存储了某一张表 hbase_emp_table,然后在 Hive 中创建一个外部表来关联 HBase 中的 hbase_emp_table 这张表,使之可以借助 Hive 来分析 HBase 这张表中的数据
在 Hive 中创建外部表
```sql
hive (default)> CREATE EXTERNAL TABLE relevance_hbase_emp(empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
```
使用 Hive 函数进行一些分析操作
```sql
hive (default)> select * from relevance_hbase_emp;
```
## HBase 优化
### 高可用
在 HBase 中 HMaster 负责监控 HRegionServer 的生命周期,均衡 RegionServer 的负载,如果 HMaster 挂掉了,那么整个 HBase 集群将陷入不健康的状态,并且此时的工作状态并不会维持太久。所以 HBase 支持对 HMaster 的高可用配置
**关闭 HBase 集群**
```shell
stop-hbase.sh
```
**在 conf 目录下创建 backup-masters 文件**
```shell
vim conf/backup-masters
```
**在 backup-masters 文件中配置高可用 HMaster 节点**
```
hadoop197
hadoop196
```
**分发配置**
```shell
xsync backup-masters
```
**启动 hbase 服务**
```shell
start-hbase.sh
```
### 预分区
每一个 region 维护着 StartRow 与 EndRow,如果加入的数据符合某个 Region 维护的RowKey 范围,则该数据交给这个 Region 维护,依照这个原则,可以将数据所要投放的分区提前大致的规划好,以提高 HBase 性能
**手动设定预分区**
```shell
hbase(main):001:0> create 'staff1','info','partition1',SPLITS=>['1000','2000','3000','4000']
```
**生成16进制序列预分区**
```shell
hbase(main):001:0> create 'staff2','info','partition2',{NUMREGIONS=>15, SPLITALGO=>'HexStringSplit'}
```
**按照文件中设置的规则预分区**
创建 splits.txt 文件
```
aaaa
bbbb
cccc
dddd
```
执行
```shell
hbase(main):001:0> create 'staff3','partition3',SPLITS_FILE=>'splits.txt'
```
**使用 JavaAPI 创建预分区**
```java
// 自定义算法,产生一系列hash散列值存储在二维数组中
byte[][] splitKeys = 某个散列值函数
// 获取connection对象
Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
// 获取admin对象
Admin admin = conn.getAdmin();
// 创建HTableDescriptor实例
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
// 通过HTableDescriptor实例和散列值二维数组创建带有预分区的Hbase表
admin.createTable(tableDesc, splitKeys);
```
### RowKey 设计
#### 常用方案
一条数据的唯一标识就是 RowKey,这条数据存储于哪个分区,取决于 RowKey 处于哪个一个预分区的区间内,设计 RowKey 的主要目的 ,就是让数据均匀的分布于所有的 region 中,在一定程度上防止数据倾斜
**生成随机数、hash、散列值**
```
原本rowKey为1001的,SHA1后变成:dd01903921ea24941c26a48f2cec24e0bb0e8cc7
原本rowKey为3001的,SHA1后变成:49042c54de64a1e9bf0b33e00245660ef92dc7bd
原本rowKey为5001的,SHA1后变成:7b61dec07e02c188790670af43e717f0f46e8913
```
在做此操作之前,一般会选择从数据集中抽取样本,来决定什么样的 rowKey 来 hash 后作为每个分区的临界值
**字符串反转**
```
20170524000001转成10000042507102
20170524000002转成20000042507102
```
**字符串拼接**
```
20170524000001_a12e
20170524000001_93i7
```
#### 情景设计
假设需要分300个预分区,此时分区键可设计为
```
000|
001|
...
025|
...
298|
```
299个分区键划分300个分区,RowKey设计为
```
000_
001_
...
025_
...
298_
```
由于`|`大于`_`,故`000_`开头的数据位于`-∞, 000|`区间,即第1个分区,之后同理
### 基础优化
**允许在 HDFS 的文件中追加内容**
`hdfs-site.xml`, `hbase-site.xml`
```
dfs.support.append
```
开启 HDFS 追加同步,可以配合 HBase 的数据同步和持久化,默认值为 true
**优化 DataNode 允许的最大文件打开数**
`hdfs-site.xml`
```
dfs.datanode.max.transfer.threads
```
HBase 一般都会同一时间操作大量的文件,根据集群的数量和规模以及数据动作,设置为 4096 或者更高,默认值为4096
**优化延迟高的数据操作的等待时间**
`hdfs-site.xml`
```
dfs.image.transfer.timeout
```
如果对于某一次数据操作来讲,延迟非常高,socket 需要等待更长的时间,可以把该值设置为更大的值,默认为 60000 毫秒,以确保 socket 不会被 timeout 掉
**优化数据的写入效率**
`mapred-site.xml`
```
mapreduce.map.output.compress
mapreduce.map.output.compress.codec
```
开启这压缩和解压缩功能可以大大提高文件的写入效率,减少写入时间
第一个属性值修改为 `true`
第二个属性值修改为 `org.apache.hadoop.io.compress.GzipCodec` 或者其他压缩方式
**设置 RPC 监听数量**
`hbase-site.xml`
```
hbase.regionserver.handler.count
```
默认值为 30,用于指定 RPC 监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值
**优化 HStore 文件大小**
`hbase-site.xml`
```
hbase.hregion.max.filesize
```
如果 HFile 的大小达到这个数值,则这个 region 会被切分为两个 Hfile
默认值 10737418240 (10GB),如果需要运行 HBase 的 MR 任务可以减小此值,因为一个 region 对应一个 map 任务,如果单个 region 过大,会导致 map 任务执行时间过长
**优化 HBase 客户端缓存**
`hbase-site.xml`
```
hbase.client.write.buffer
```
用于指定 HBase 客户端缓存,增大该值可以减少 RPC 调用次数,但是会消耗更多内存
一般需要设定一定的缓存大小,以达到减少 RPC 次数的目的
**指定 scan.next 扫描 HBase 所获取的行数**
`hbase-site.xml`
```
hbase.client.scanner.caching
```
用于指定 scan.next 方法获取的默认行数,值越大消耗内存越大
## 谷粒微博
### 需求分析
微博内容的浏览,数据库表设计
关注用户,取关用户
拉取关注的人的微博内容
### 代码实现
#### 代码设计总览
1. 创建命名空间和数据表
2. 创建微博内容表
3. 创建用户关系表
4. 创建用户微博内容接收邮件表
5. 发布微博内容
6. 添加关注用户
7. 移除(取关)用户
8. 获取用户初始化数据
9. 获取用户所有微博内容
10. 测试
#### 表设计总览

#### 创建命名空间和数据表
**HBaseUtils**
```java
// 工具类用以创建命名空间和表
public class HBaseUtils {
// 创建命名空间
public static void createNamespace(String namespace) throws IOException {
// 获取connection对象
Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION);
// 获取admin对象
Admin admin = conn.getAdmin();
// 构建命名空间描述器
NamespaceDescriptor descriptor = NamespaceDescriptor.create(namespace).build();
// 创建命名空间
admin.createNamespace(descriptor);
// 关闭资源
admin.close();
conn.close();
}
// 判断表是否存在
public static boolean isTableExist(String tableName) throws IOException {
// 获取connection对象
Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION);
// 获取admin对象
Admin admin = conn.getAdmin();
// 判断是否存在
boolean result = admin.tableExists(TableName.valueOf(tableName));
// 关闭资源
admin.close();
conn.close();
// 返回结果
return result;
}
// 创建表
public static void createTable(String tableName, int versions, String... columnFamily) throws IOException {
// 判断是否传入了列族信息
if (columnFamily.length == 0) {
System.out.println("没有传入列族信息");
return;
}
// 判断表是否存在
if (isTableExist(tableName)) {
System.out.println(tableName + "表已存在");
return;
}
// 获取connection对象
Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION);
// 获取admin对象
Admin admin = conn.getAdmin();
// 实例化表描述器
HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
// 表描述器添加信息
for (String cf : columnFamily) {
// 实例化列族描述器
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);
// 设置版本
hColumnDescriptor.setMaxVersions(versions);
// 添加列族信息
descriptor.addFamily(hColumnDescriptor);
}
// 创建表
admin.createTable(descriptor);
// 关闭资源
admin.close();
conn.close();
}
}
```
**Constants**
**命名空间**
```java
// 命名空间
public static final String NAMESPACE = "weibo";
```
**微博内容表结构**
```java
// 微博内容表
public static final String TABLE_CONTENT = NAMESPACE + ":content";
public static final String TABLE_CONTENT_CF = "info";
public static final String TABLE_CONTENT_CN = "content";
public static final int TABLE_CONTENT_VERSIONS = 1;
```
**用户关系表结构**
```java
// 用户关系表
public static final String TABLE_RELATION = NAMESPACE + ":relation";
public static final String TABLE_RELATION_CF1 = "attends";
public static final String TABLE_RELATION_CF2 = "fans";
public static final int TABLE_RELATION_VERSIONS = 1;
```
**收件箱表结构**
```java
// 收件箱表
public static final String TABLE_INBOX = NAMESPACE + ":inbox";
public static final String TABLE_INBOX_CF = "info";
public static final int TABLE_INBOX_VERSIONS = 2;
```
**创建命名空间和数据表**
```java
// 创建命名空间
HBaseUtils.createNamespace(Constants.NAMESPACE);
// 创建微博内容表
HBaseUtils.createTable(Constants.TABLE_CONTENT,
Constants.TABLE_CONTENT_VERSIONS,
Constants.TABLE_CONTENT_CF);
// 创建用户关系表
HBaseUtils.createTable(Constants.TABLE_RELATION,
Constants.TABLE_RELATION_VERSIONS,
Constants.TABLE_RELATION_CF1,
Constants.TABLE_RELATION_CF2);
// 创建收件箱表
HBaseUtils.createTable(Constants.TABLE_INBOX,
Constants.TABLE_INBOX_VERSIONS,
Constants.TABLE_INBOX_CF);
```
#### 发布微博内容
**思路步骤**
1. 微博内容表中添加数据
2. 微博收件箱表对所有粉丝用户添加数据
**代码实现**
```java
// 发布微博
public void publishWeibo(String uid, String content) throws IOException {
// 获取connection对象
Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION);
// 第一部分:操作微博内容表
// 获取微博内容表对象
Table tableContent = conn.getTable(TableName.valueOf(Constants.TABLE_CONTENT));
// 获取当前时间戳
long ts = System.currentTimeMillis();
// 构建RowKey
String rowKey = uid + "_" + ts;
// 创建put对象
Put putContent = new Put(Bytes.toBytes(rowKey));
// 赋值put对象
putContent.addColumn(Bytes.toBytes(Constants.TABLE_CONTENT_CF),
Bytes.toBytes(Constants.TABLE_CONTENT_CN),
Bytes.toBytes(content));
// 执行插入数据
tableContent.put(putContent);
// 第二部分:操作微博收件箱表
// 获取用户关系表
Table tableRelation = conn.getTable(TableName.valueOf(Constants.TABLE_RELATION));
// 获取当前发布微博人的fans列族数据
Get getRelation = new Get(Bytes.toBytes(uid));
// 指定列族fans
getRelation.addFamily(Bytes.toBytes(Constants.TABLE_RELATION_CF2));
// 获取指定列族数据
Result result = tableRelation.get(getRelation);
// 创建集合存放微博收件箱表的put对象
List putsInbox = new ArrayList();
// 遍历数据信息
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
// 构建收件箱表的put对象
Put putInbox = new Put(CellUtil.cloneQualifier(cell));
// 赋值收件箱表的put对象
putInbox.addColumn(Bytes.toBytes(Constants.TABLE_INBOX_CF),
Bytes.toBytes(uid),
Bytes.toBytes(rowKey));
// 将put对象存入集合
putsInbox.add(putInbox);
}
// 判断是否有粉丝
if (putsInbox.size() > 0) {
// 获取收件箱表对象
Table tableInbox = conn.getTable(TableName.valueOf(Constants.TABLE_INBOX));
// 执行收件箱表数据插入
tableInbox.put(putsInbox);
// 关闭收件箱表对象
tableInbox.close();
}
// 关闭资源
tableRelation.close();
tableContent.close();
conn.close();
}
```
**测试发布微博**
```java
@Test
public void publishWeibo() throws IOException {
// 发布微博
dao.publishWeibo("1001", "1001的第1条微博");
dao.publishWeibo("1001", "1001的第2条微博");
dao.publishWeibo("1003", "1003的第1条微博");
dao.publishWeibo("1003", "1003的第2条微博");
dao.publishWeibo("1003", "1003的第3条微博");
}
```
#### 添加关注用户
**思路步骤**
1. 在微博用户关系表中,对当前主动操作的用户添加新关注的用户
2. 在微博用户关系表中,对被关注的用户添加新的粉丝
3. 微博收件箱表中添加所关注的用户发布的微博
**代码实现**
```java
// 添加关注
public void addAttend(String uid, String... attends) throws IOException {
// 判断是否传入待关注用户
if (attends.length == 0) {
System.out.println("无待关注人");
return;
}
// 获取connection对象
Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION);
// 第一部分:操作用户关系表
// 获取用户关系表对象
Table tableRelation = conn.getTable(TableName.valueOf(Constants.TABLE_RELATION));
// 创建集合存放用户关系表put对象
List putsRelation = new ArrayList();
// 创建操作者的put对象
Put put = new Put(Bytes.toBytes(uid));
// 循环操作关注者与被关注者的put对象
for (String attend : attends) {
// 赋值关注者put对象
put.addColumn(Bytes.toBytes(Constants.TABLE_RELATION_CF1),
Bytes.toBytes(attend),
Bytes.toBytes(attend));
// 创建被关注者put对象
Put putAttend = new Put(Bytes.toBytes(attend));
// 赋值被关注者put对象
putAttend.addColumn(Bytes.toBytes(Constants.TABLE_RELATION_CF2),
Bytes.toBytes(uid),
Bytes.toBytes(uid));
// 将被关注者put对象存入集合
putsRelation.add(putAttend);
}
// 将关注者put对象存入集合
putsRelation.add(put);
// 执行关系表数据插入
tableRelation.put(putsRelation);
// 第二部分:操作收件箱表
// 获取内容表对象
Table tableContent = conn.getTable(TableName.valueOf(Constants.TABLE_CONTENT));
// 创建收件箱表put对象
Put putInbox = new Put(Bytes.toBytes(uid));
// 定义时间戳,控制时间间隔
long ts = System.currentTimeMillis();
// 循环attends获取每个被关注者的微博
for (String attend : attends) {
// 获取当前被关注者发布的微博
Scan scan = new Scan(Bytes.toBytes(attend + "_"), Bytes.toBytes(attend + "|"));
// 得到扫描结果
ResultScanner resultScanner = tableContent.getScanner(scan);
// 遍历Result
for (Result result : resultScanner) {
// 赋值收件箱表的put对象
putInbox.addColumn(Bytes.toBytes(Constants.TABLE_INBOX_CF),
Bytes.toBytes(attend),
ts++,
result.getRow());
}
}
// 判断put对象内容是否为空,即判断关注者是否发过微博
if (!putInbox.isEmpty()) {
// 获取收件箱表对象
Table tableInbox = conn.getTable(TableName.valueOf(Constants.TABLE_INBOX));
// 插入数据
tableInbox.put(putInbox);
// 关闭收件箱表
tableInbox.close();
}
// 关闭资源
tableContent.close();
tableRelation.close();
conn.close();
}
```
**测试添加关注用户**
```java
@Test
public void addAttend() throws IOException {
// 1002关注1001和1003
dao.addAttend("1002", "1001", "1003");
}
```
#### 获取用户初始化数据
**思路步骤**
1. 从微博收件箱中获取所关注的用户的微博 RowKey
2. 根据获取的 RowKey,得到微博内容
**代码实现**
```java
// 获取初始化数据
public void getInit(String uid) throws IOException {
// 获取connection对象
Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION);
// 获取收件箱表对象
Table tInbox = conn.getTable(TableName.valueOf(Constants.TABLE_INBOX));
// 获取内容表对象
Table tContent = conn.getTable(TableName.valueOf(Constants.TABLE_CONTENT));
// 创建收件箱表get对象
Get gInbox = new Get(Bytes.toBytes(uid));
// 设置最大版本
gInbox.setMaxVersions();
// 获取数据
Result result = tInbox.get(gInbox);
// 遍历数据
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
// 构建内容表的get对象
Get gContent = new Get(CellUtil.cloneValue(cell));
// 获取该get对象的内容
Result rContent = tContent.get(gContent);
// 解析内容并打印
Cell[] cs = rContent.rawCells();
for (Cell c : cs) {
// 打印信息
System.out.println("===> RowKey: " + Bytes.toString(CellUtil.cloneRow(c)) + ", " +
"ColumnFamily: " + Bytes.toString(CellUtil.cloneFamily(c)) + ", " +
"ColumnName: " + Bytes.toString(CellUtil.cloneQualifier(c)) + ", " +
"Value: " + Bytes.toString(CellUtil.cloneValue(c)));
}
}
// 关闭资源
tContent.close();
tInbox.close();
conn.close();
}
```
**测试获取用户初始化数据**
```java
@Test
public void getInit() throws IOException {
// 获取1002初始化收件箱数据
dao.getInit("1002");
}
```
#### 取消关注用户
**思路步骤**
1. 在微博用户关系表中,对当前主动操作的用户移除取关的用户
2. 在微博用户关系表中,对被取关的用户移除粉丝
3. 微博收件箱中删除取关的用户发布的微博
**代码实现**
```java
// 取消关注
public void removeAttend(String uid, String... attends) throws IOException {
// 判断是否传入待删除用户
if (attends.length == 0) {
System.out.println("无待删除人");
return;
}
// 获取connection对象
Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION);
// 第一部分:操作用户关系表
// 获取用户关系表对象
Table tableRelation = conn.getTable(TableName.valueOf(Constants.TABLE_RELATION));
// 创建集合存放用户关系表的delete对象
List deletesRelation = new ArrayList();
// 创建操作者的delete对象
Delete delete = new Delete(Bytes.toBytes(uid));
// 循环创建被删除者的delete对象
for (String attend : attends) {
// 赋值操作者的delete对象
delete.addColumns(Bytes.toBytes(Constants.TABLE_RELATION_CF1), Bytes.toBytes(attend));
// 创建被取关者的delete对象
Delete deleteAttend = new Delete(Bytes.toBytes(attend));
// 赋值被删除者delete对象
deleteAttend.addColumns(Bytes.toBytes(Constants.TABLE_RELATION_CF2), Bytes.toBytes(uid));
// 将被取关者delete对象添加进集合
deletesRelation.add(deleteAttend);
}
// 将操作者delete对象添加进集合
deletesRelation.add(delete);
// 执行删除操作
tableRelation.delete(deletesRelation);
// 第二部分:操作收件箱表
Table tableInbox = conn.getTable(TableName.valueOf(Constants.TABLE_INBOX));
// 创建操作者的delete对象
Delete deleteUid = new Delete(Bytes.toBytes(uid));
// 循环赋值delete对象
for (String attend : attends) {
deleteUid.addColumns(Bytes.toBytes(Constants.TABLE_INBOX_CF), Bytes.toBytes(attend));
}
// 执行收件箱表的删除操作
tableInbox.delete(deleteUid);
// 关闭资源
tableInbox.close();
tableRelation.close();
conn.close();
}
```
**测试取消关注用户**
```java
@Test
public void removeAttend() throws IOException {
// 1002取关1003
dao.removeAttend("1002", "1003");
}
```
#### 获取用户所有微博内容
**思路步骤**
1. 构建过滤器怼内容表进行数据过滤,获取所需信息
2. 解析打印所得数据
**代码实现**
```java
// 获取某人的所有微博数据
public void getWeibo(String uid) throws IOException {
// 获取connection对象
Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION);
// 获取微博内容表对象
Table tContent = conn.getTable(TableName.valueOf(Constants.TABLE_CONTENT));
// 构建scan对象
Scan scan = new Scan();
// 构建比较器
SubstringComparator comparator = new SubstringComparator(uid + "_");
// 构建过滤器
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, comparator);
// 设置过滤器
scan.setFilter(filter);
// 获取数据
ResultScanner scanner = tContent.getScanner(scan);
// 解析打印
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
// 打印信息
System.out.println("===> RowKey: " + Bytes.toString(CellUtil.cloneRow(cell)) + ", " +
"ColumnFamily: " + Bytes.toString(CellUtil.cloneFamily(cell)) + ", " +
"ColumnName: " + Bytes.toString(CellUtil.cloneQualifier(cell)) + ", " +
"Value: " + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
// 关闭资源
tContent.close();
conn.close();
}
```
**测试获取用户所有微博内容**
```java
@Test
public void getWeibo() throws IOException {
// 获取1003所有微博
dao.getWeibo("1003");
}
```