# hbase_learning **Repository Path**: sun81911/hbase_learning ## Basic Information - **Project Name**: hbase_learning - **Description**: HBase知识点学习 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 2 - **Created**: 2020-06-12 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## HBase 简介 ### HBase 定义 HBase 是一种分布式、可扩展、支持海量数据存储的 NoSQL 数据库 ### HBase 数据模型 逻辑上,HBase 的数据模型同关系型数据库很类似,数据存储在一张表中,有行有列,但从 HBase 的底层物理存储结构来看,HBase 更像是一个 Multi-Dimensional Map #### HBase 逻辑结构 ![](imgs/HBase逻辑结构.png) #### HBase 物理存储结构 ![](imgs/HBase物理存储结构.png) #### 数据模型 **Name Space** 命名空间,类似于关系型数据库的 DatabBase 概念,每个命名空间下有多个表 HBase有两个自带的命名空间,分别是 `hbase` 和 `default` hbase 中存放的是 HBase 内置的表,default 表是用户默认使用的命名空间 **Region** 类似于关系型数据库的表概念 不同的是,HBase 定义表时只需要声明「列族」即可,不需要声明具体的列 这意味着,往 HBase 写入数据时,字段可以动态、按需指定。因此和关系型数据库相比,HBase 能够轻松应对字段变更的场景 **Row** HBase 表中的每行数据都由一个 `RowKey` 和多个 `Column` 组成,数据是按照 RowKey 的「字典顺序」存储的,并且查询数据时**只能根据 RowKey 进行检索** **Column** HBase 中的每个列都由 `Column Family` 「列族」和 `Column Qualifier` 「列限定符」进行限定 例如 `info: name`,`info: age`,建表时只需指明「列族」,而列限定符无需预先定义 **Time Stamp** 用于标识数据的不同版本 version,每条数据写入时,如果不指定时间戳,系统会自动为其加上该字段,其值为写入 HBase 的时间 **Cell** 由 `{rowkey, column Family: column Qualifier, time Stamp}` 唯一确定的单元 cell 中的数据是没有类型的,全部是「字节码」形式存储 ### HBase 基本架构 **图示分析** ![](imgs/hbase基本架构.png) **架构角色** 1. **Region Server** Region Server 为 Region 的管理者,其实现类为 HRegionServer,主要作用为 1. 对于数据的操作 `get`, `put`, `delete` 2. 对于 Region 的操作 `splitRegion`, `compactRegion` 2. **Master** Master 是所有 Region Server 的管理者,其实现类为 HMaster,主要作用为 1. 对于表的操作 `create`, `delete`, `alter` 2. 对于 RegionServer的操作,分配 regions 到每个RegionServer,监控每个 RegionServer 的状态,负载均衡和故障转移 3. **Zookeeper** HBase 通过 Zookeeper 来做 Master 的高可用、RegionServer 的监控、元数据的入口以及集群配置的维护等工作 4. **HDFS** HDFS 为 HBase 提供最终的底层数据存储服务,同时为 HBase 提供高可用的支持 ## HBase 入门 ### HBase 安装部署 #### Zookeeper 正常部署 首先保证 Zookeeper 集群的正常部署,并启动之 ```shell [root@hadoop198 zookeeper-3.4.10]# bin/zkServer.sh start [root@hadoop197 zookeeper-3.4.10]# bin/zkServer.sh start [root@hadoop196 zookeeper-3.4.10]# bin/zkServer.sh start ``` #### Hadoop 正常部署 Hadoop 集群的正常部署并启动 ```shell [root@hadoop198 hadoop-2.7.2]# sbin/start-dfs.sh [root@hadoop196 hadoop-2.7.2]# sbin/start-yarn.sh ``` #### HBase 解压 解压 Hbase 到指定目录 ```shell tar -zxvf hbase-1.3.1-bin.tar.gz -C /opt/module ``` #### HBase 配置文件 修改 HBase 对应的配置文件 **hbase-env.sh** ```shell # The java implementation to use. Java 1.7+ required. export JAVA_HOME=/opt/module/jdk1.8.0_144 ``` ```shell # Configure PermSize. Only needed in JDK7. You can safely remove it for JDK8+ #export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m" #export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m" ``` ```shell # Tell HBase whether it should manage it's own instance of Zookeeper or not. export HBASE_MANAGES_ZK=false ``` **hbase-site.xml** ```xml hbase.rootdir hdfs://hadoop198:9000/hbase hbase.cluster.distributed true hbase.master.port 16000 hbase.zookeeper.quorum hadoop198,hadoop197,hadoop196 hbase.zookeeper.property.dataDir /opt/module/zookeeper-3.4.10/zkData ``` **regionservers** ``` hadoop198 hadoop197 hadoop196 ``` **软连接 Hadoop 配置文件到 HBase** ```shell ln -s /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml /opt/module/hbase-1.3.1/conf/core-site.xml ``` ```shell ln -s /opt/module/hadoop-2.7.2/etc/hadoop/hdfs-site.xml /opt/module/hbase-1.3.1/conf/hdfs-site.xml ``` #### HBase 分发 ```shell xsync hbase-1.3.1/ ``` #### HBase 服务启动 **单节点启动** ```shell bin/hbase-daemon.sh start master ``` ```shell bin/hbase-daemon.sh start regionserver ``` **注意**:如果集群之间的节点时间不同步,会导致 regionserver 无法启动,抛出 `ClockOutOfSyncException` 异常 可以设置集群时间同步服务,或者将 `hbase.master.maxclockskew` 设置更大的值 ```xml hbase.master.maxclockskew 180000 Time difference of regionserver from master ``` **集群启动** ```shell bin/start-hbase.sh ``` 停止服务 ```shell bin/stop-hbase.sh ``` #### 查看 HBase 页面 启动成功后,可以通过 `host:port` 的方式来访问 HBase 管理页面,例如:http://hadoop198:16010 ### HBase Shell 操作 #### 基本操作 进入 HBase 客户端命令行 ```shell bin/hbase shell ``` 查看帮助命令 ```shell hbase(main):001:0> help ``` 查看当前数据库中有哪些表 ```shell hbase(main):002:0> list ``` #### 表的操作 ##### DDL **创建表** ```shell hbase(main):003:0> create 'student','info' ``` `student`为表名,`info`为列族,创建表时至少包含一个列族 **查看表结构** ```shell hbase(main):002:0> describe 'student' ``` ``` Table student is ENABLED student COLUMN FAMILIES DESCRIPTION {NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRE SSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'} ``` **变更表信息** 将 info 列族中的数据存放 3 个版本 ```shell hbase(main):008:0> alter 'student',{NAME=>'info',VERSIONS=>3} ``` **删除表** 首先需要先让该表为 `disable` 状态 ```shell hbase(main):003:0> disable 'student' ``` 然后才能 `drop` 这个表 ```shell hbase(main):004:0> drop 'student' ``` **查看命名空间** ```shell hbase(main):010:0> list_namespace ``` ``` NAMESPACE default hbase ``` **创建命名空间** ```shell hbase(main):011:0> create_namespace 'bigdata' ``` **创建不同命名空间的数据表** ```shell hbase(main):014:0> create 'bigdata:student','info' ``` **删除命名空间** 删除命名空间之前,必须保证命名空间内无数据表 ```shell hbase(main):016:0> disable 'bigdata:student' ``` ```shell hbase(main):017:0> drop 'bigdata:student' ``` ```shell hbase(main):018:0> drop_namespace 'bigdata' ``` ##### DML **插入数据到表** 基本语法 ```shell put 'ns1:t1', 'r1', 'c1', 'value' ``` - `ns1`:命名空间 - `t1`:表名 - `r1`:Row Key - `c1`:列 - `value`:值 ```shell hbase(main):023:0> put 'student','1001','info:sex','male' ``` ```shell hbase(main):025:0> put 'student','1002','info:sex','female' ``` ```shell hbase(main):028:0> put 'student','1002','info:name','Janna' ``` **扫描查看表数据** ```shell scan 'student' ``` ``` ROW COLUMN+CELL 1001 column=info:sex, timestamp=1591093425060, value=male 1002 column=info:name, timestamp=1591094207725, value=Janna 1002 column=info:sex, timestamp=1591093579256, value=female ``` ```shell hbase(main):027:0> scan 'student',{STARTROW => '1001', STOPROW => '1002'} ``` ``` ROW COLUMN+CELL 1001 column=info:sex, timestamp=1591093425060, value=male ``` **注意**:`STARTROW` 与 `STOPROW` 之间的关系是「左闭右开」 `ROWKEY` 按照字节序进行排序,例如 `1001` < `10010` < `1002` 查看指定`ROWKEY`或指定`COLUMNFAMILY:COLUMN`的数据 ```shell hbase(main):031:0> get 'student','1002' ``` ``` COLUMN CELL info:name timestamp=1591094207725, value=Janna info:sex timestamp=1591093579256, value=female ``` ```shell hbase(main):033:0> get 'student','1002','info:name' ``` ``` COLUMN CELL info:name timestamp=1591094207725, value=Janna ``` **更新指定字段的数据** ```shell hbase(main):049:0> put 'student','1002','info:name','Nick' ``` **扫描全量数据** ```shell hbase(main):056:0> scan 'student', {RAW => true, VERSIONS => 10} ``` ``` ROW COLUMN+CELL 1001 column=info:sex, timestamp=1591093425060, value=male 1002 column=info:name, timestamp=1591095520023, value=Nick 1002 column=info:name, timestamp=1591094207725, value=Janna 1002 column=info:sex, timestamp=1591093579256, value=female ``` **删除数据** 删除某 RowKey 的某一列数据 ```shell hbase(main):058:0> delete 'student','1002','info:sex' ``` 删除某 RowKey 的全部数据 ```shell hbase(main):065:0> deleteall 'student','1001' ``` **清空表数据** ```shell hbase(main):070:0> truncate 'student' ``` **细节**:清空表的操作顺序为先 `disable`,然后再 `truncate` **刷写数据到磁盘** ```shell hbase(main):071:0> flush 'student' ``` **查看hfile文件内容** ```shell bin/hbase hfile -v -p -m -f hdfs://hadoop198:9000/hbase/data/default/student/6bc535099538d8cf4c996d6de41a557f/info/ee8a9e5ccaa7447ea3af8291231726a5 ``` ## HBase 原理 ### 架构原理 **图示分析** ![](imgs/详细架构.png) **部分组件分析** 1. StoreFile 保存实际数据的物理文件,StoreFile 以 HFile 的形式存储在 HDFS 上。每个 Store 会有一个或多个 StoreFile(HFile),数据在每个 StoreFile 中都是有序的 2. MemStore 写缓存,由于 HFile 中的数据要求是有序的,所以数据是先存储在 MemStore 中,排好序后,等到达刷写时机才会刷写到 HFile,每次刷写都会形成一个新的 HFile 3. WAL (HLog) 由于数据要经 MemStore 排序后才能刷写到 HFile,但把数据保存在内存中会有很高的概率导致数据丢失,为了解决这个问题,数据会先写在一个叫做 Write-Ahead logfile 的文件中,然后再写入 MemStore 中。所以在系统出现故障的时候,数据可以通过这个日志文件重建 ### 写流程 **图示分析** ![](imgs/写流程.png) **流程分析** 1. Client 先访问 zookeeper,获取 `hbase:meta` 表位于哪个 Region Server 2. 访问对应的 Region Server,获取 `hbase:meta` 表,根据读请求的 `namespace:table/rowkey`,查询出目标数据位于哪个 Region Server 中的哪个 Region 中,并将该 table 的 region 信息以及 meta 表的位置信息缓存在客户端的 `meta cache`,方便下次访问 3. 与目标 Region Server 进行通讯 4. 将数据顺序写入(追加)到 WAL 中 5. 将数据写入对应的 MemStore,数据会在 MemStore 进行排序 6. 向客户端发送 ack 7. 等达到 MemStore 的刷写时机后,将数据刷写到 HFile 中 ### MemStore Flush **图示分析** ![](imgs/memstore_flush.png) **刷写时机** 1. 当某个 memstroe 的大小达到了 `hbase.hregion.memstore.flush.size`(默认值 128M),其所在 region 的所有 memstore 都会刷写 当 memstore 的大小达到了 ``` hbase.hregion.memstore.flush.size × hbase.hregion.memstore.block.multiplier(默认值4) ``` 会阻止继续往该 memstore 写数据 2. 当 region server 中 memstore 的总大小达到 ``` java_heapsize × hbase.regionserver.global.memstore.size(默认值0.4) × hbase.regionserver.global.memstore.size.lower.limit(默认值0.95) ``` region 会按照其所有 memstore 的大小顺序(由大到小)依次进行刷写,直到 region server 中所有 memstore 的总大小减小到上述值以下 当 region server 中 memstore 的总大小达到 ``` java_heapsize × hbase.regionserver.global.memstore.size(默认值0.4) ``` 会阻止继续往所有的 memstore 写数据 3. 到达自动刷写的时间,也会触发 memstore flush,自动刷新的时间间隔由该属性进行配置 `hbase.regionserver.optionalcacheflushinterval(默认1小时)` 4. 当 WAL 文件的数量超过 `hbase.regionserver.max.logs`,region 会按照时间顺序依次进行刷写,直到 WAL 文件数量减小到 `hbase.regionserver.max.log` 以下(该属性名已经废弃,现无需手动设置,最大值为 32) ### 读流程 **图示分析** ![](imgs/读流程.png) **流程分析** 1. Client 先访问 zookeeper,获取 `hbase:meta` 表位于哪个 Region Server 2. 访问对应的 Region Server,获取 hbase:meta 表,根据读请求的 `namespace:table/rowkey`,查询出目标数据位于哪个 Region Server 中的哪个 Region 中,并将该 table 的 region 信息以及 meta 表的位置信息缓存在客户端的 `meta cache`,方便下次访问 3. 与目标 Region Server 进行通讯 4. 分别在 Block Cache(读缓存),MemStore 和 Store File(HFile)中查询目标数据,并将查到的所有数据进行合并 此处所有数据是指同一条数据的不同版本(time stamp)或者不同的类型(Put/Delete) 5. 将从文件中查询到的数据块(Block,HFile 数据存储单元,默认大小为 64KB)缓存到Block Cache 6. 将合并后的最终结果返回给客户端 ### StoreFile Compaction **图示分析** ![](imgs/storefile_compaction.png) **概念分析** 由于memstore每次刷写都会生成一个新的 HFile,且同一个字段的不同版本(timestamp)和不同类型(put/delete)有可能会分布在不同的 HFile 中,因此查询时需要遍历所有的 HFile 为了减少 HFile 的个数,以及清理掉过期和删除的数据,会进行 StoreFile Compaction Compaction 分为两种,分别是 Minor Compaction 和 Major Compaction 1. Minor Compaction会将临近的若干个较小的 HFile 合并成一个较大的 HFile,但不会清理过期和删除的数据 2. Major Compaction 会将一个 Store 下的所有的 HFile 合并成一个大 HFile,并且会清理掉过期和删除的数据 ### Region Split **图示分析** ![](imgs/region_split.png) **概念分析** 默认情况下,每个 Table 起初只有一个 Region,随着数据的不断写入,Region 会自动进行拆分。刚拆分时,两个子 Region 都位于当前的 Region Server,但处于负载均衡的考虑,HMaster 有可能会将某个 Region 转移给其他的 Region Server Region Split 时机是当一个 region 中的某个 Store 下所有 StoreFile 的总大小超过 `min(r^2 * hbase.hregion.memstore.flush.size, hbase.hregion.max.filesize)` 该 Region 就会进行拆分,其中 `r` 为当前 Region Server 中属于该 Table 的个数 ## HBase API ### 环境准备 **相关依赖** ```xml junit junit 4.12 org.apache.hbase hbase-server 1.3.1 org.apache.hbase hbase-client 1.3.1 ``` ### API 测试 #### 初始化配置信息 ```java Connection conn; // 初始化配置信息 @Before public void init() throws IOException { // 1 使用HBaseConfiguration的单例方法实例化,获取配置文件信息 // 过时 // HBaseConfiguration conf = new HBaseConfiguration(); Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "hadoop198,hadoop197,hadoop196"); conf.set("hbase.zookeeper.property.clientPort", "2181"); // 创建连接 conn = ConnectionFactory.createConnection(conf); } ``` #### 关闭资源 ```java // 关闭连接 @After public void destroy() { if (conn != null) { try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } ``` #### DDL ##### 判断表是否存在 ```java // 2 获取管理员对象 // 过时 // HBaseAdmin admin = new HBaseAdmin(conf); Admin admin = conn.getAdmin(); // 3 判断表是否存在 // 过时 // boolean res = admin.tableExists("student"); boolean res = admin.tableExists(TableName.valueOf("student")); System.out.println(res); admin.close(); ``` ##### 创建表 ```java // 获取admin对象 Admin admin = conn.getAdmin(); // 1 创建表描述器 HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("stu2")); // 2 创建列族描述器 HColumnDescriptor family = new HColumnDescriptor("info"); // 3 添加列族信息 descriptor.addFamily(family); // 4 创建表 admin.createTable(descriptor); // 关闭资源 admin.close(); ``` ##### 删除表 ```java // 获取admin对象 Admin admin = conn.getAdmin(); // 1 下线表 admin.disableTable(TableName.valueOf("stu2")); // 2 删除表 admin.deleteTable(TableName.valueOf("stu2")); // 关闭资源 admin.close(); ``` ##### 创建命名空间 ```java Admin admin = null; try { // 获取admin对象 admin = conn.getAdmin(); // 1 传入命名空间名,创建命名空间描述器 NamespaceDescriptor nd = NamespaceDescriptor.create("bigdata").build(); // 2 创建命名空间 admin.createNamespace(nd); } catch (NamespaceExistException e) { // 3 抓取命名空间存在异常,以判断命名空间是否存在 System.out.println("命名空间已存在"); } catch (IOException e) { e.printStackTrace(); } finally { // 关闭资源 try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } ``` #### DML ##### 插入/修改数据 ```java // 1 获取表对象 Table table = conn.getTable(TableName.valueOf("student")); // 2 传入rowkey,创建put对象 Put put = new Put(Bytes.toBytes("1002")); // 3 给put对象赋值 // byte[] family: 列族, byte[] qualifier: 列名, byte[] value: 值 put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("rogre2")); // 批量添加 // 方法一:相同rowkey,直接put多个column put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("gender"), Bytes.toBytes("male")); // 方法二:不同rowkey,实例化多个put对象构建list Put put2 = new Put(Bytes.toBytes("1003")); put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("roger3")); // 构建list集合 List list = new ArrayList(); list.add(put); list.add(put2); // 4 插入数据 // 单条插入 // table.put(put); // 批量插入 table.put(list); // 5 关闭资源 table.close(); ``` ##### 获取数据 get ```java // 1 获取表对象 Table table = conn.getTable(TableName.valueOf("student")); // 2 指定rowkey,创建get对象 Get get = new Get(Bytes.toBytes("1002")); // 2.1 指定获取的列族 get.addFamily(Bytes.toBytes("info")); // 2.2 指定获取的列名 get.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name")); // 2.3 设置获取数据的版本数 get.setMaxVersions(); // 3 获取数据 Result result = table.get(get); // 4 解析result Cell[] cells = result.rawCells(); for (Cell cell : cells) { // 获取列族数据 byte[] b1 = CellUtil.cloneFamily(cell); String family = Bytes.toString(b1); // 获取列名数据 byte[] b2 = CellUtil.cloneQualifier(cell); String qualifier = Bytes.toString(b2); // 获取值数据 byte[] b3 = CellUtil.cloneValue(cell); String value = Bytes.toString(b3); // 打印数据 System.out.println("===> family: " + family + ", qualifier: " + qualifier + ", value: " + value); } // 5 关闭连接 table.close(); ``` ##### 获取数据 scan ```java // 1 获取表对象 Table table = conn.getTable(TableName.valueOf("student")); // 2 创建scan对象 // 2.1 全表扫描 // Scan scan = new Scan(); // 2.2 指定过滤条件 Scan scan = new Scan(Bytes.toBytes("1001"), Bytes.toBytes("1003")); // 3 获取scanner,扫描表 ResultScanner scanner = table.getScanner(scan); // 4 解析扫描结果 for (Result result : scanner) { Cell[] cells = result.rawCells(); for (Cell cell : cells) { // 获取列族数据 byte[] b1 = CellUtil.cloneFamily(cell); String family = Bytes.toString(b1); // 获取列名数据 byte[] b2 = CellUtil.cloneQualifier(cell); String qualifier = Bytes.toString(b2); // 获取值数据 byte[] b3 = CellUtil.cloneValue(cell); String value = Bytes.toString(b3); // 获取rowkey数据 byte[] b4 = CellUtil.cloneRow(cell); String rowKey = Bytes.toString(b4); // 打印数据 System.out.println("===> rowKey: " + rowKey + ", family: " + family + ", qualifier: " + qualifier + ", value: " + value); } } // 5 关闭资源 table.close(); ``` ##### 删除数据 ```java // 1 获取表对象 Table table = conn.getTable(TableName.valueOf("student")); // 2 指定删除的rowkey,构建delete对象, type=DeleteFamily Delete delete = new Delete(Bytes.toBytes("1001")); // 2.1 指定删除列所有版本数据, type=DeleteColumn delete.addColumns(Bytes.toBytes("info"), Bytes.toBytes("name")); // 2.2 删除指定列单个版本,由于flush可能导致前后两次执行得到结果不一致,尽量避免, type=Delete // delete.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); // 指定删除的时间戳 delete.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), 1591337821300L); // 2.3 删除指定的列族 delete.addFamily(Bytes.toBytes("info")); // 2.4 批量删除 Delete d2 = new Delete(Bytes.toBytes("1002")); // 构造delete集合 List list = new ArrayList(); list.add(delete); list.add(d2); // 3 数据删除 table.delete(delete); // 3.1 批量数据删除 table.delete(list); // 4 关闭资源 table.close(); ``` ### HBase - MapReduce 通过 HBase 的相关 JavaAPI,可以实现伴随 HBase 操作的 MapReduce 过程 比如使用MapReduce 将数据从本地文件系统导入到 HBase 的表中;从 HBase 中读取一些原始数据后使用 MapReduce 做数据分析 #### 官方 HBase-MapReduce **查看 HBase 的 MapReduce 任务的执行** ```shell bin/hbase mapredcp ``` **环境变量的导入** 配置 `/etc/profile` ```shell export HBASE_HOME=/opt/module/hbase-1.3.1 export HADOOP_HOME=/opt/module/hadoop-2.7.2 ``` 使配置生效 ```shell source /etc/profile ``` 配置 `hadoop-env.sh` ```shell export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase-1.3.1/lib/* ``` **注意**:需要在 ```shell # Extra Java CLASSPATH elements. Automatically insert capacity-scheduler. for f in $HADOOP_HOME/contrib/capacity-scheduler/*.jar; do if [ "$HADOOP_CLASSPATH" ]; then export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f else export HADOOP_CLASSPATH=$f fi done ``` 之后 **同步配置** ```shell xsync /etc/profile # 同步系统配置 xsync hadoop-env.sh # 同步hadoop环境配置 ``` **启动 Hadoop 集群** ```shell start-dfs.sh start-yarn.sh ``` **启动 Hbase 服务** ```shell start-hbase.sh ``` **运行官方 MapReduce 任务** **统计 student 表中有多少行数据** ```shell yarn jar lib/hbase-server-1.3.1.jar rowcounter student ``` **使用 MapReduce 将本地数据导入到 HBase** 在本地创建一个 tsv 格式的文件 `fruit.tsv` ``` 1001 Apple Red 1002 Pear Yellow 1003 Pineapple Yellow ``` 在 HDFS 中创建 input_fruit 文件夹并上传 fruit.tsv 文件 ```shell hdfs dfs -mkdir /input_fruit/ ``` ```shell hdfs dfs -put fruit.tsv /input_fruit/ ``` 创建 Hbase 表 ```shell hbase(main):003:0> create 'fruit','info' ``` 执行 MapReduce 到 HBase 的 fruit 表中 ```shell /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit hdfs://hadoop198:9000/input_fruit ``` #### 自定义 HBase-MapReduce ##### 示例一 **实现将 HDFS 中的数据写入到 Hbase 表中** FruitMapper ```java public class FruitMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(key, value); } } ``` FruitReducer ```java // 继承TableReducer类 public class FruitReducer extends TableReducer { @Override protected void reduce(LongWritable key, Iterable values, Context context) throws IOException, InterruptedException { // 1 遍历values for (Text value : values) { // 示例:1001 Apple Red // 2 获取每行数据 String line = value.toString(); String[] fields = line.split("\t"); // 3 构建put对象 String rowKey = fields[0]; Put put = new Put(Bytes.toBytes(rowKey)); // 4 赋值put对象 String name = fields[1]; String color = fields[2]; put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name)); put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color)); // 5 写出 context.write(NullWritable.get(), put); } } } ``` FruitDriver ```java // 实现Tool接口 public class FruitDriver implements Tool { // 定义configuration Configuration conf; public int run(String[] args) throws Exception { // 1 获取job对象 Job job = Job.getInstance(conf); // 2 设置驱动类路径 job.setJarByClass(FruitDriver.class); // 3 设置mapper和reducer job.setMapperClass(FruitMapper.class); // String table, Class reducer, Job job // 表名,reducer类,job TableMapReduceUtil.initTableReducerJob(args[1], FruitReducer.class, job); // 4 设置mapper输出格式 job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); // 5 设置输入参数 FileInputFormat.setInputPaths(job, new Path(args[0])); // 6 提交任务 boolean res = job.waitForCompletion(true); return res ? 0 : 1; } public void setConf(Configuration configuration) { this.conf = configuration; } public Configuration getConf() { return conf; } public static void main(String[] args) { try { // 实例化configuration对象 Configuration conf = new Configuration(); // 运行run方法 int run = ToolRunner.run(conf, new FruitDriver(), args); // 退出 System.exit(run); } catch (Exception e) { e.printStackTrace(); } } } ``` 打包在集群上运行 ```shell yarn jar hbase-importdata.jar com.roger.mr.FruitDriver /input_fruit/fruit.tsv fruit2 ``` **注意**:运行任务前,如果待数据导入的表不存在,则需要提前创建 ##### 示例二 **实现将 fruit 表中的一部分数据,通过 MR 迁入到 fruit3 表中** Fruit2Mapper ```java // 继承TableMapper类 public class Fruit2Mapper extends TableMapper { @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { // 构建put对象,key本身就是rowkey Put put = new Put(key.get()); // 1 获取数据 Cell[] cells = value.rawCells(); for (Cell cell : cells) { // 2 判断当前列是否为name列 if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) { // 3 提取name列数据 put.add(cell); } } // 4 写出 context.write(key, put); } } ``` Fruit2Reducer ```java // 继承TableReducer类 public class Fruit2Reducer extends TableReducer { @Override protected void reduce(ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException { // 遍历写出 for (Put put : values) { context.write(NullWritable.get(), put); } } } ``` Fruit2Driver ```java public class Fruit2Driver implements Tool { Configuration conf; public int run(String[] args) throws Exception { // 1 获取job对象 Job job = Job.getInstance(conf); // 2 设置驱动类路径 job.setJarByClass(Fruit2Driver.class); // 3 设置mapper和reducer // 表名,扫描条件,mapper类,输出key类型,输出value类型,job TableMapReduceUtil.initTableMapperJob("fruit", new Scan(), Fruit2Mapper.class, ImmutableBytesWritable.class, Put.class, job); // 表名,reducer类,job TableMapReduceUtil.initTableReducerJob("fruit3", Fruit2Reducer.class, job); // 4 提交任务 boolean res = job.waitForCompletion(true); return res ? 0 : 1; } public void setConf(Configuration configuration) { this.conf = configuration; } public Configuration getConf() { return conf; } public static void main(String[] args) { try { // 实例化HBaseConfiguration对象 Configuration conf = HBaseConfiguration.create(); // 运行run方法 int run = ToolRunner.run(conf, new Fruit2Driver(), args); // 退出 System.exit(run); } catch (Exception e) { e.printStackTrace(); } } } ``` 此时将 `/opt/module/hbase-1.3.1/conf/hbase-site.xml` 文件拷贝到 `resources` 目录下,在本地运行 `main` 方法操作 HBase **注意**:运行任务前,如果待数据导入的表不存在,则需要提前创建 ### HBase - Hive #### HBase 与 Hive 对比分析 **Hive** 1. 数据仓库 Hive 的本质其实就相当于将 HDFS 中已经存储的文件在 Mysql 中做了一个双射关系,以方便使用 HQL 去管理查询 2. 用于数据分析、清洗 Hive 适用于离线的数据分析和清洗,延迟较高 3. 基于 HDFS、MapReduce Hive 存储的数据依旧在 DataNode 上,编写的 HQL 语句终将是转换为 MapReduce 代码执行 **HBase** 1. 数据库 是一种面向「列族」存储的非关系型数据库 2. 用于存储结构化和非结构化的数据 适用于单表非关系型数据的存储,不适合做关联查询,类似 JOIN 等操作 3. 基于 HDFS 数据持久化存储的体现形式是 HFile,存放于 DataNode 中,被 ResionServer 以 region 的形式进行管理 4. 延迟较低,接入在线业务使用 面对大量的企业数据,HBase 可以单表大量数据的存储,同时提供了高效的数据访问速度 #### 集成使用 **环境准备** 因为后续可能会在操作 Hive 的同时对 HBase 也会产生影响,所以 Hive 需要持有操作HBase 的 Jar,故拷贝 Hive 所依赖的 Jar 包或者使用软连接 ```shell ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar $HIVE_HOME/lib/hbase-common-1.3.1.jar ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar ``` 同时在 hive-site.xml 中修改 zookeeper 的属性 ```xml hive.zookeeper.quorum hadoop198,hadoop197,hadoop196 The list of ZooKeeper servers to talk to. This is only needed for read/write locks. hive.zookeeper.client.port 2181 The port of ZooKeeper servers to talk to. This is only needed for read/write locks. ``` 由于兼容性的问题,需要重新编译 `/opt/module/hive-1.2.1/lib/hive-hbase-handler-1.2.1.jar` 包替换 **案例一** 建立 Hive 表,关联 HBase 表,插入数据到 Hive 表的同时能够影响 HBase 表 在 Hive 中创建表同时关联 HBase ```sql hive (default)> CREATE TABLE hive_hbase_emp_table(empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int ) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table"); ``` 完成之后分别进入 Hive 和 HBase 查看,都生成了对应的表 通过 insert 命令将中间表中的数据导入到 Hive 关联 Hbase 的那张表中 ```sql hive (default)> insert into table hive_hbase_emp_table select * from emp; ``` 查看 Hive 以及关联的 HBase 表中是否已经成功的同步插入了数据 ```sql hive (default)> select * from hive_hbase_emp_table; ``` ```shell hbase(main):017:0> scan 'hbase_emp_table' ``` **案例二** 在 HBase 中已经存储了某一张表 hbase_emp_table,然后在 Hive 中创建一个外部表来关联 HBase 中的 hbase_emp_table 这张表,使之可以借助 Hive 来分析 HBase 这张表中的数据 在 Hive 中创建外部表 ```sql hive (default)> CREATE EXTERNAL TABLE relevance_hbase_emp(empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int ) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table"); ``` 使用 Hive 函数进行一些分析操作 ```sql hive (default)> select * from relevance_hbase_emp; ``` ## HBase 优化 ### 高可用 在 HBase 中 HMaster 负责监控 HRegionServer 的生命周期,均衡 RegionServer 的负载,如果 HMaster 挂掉了,那么整个 HBase 集群将陷入不健康的状态,并且此时的工作状态并不会维持太久。所以 HBase 支持对 HMaster 的高可用配置 **关闭 HBase 集群** ```shell stop-hbase.sh ``` **在 conf 目录下创建 backup-masters 文件** ```shell vim conf/backup-masters ``` **在 backup-masters 文件中配置高可用 HMaster 节点** ``` hadoop197 hadoop196 ``` **分发配置** ```shell xsync backup-masters ``` **启动 hbase 服务** ```shell start-hbase.sh ``` ### 预分区 每一个 region 维护着 StartRow 与 EndRow,如果加入的数据符合某个 Region 维护的RowKey 范围,则该数据交给这个 Region 维护,依照这个原则,可以将数据所要投放的分区提前大致的规划好,以提高 HBase 性能 **手动设定预分区** ```shell hbase(main):001:0> create 'staff1','info','partition1',SPLITS=>['1000','2000','3000','4000'] ``` **生成16进制序列预分区** ```shell hbase(main):001:0> create 'staff2','info','partition2',{NUMREGIONS=>15, SPLITALGO=>'HexStringSplit'} ``` **按照文件中设置的规则预分区** 创建 splits.txt 文件 ``` aaaa bbbb cccc dddd ``` 执行 ```shell hbase(main):001:0> create 'staff3','partition3',SPLITS_FILE=>'splits.txt' ``` **使用 JavaAPI 创建预分区** ```java // 自定义算法,产生一系列hash散列值存储在二维数组中 byte[][] splitKeys = 某个散列值函数 // 获取connection对象 Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create()); // 获取admin对象 Admin admin = conn.getAdmin(); // 创建HTableDescriptor实例 HTableDescriptor tableDesc = new HTableDescriptor(tableName); // 通过HTableDescriptor实例和散列值二维数组创建带有预分区的Hbase表 admin.createTable(tableDesc, splitKeys); ``` ### RowKey 设计 #### 常用方案 一条数据的唯一标识就是 RowKey,这条数据存储于哪个分区,取决于 RowKey 处于哪个一个预分区的区间内,设计 RowKey 的主要目的 ,就是让数据均匀的分布于所有的 region 中,在一定程度上防止数据倾斜 **生成随机数、hash、散列值** ``` 原本rowKey为1001的,SHA1后变成:dd01903921ea24941c26a48f2cec24e0bb0e8cc7 原本rowKey为3001的,SHA1后变成:49042c54de64a1e9bf0b33e00245660ef92dc7bd 原本rowKey为5001的,SHA1后变成:7b61dec07e02c188790670af43e717f0f46e8913 ``` 在做此操作之前,一般会选择从数据集中抽取样本,来决定什么样的 rowKey 来 hash 后作为每个分区的临界值 **字符串反转** ``` 20170524000001转成10000042507102 20170524000002转成20000042507102 ``` **字符串拼接** ``` 20170524000001_a12e 20170524000001_93i7 ``` #### 情景设计 假设需要分300个预分区,此时分区键可设计为 ``` 000| 001| ... 025| ... 298| ``` 299个分区键划分300个分区,RowKey设计为 ``` 000_ 001_ ... 025_ ... 298_ ``` 由于`|`大于`_`,故`000_`开头的数据位于`-∞, 000|`区间,即第1个分区,之后同理 ### 基础优化 **允许在 HDFS 的文件中追加内容** `hdfs-site.xml`, `hbase-site.xml` ``` dfs.support.append ``` 开启 HDFS 追加同步,可以配合 HBase 的数据同步和持久化,默认值为 true **优化 DataNode 允许的最大文件打开数** `hdfs-site.xml` ``` dfs.datanode.max.transfer.threads ``` HBase 一般都会同一时间操作大量的文件,根据集群的数量和规模以及数据动作,设置为 4096 或者更高,默认值为4096 **优化延迟高的数据操作的等待时间** `hdfs-site.xml` ``` dfs.image.transfer.timeout ``` 如果对于某一次数据操作来讲,延迟非常高,socket 需要等待更长的时间,可以把该值设置为更大的值,默认为 60000 毫秒,以确保 socket 不会被 timeout 掉 **优化数据的写入效率** `mapred-site.xml` ``` mapreduce.map.output.compress mapreduce.map.output.compress.codec ``` 开启这压缩和解压缩功能可以大大提高文件的写入效率,减少写入时间 第一个属性值修改为 `true` 第二个属性值修改为 `org.apache.hadoop.io.compress.GzipCodec` 或者其他压缩方式 **设置 RPC 监听数量** `hbase-site.xml` ``` hbase.regionserver.handler.count ``` 默认值为 30,用于指定 RPC 监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值 **优化 HStore 文件大小** `hbase-site.xml` ``` hbase.hregion.max.filesize ``` 如果 HFile 的大小达到这个数值,则这个 region 会被切分为两个 Hfile 默认值 10737418240 (10GB),如果需要运行 HBase 的 MR 任务可以减小此值,因为一个 region 对应一个 map 任务,如果单个 region 过大,会导致 map 任务执行时间过长 **优化 HBase 客户端缓存** `hbase-site.xml` ``` hbase.client.write.buffer ``` 用于指定 HBase 客户端缓存,增大该值可以减少 RPC 调用次数,但是会消耗更多内存 一般需要设定一定的缓存大小,以达到减少 RPC 次数的目的 **指定 scan.next 扫描 HBase 所获取的行数** `hbase-site.xml` ``` hbase.client.scanner.caching ``` 用于指定 scan.next 方法获取的默认行数,值越大消耗内存越大 ## 谷粒微博 ### 需求分析 微博内容的浏览,数据库表设计 关注用户,取关用户 拉取关注的人的微博内容 ### 代码实现 #### 代码设计总览 1. 创建命名空间和数据表 2. 创建微博内容表 3. 创建用户关系表 4. 创建用户微博内容接收邮件表 5. 发布微博内容 6. 添加关注用户 7. 移除(取关)用户 8. 获取用户初始化数据 9. 获取用户所有微博内容 10. 测试 #### 表设计总览 ![](imgs/谷粒微博表设计.png) #### 创建命名空间和数据表 **HBaseUtils** ```java // 工具类用以创建命名空间和表 public class HBaseUtils { // 创建命名空间 public static void createNamespace(String namespace) throws IOException { // 获取connection对象 Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 获取admin对象 Admin admin = conn.getAdmin(); // 构建命名空间描述器 NamespaceDescriptor descriptor = NamespaceDescriptor.create(namespace).build(); // 创建命名空间 admin.createNamespace(descriptor); // 关闭资源 admin.close(); conn.close(); } // 判断表是否存在 public static boolean isTableExist(String tableName) throws IOException { // 获取connection对象 Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 获取admin对象 Admin admin = conn.getAdmin(); // 判断是否存在 boolean result = admin.tableExists(TableName.valueOf(tableName)); // 关闭资源 admin.close(); conn.close(); // 返回结果 return result; } // 创建表 public static void createTable(String tableName, int versions, String... columnFamily) throws IOException { // 判断是否传入了列族信息 if (columnFamily.length == 0) { System.out.println("没有传入列族信息"); return; } // 判断表是否存在 if (isTableExist(tableName)) { System.out.println(tableName + "表已存在"); return; } // 获取connection对象 Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 获取admin对象 Admin admin = conn.getAdmin(); // 实例化表描述器 HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName)); // 表描述器添加信息 for (String cf : columnFamily) { // 实例化列族描述器 HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf); // 设置版本 hColumnDescriptor.setMaxVersions(versions); // 添加列族信息 descriptor.addFamily(hColumnDescriptor); } // 创建表 admin.createTable(descriptor); // 关闭资源 admin.close(); conn.close(); } } ``` **Constants** **命名空间** ```java // 命名空间 public static final String NAMESPACE = "weibo"; ``` **微博内容表结构** ```java // 微博内容表 public static final String TABLE_CONTENT = NAMESPACE + ":content"; public static final String TABLE_CONTENT_CF = "info"; public static final String TABLE_CONTENT_CN = "content"; public static final int TABLE_CONTENT_VERSIONS = 1; ``` **用户关系表结构** ```java // 用户关系表 public static final String TABLE_RELATION = NAMESPACE + ":relation"; public static final String TABLE_RELATION_CF1 = "attends"; public static final String TABLE_RELATION_CF2 = "fans"; public static final int TABLE_RELATION_VERSIONS = 1; ``` **收件箱表结构** ```java // 收件箱表 public static final String TABLE_INBOX = NAMESPACE + ":inbox"; public static final String TABLE_INBOX_CF = "info"; public static final int TABLE_INBOX_VERSIONS = 2; ``` **创建命名空间和数据表** ```java // 创建命名空间 HBaseUtils.createNamespace(Constants.NAMESPACE); // 创建微博内容表 HBaseUtils.createTable(Constants.TABLE_CONTENT, Constants.TABLE_CONTENT_VERSIONS, Constants.TABLE_CONTENT_CF); // 创建用户关系表 HBaseUtils.createTable(Constants.TABLE_RELATION, Constants.TABLE_RELATION_VERSIONS, Constants.TABLE_RELATION_CF1, Constants.TABLE_RELATION_CF2); // 创建收件箱表 HBaseUtils.createTable(Constants.TABLE_INBOX, Constants.TABLE_INBOX_VERSIONS, Constants.TABLE_INBOX_CF); ``` #### 发布微博内容 **思路步骤** 1. 微博内容表中添加数据 2. 微博收件箱表对所有粉丝用户添加数据 **代码实现** ```java // 发布微博 public void publishWeibo(String uid, String content) throws IOException { // 获取connection对象 Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 第一部分:操作微博内容表 // 获取微博内容表对象 Table tableContent = conn.getTable(TableName.valueOf(Constants.TABLE_CONTENT)); // 获取当前时间戳 long ts = System.currentTimeMillis(); // 构建RowKey String rowKey = uid + "_" + ts; // 创建put对象 Put putContent = new Put(Bytes.toBytes(rowKey)); // 赋值put对象 putContent.addColumn(Bytes.toBytes(Constants.TABLE_CONTENT_CF), Bytes.toBytes(Constants.TABLE_CONTENT_CN), Bytes.toBytes(content)); // 执行插入数据 tableContent.put(putContent); // 第二部分:操作微博收件箱表 // 获取用户关系表 Table tableRelation = conn.getTable(TableName.valueOf(Constants.TABLE_RELATION)); // 获取当前发布微博人的fans列族数据 Get getRelation = new Get(Bytes.toBytes(uid)); // 指定列族fans getRelation.addFamily(Bytes.toBytes(Constants.TABLE_RELATION_CF2)); // 获取指定列族数据 Result result = tableRelation.get(getRelation); // 创建集合存放微博收件箱表的put对象 List putsInbox = new ArrayList(); // 遍历数据信息 Cell[] cells = result.rawCells(); for (Cell cell : cells) { // 构建收件箱表的put对象 Put putInbox = new Put(CellUtil.cloneQualifier(cell)); // 赋值收件箱表的put对象 putInbox.addColumn(Bytes.toBytes(Constants.TABLE_INBOX_CF), Bytes.toBytes(uid), Bytes.toBytes(rowKey)); // 将put对象存入集合 putsInbox.add(putInbox); } // 判断是否有粉丝 if (putsInbox.size() > 0) { // 获取收件箱表对象 Table tableInbox = conn.getTable(TableName.valueOf(Constants.TABLE_INBOX)); // 执行收件箱表数据插入 tableInbox.put(putsInbox); // 关闭收件箱表对象 tableInbox.close(); } // 关闭资源 tableRelation.close(); tableContent.close(); conn.close(); } ``` **测试发布微博** ```java @Test public void publishWeibo() throws IOException { // 发布微博 dao.publishWeibo("1001", "1001的第1条微博"); dao.publishWeibo("1001", "1001的第2条微博"); dao.publishWeibo("1003", "1003的第1条微博"); dao.publishWeibo("1003", "1003的第2条微博"); dao.publishWeibo("1003", "1003的第3条微博"); } ``` #### 添加关注用户 **思路步骤** 1. 在微博用户关系表中,对当前主动操作的用户添加新关注的用户 2. 在微博用户关系表中,对被关注的用户添加新的粉丝 3. 微博收件箱表中添加所关注的用户发布的微博 **代码实现** ```java // 添加关注 public void addAttend(String uid, String... attends) throws IOException { // 判断是否传入待关注用户 if (attends.length == 0) { System.out.println("无待关注人"); return; } // 获取connection对象 Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 第一部分:操作用户关系表 // 获取用户关系表对象 Table tableRelation = conn.getTable(TableName.valueOf(Constants.TABLE_RELATION)); // 创建集合存放用户关系表put对象 List putsRelation = new ArrayList(); // 创建操作者的put对象 Put put = new Put(Bytes.toBytes(uid)); // 循环操作关注者与被关注者的put对象 for (String attend : attends) { // 赋值关注者put对象 put.addColumn(Bytes.toBytes(Constants.TABLE_RELATION_CF1), Bytes.toBytes(attend), Bytes.toBytes(attend)); // 创建被关注者put对象 Put putAttend = new Put(Bytes.toBytes(attend)); // 赋值被关注者put对象 putAttend.addColumn(Bytes.toBytes(Constants.TABLE_RELATION_CF2), Bytes.toBytes(uid), Bytes.toBytes(uid)); // 将被关注者put对象存入集合 putsRelation.add(putAttend); } // 将关注者put对象存入集合 putsRelation.add(put); // 执行关系表数据插入 tableRelation.put(putsRelation); // 第二部分:操作收件箱表 // 获取内容表对象 Table tableContent = conn.getTable(TableName.valueOf(Constants.TABLE_CONTENT)); // 创建收件箱表put对象 Put putInbox = new Put(Bytes.toBytes(uid)); // 定义时间戳,控制时间间隔 long ts = System.currentTimeMillis(); // 循环attends获取每个被关注者的微博 for (String attend : attends) { // 获取当前被关注者发布的微博 Scan scan = new Scan(Bytes.toBytes(attend + "_"), Bytes.toBytes(attend + "|")); // 得到扫描结果 ResultScanner resultScanner = tableContent.getScanner(scan); // 遍历Result for (Result result : resultScanner) { // 赋值收件箱表的put对象 putInbox.addColumn(Bytes.toBytes(Constants.TABLE_INBOX_CF), Bytes.toBytes(attend), ts++, result.getRow()); } } // 判断put对象内容是否为空,即判断关注者是否发过微博 if (!putInbox.isEmpty()) { // 获取收件箱表对象 Table tableInbox = conn.getTable(TableName.valueOf(Constants.TABLE_INBOX)); // 插入数据 tableInbox.put(putInbox); // 关闭收件箱表 tableInbox.close(); } // 关闭资源 tableContent.close(); tableRelation.close(); conn.close(); } ``` **测试添加关注用户** ```java @Test public void addAttend() throws IOException { // 1002关注1001和1003 dao.addAttend("1002", "1001", "1003"); } ``` #### 获取用户初始化数据 **思路步骤** 1. 从微博收件箱中获取所关注的用户的微博 RowKey 2. 根据获取的 RowKey,得到微博内容 **代码实现** ```java // 获取初始化数据 public void getInit(String uid) throws IOException { // 获取connection对象 Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 获取收件箱表对象 Table tInbox = conn.getTable(TableName.valueOf(Constants.TABLE_INBOX)); // 获取内容表对象 Table tContent = conn.getTable(TableName.valueOf(Constants.TABLE_CONTENT)); // 创建收件箱表get对象 Get gInbox = new Get(Bytes.toBytes(uid)); // 设置最大版本 gInbox.setMaxVersions(); // 获取数据 Result result = tInbox.get(gInbox); // 遍历数据 Cell[] cells = result.rawCells(); for (Cell cell : cells) { // 构建内容表的get对象 Get gContent = new Get(CellUtil.cloneValue(cell)); // 获取该get对象的内容 Result rContent = tContent.get(gContent); // 解析内容并打印 Cell[] cs = rContent.rawCells(); for (Cell c : cs) { // 打印信息 System.out.println("===> RowKey: " + Bytes.toString(CellUtil.cloneRow(c)) + ", " + "ColumnFamily: " + Bytes.toString(CellUtil.cloneFamily(c)) + ", " + "ColumnName: " + Bytes.toString(CellUtil.cloneQualifier(c)) + ", " + "Value: " + Bytes.toString(CellUtil.cloneValue(c))); } } // 关闭资源 tContent.close(); tInbox.close(); conn.close(); } ``` **测试获取用户初始化数据** ```java @Test public void getInit() throws IOException { // 获取1002初始化收件箱数据 dao.getInit("1002"); } ``` #### 取消关注用户 **思路步骤** 1. 在微博用户关系表中,对当前主动操作的用户移除取关的用户 2. 在微博用户关系表中,对被取关的用户移除粉丝 3. 微博收件箱中删除取关的用户发布的微博 **代码实现** ```java // 取消关注 public void removeAttend(String uid, String... attends) throws IOException { // 判断是否传入待删除用户 if (attends.length == 0) { System.out.println("无待删除人"); return; } // 获取connection对象 Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 第一部分:操作用户关系表 // 获取用户关系表对象 Table tableRelation = conn.getTable(TableName.valueOf(Constants.TABLE_RELATION)); // 创建集合存放用户关系表的delete对象 List deletesRelation = new ArrayList(); // 创建操作者的delete对象 Delete delete = new Delete(Bytes.toBytes(uid)); // 循环创建被删除者的delete对象 for (String attend : attends) { // 赋值操作者的delete对象 delete.addColumns(Bytes.toBytes(Constants.TABLE_RELATION_CF1), Bytes.toBytes(attend)); // 创建被取关者的delete对象 Delete deleteAttend = new Delete(Bytes.toBytes(attend)); // 赋值被删除者delete对象 deleteAttend.addColumns(Bytes.toBytes(Constants.TABLE_RELATION_CF2), Bytes.toBytes(uid)); // 将被取关者delete对象添加进集合 deletesRelation.add(deleteAttend); } // 将操作者delete对象添加进集合 deletesRelation.add(delete); // 执行删除操作 tableRelation.delete(deletesRelation); // 第二部分:操作收件箱表 Table tableInbox = conn.getTable(TableName.valueOf(Constants.TABLE_INBOX)); // 创建操作者的delete对象 Delete deleteUid = new Delete(Bytes.toBytes(uid)); // 循环赋值delete对象 for (String attend : attends) { deleteUid.addColumns(Bytes.toBytes(Constants.TABLE_INBOX_CF), Bytes.toBytes(attend)); } // 执行收件箱表的删除操作 tableInbox.delete(deleteUid); // 关闭资源 tableInbox.close(); tableRelation.close(); conn.close(); } ``` **测试取消关注用户** ```java @Test public void removeAttend() throws IOException { // 1002取关1003 dao.removeAttend("1002", "1003"); } ``` #### 获取用户所有微博内容 **思路步骤** 1. 构建过滤器怼内容表进行数据过滤,获取所需信息 2. 解析打印所得数据 **代码实现** ```java // 获取某人的所有微博数据 public void getWeibo(String uid) throws IOException { // 获取connection对象 Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 获取微博内容表对象 Table tContent = conn.getTable(TableName.valueOf(Constants.TABLE_CONTENT)); // 构建scan对象 Scan scan = new Scan(); // 构建比较器 SubstringComparator comparator = new SubstringComparator(uid + "_"); // 构建过滤器 RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, comparator); // 设置过滤器 scan.setFilter(filter); // 获取数据 ResultScanner scanner = tContent.getScanner(scan); // 解析打印 for (Result result : scanner) { Cell[] cells = result.rawCells(); for (Cell cell : cells) { // 打印信息 System.out.println("===> RowKey: " + Bytes.toString(CellUtil.cloneRow(cell)) + ", " + "ColumnFamily: " + Bytes.toString(CellUtil.cloneFamily(cell)) + ", " + "ColumnName: " + Bytes.toString(CellUtil.cloneQualifier(cell)) + ", " + "Value: " + Bytes.toString(CellUtil.cloneValue(cell))); } } // 关闭资源 tContent.close(); conn.close(); } ``` **测试获取用户所有微博内容** ```java @Test public void getWeibo() throws IOException { // 获取1003所有微博 dao.getWeibo("1003"); } ```