# carlos_shop_warehouse
**Repository Path**: Carlosg_admin/carlos_shop_warehouse
## Basic Information
- **Project Name**: carlos_shop_warehouse
- **Description**: 公司需要大屏用于展示订单数据与用户访问数据,这里采用使用 Flink 来搭建实时计算平台
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 14
- **Forks**: 4
- **Created**: 2021-05-18
- **Last Updated**: 2025-02-05
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# carlos_shop_ warehouse
#### 介绍
Flink 实时数仓
## 实现方案
### JAVA 方式实现
- 一些中小企业当中,由于数据量较小(比如核心总量小于20万条),可通过Java程序定时查询mysql实现
- 比较简单,但是粗暴实用
- 仅仅需要对mysql做一些优化即可,比较增加索引
### 通过flink方案实现
- 数据量特别大、无法直接通过mysql查询完成,有时候根本查询不动
- 要求实时性高,比如阿里巴巴双十一监控大屏,要求延迟不超过1秒
实时数仓项目架构

## Canal介绍
### 简介
* 基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
* 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更
* 从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务,基于日志增量订阅和消费的业务包括
* 数据库镜像
* 数据库实时备份
* 索引构建和实时维护(拆分异构索引、倒排索引等)
* 业务 cache 刷新
* 带业务逻辑的增量数据处理
* 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
* github地址:https://github.com/alibaba/canal
## 环境部署
### MySQL
- MySQL需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,/etc/my.cnf 中配置如下
```properties
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
```
- 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
```sql
CREATE USER root IDENTIFIED BY 'root';
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' ;
FLUSH PRIVILEGES;
```
### Canal安装

**注意**:本项目使用的版本 canal1.0.24
环境要求:
* 安装好 ZooKeeper
- 解压缩
```shell
mkdir /opt/module/canal
tar -zxvf canal.deployer-1.0.24.tar.gz -C /opt/module/canal/
```
- 解压完成后,进入 /opt/module/canal/ 目录,可以看到如下结构
```shell
drwxr-xr-x. 2 root root 4096 2月 1 14:07 bin
drwxr-xr-x. 4 root root 4096 2月 1 14:07 conf
drwxr-xr-x. 2 root root 4096 2月 1 14:07 lib
drwxrwxrwx. 2 root root 4096 4月 1 2017 logs
```
- canal server的conf下有几个配置文件
~~~
[root@hadoop102 canal]# tree conf/
conf/
├── canal.properties
├── example
│ └── instance.properties
├── logback.xml
└── spring
├── default-instance.xml
├── file-instance.xml
├── group-instance.xml
├── local-instance.xml
└── memory-instance.xml
~~~
- 修改instance 配置文件
vim conf/example/instance.properties
```properties
## mysql serverId,这里的slaveId不能和myql集群中已有的server_id一样
canal.instance.mysql.slaveId = 1234
# 按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=hadoop102:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = root
canal.instance.dbPassword = root
#################################################
```
- 启动
```
sh bin/startup.sh
```
## Canal客户端开发
```xml
com.alibaba.otter
canal.client
1.0.24
com.alibaba
fastjson
1.2.58
```
### 在canal_demo模块创建包结构
| 包名 | 说明 |
| ---------------------- | ------------ |
| com.carlos.canal_demo | 代码存放目录 |
### 开发步骤
1. 创建 Connector
2. 连接 Cannal 服务器,并订阅
3. 解析 Canal 消息,并打印
#### Canal 消息格式
### 转换为 JSON 数据
* 复制上述代码,将 binlog 日志封装在一个Map结构中,使用 fastjson 转换为 JSON 格式
```java
Entry
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [binlog里记录变更发生的时间戳,精确到秒]
schemaName
tableName
eventType [insert/update/delete类型]
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange
isDdl [是否是ddl变更操作,比如create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组,变更前的数据字段]
afterColumns [Column类型的数组,变更后的数据字段]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否为主键]
updated [是否发生过变更]
isNull [值是否为null]
value [具体的内容,注意为string文本]
```
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件 log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据,以此来达到数据一致。
>**mysql的binlog**
>
>它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间。主要用来备份和数据同步。
>
>binlog 有三种: STATEMENT、ROW、MIXED
>
>* STATEMENT 记录的是执行的sql语句
>* ROW 记录的是真实的行数据记录
>* MIXED 记录的是1+2,优先按照1的模式记录
> **名词解释**:
>
> *什么是中继日志*
>
> 从服务器I/O线程将主服务器的二进制日志读取过来记录到从服务器本地文件,然后从服务器SQL线程会读取 relay-log 日志的内容并应用到从服务器,从而使从服务器和主服务器的数据保持一致
### canal 工作原理

- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
### 架构

- server 代表一个 canal 运行实例,对应于一个 jvm
- instance 对应于一个数据队列 (1个 canal server 对应 1..n 个 instance )
- instance 下的子模块
- eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析
- eventSink: Parser 和 Store 链接器,进行数据过滤,加工,分发的工作
- eventStore: 数据存储
- metaManager: 增量订阅 & 消费信息管理器
EventParser在向mysql发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则**获取初始指定位置**或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(*传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功* ),传送成功之后更新Log Position。流程图如下:

- EventSink起到一个类似channel的功能,可以对数据进行*过滤、分发/路由(1:n)、归并(n:1)和加工*。EventSink是连接EventParser和EventStore的桥梁。
- EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。
- MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:
- Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:batch id[唯一标识]和entries[具体的数据对象]
- void rollback(long batchId),顾名思义,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
- void ack(long batchId),顾名思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作
### 开发Canal客户端订阅binlog消息
#### 在canal_client模块创建包结构
| 包名 | 说明 |
| ----------------------------- | ----------------------------- |
| com.carlos.canal_client | 存放入口、Canal客户端核心实现 |
| com.carlos.canal_client.util | 存放工具类 |
| com.carlos.canal_client.kafka | 存放Kafka生产者实现 |
#### 添加配置文件
```properties
# canal配置
canal.server.ip=node1
canal.server.port=11111
canal.server.destination=example
canal.server.username=canal
canal.server.password=canal
canal.subscribe.filter=itcast_shop.*
# zookeeper配置
zookeeper.server.ip=hadoop102:2181,hadoop103:2181,hadoop104:2181
# kafka配置
kafka.bootstrap_servers_config=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka.batch_size_config=1024
kafka.acks=all
kafka.retries=0
kafka.client_id_config=carlos_shop_canal_click
kafka.key_serializer_class_config=org.apache.kafka.common.serialization.StringSerializer
kafka.value_serializer_class_config=com.carlos.canal.protobuf.ProtoBufSerializer
kafka.topic=ods_carlos_shop_mysql
```
#### 编写CanalClient客户端核心实现类
#### 编写Entrance入口
```java
/**
* 入口
*/
public class Entrance {
public static void main(String[] args) {
CanalClient canalClient = new CanalClient();
canalClient.start();
}
}
```
### 使用ProtoBuf序列化binlog消息
#### 在 carlos_shop_common 模块创建包结构
| 包名 | 说明 |
| ------------------------ | -------------------------- |
| com.carlos.canal.bean | 存放通用的JavaBean |
| com.carlos.canal.protobuf | 实现Protobuf相关接口、实现 |
### 生产ProtoBuf消息到Kafka中
#### 实现KafkaSender
该类用于生成数据到Kafka
```java
/**
* Kafka生产者
*/
public class KafkaSender {
private Properties kafkaProps = new Properties();
private KafkaProducer kafkaProducer;
public KafkaSender() {
kafkaProps.put("bootstrap.servers", ConfigUtil.kafkaBootstrap_servers_config());
kafkaProps.put("acks", ConfigUtil.kafkaAcks());
kafkaProps.put("retries", ConfigUtil.kafkaRetries());
kafkaProps.put("batch.size", ConfigUtil.kafkaBatch_size_config());
kafkaProps.put("key.serializer", ConfigUtil.kafkaKey_serializer_class_config());
kafkaProps.put("value.serializer", ConfigUtil.kafkaValue_serializer_class_config());
kafkaProducer = new KafkaProducer<>(kafkaProps);
}
public void send(RowData rowData) {
kafkaProducer.send(new ProducerRecord<>(ConfigUtil.kafkaTopic(), null, rowData));
}
}
```
#### CanalClient调用KafkaSender生产数据
```java
public class CanalClient {
// ...
private KafkaSender kafkaSender;
// 开始监听
public void start() {
...
RowData rowData = new RowData(binlogMsgMap);
kafkaSender.send(rowData);
}
```
#### 创建Kafka topic并执行测试
```shell
# 创建topic
bin/kafka-topics.sh --create --zookeeper node1:2181 --topic ods_itcast_shop_mysql --replication-factor 3 --partitions 3
# 创建控制台消费者测试
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic ods_itcast_shop_mysql --from-beginning
```
## Flink实时ETL项目初始化
### 拷贝配置文件
```properties
#
#kafka的配置
#
# Kafka集群地址
bootstrap.servers="hadoop102:9092,hadoop103:9092,hadoop104:9092"
# ZooKeeper集群地址
zookeeper.connect="hadoop102:2181,hadoop103:2181,hadoop104:2181"
# 消费组ID
group.id="carlos"
# 自动提交拉取到消费端的消息offset到kafka
enable.auto.commit="true"
# 自动提交offset到zookeeper的时间间隔单位(毫秒)
auto.commit.interval.ms="5000"
# 每次消费最新的数据
auto.offset.reset="latest"
# kafka序列化器
key.serializer="org.apache.kafka.common.serialization.StringSerializer"
# kafka反序列化器
key.deserializer="org.apache.kafka.common.serialization.StringDeserializer"
# ip库本地文件路径
ip.file.path="E:/2021-05-10/carlos_shop_parent/data/qqwry.dat"
# Redis配置
redis.server.ip="hadoop102"
redis.server.port=6379
# MySQL配置
mysql.server.ip="hadoop102"
mysql.server.port=3306
mysql.server.database="itcast_shop"
mysql.server.username="root"
mysql.server.password="root"
# Kafka Topic 名称
input.topic.canal="ods_carlos_shop_mysql"
# Kafka click_log topic 名称
input.topic.click_log="ods_carlos_click_log"
# Kafka 购物车 topic 名称
input.topic.cart="ods_carlos_cart"
# kafka 评论 topic 名称
input.topic.comments="ods_carlos_comments"
# Druid Kafka 数据源 topic名称
output.topic.order="dwd_order"
output.topic.order_detail="dwd_order_detail"
output.topic.cart="dwd_cart"
output.topic.clicklog="dwd_click_log"
output.topic.goods="dwd_goods"
output.topic.ordertimeout="dwd_order_timeout"
output.topic.comments="dwd_comments"
# HBase订单明细表配置
hbase.table.orderdetail="dwd_order_detail"
hbase.table.family="detail"
```
拷贝以下内容到resources/log4j.properties
```properties
log4j.rootLogger=warn,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n
```
拷贝一下内容到resources/hbase-site.xml
###
在scala目录中创建以下包结构:
| 包名 | 说明 |
| -------------------------------------- | ---------------- |
| com.carlos.shop.realtime.etl.app | 程序入口 |
| com.carlos.shop.realtime.etl.async | 异步IO相关 |
| com.carlos.shop.realtime.etl.bean | 实体类 |
| com.carlos.shop.realtime.etl.utils | 工具类 |
| com.carlos.shop.realtime.etl.process | 实时ETL处理 |
| com.carlos.shop.realtime.etl.dataloader | 维度数据离线同步 |
### 编写工具类加载配置文件
- 在 util 包下创建 GlobalConfigUtil 单例对象
- 编写代码
- 使用 ConfigFactory.load 获取配置对象
- 调用config.getString方法加载 application.conf 配置
- 添加一个main方法测试,工具类是否能够正确读取出配置项
**参考代码**
```scala
import com.typesafe.config.{Config, ConfigFactory}
object GlobalConfigUtil {
private val config: Config = ConfigFactory.load()
val `bootstrap.servers` = config.getString("bootstrap.servers")
val `zookeeper.connect` = config.getString("zookeeper.connect")
val `input.topic.canal` = config.getString("input.topic.canal")
val `input.topic.click_log` = config.getString("input.topic.click_log")
val `input.topic.comments` = config.getString("input.topic.comments")
val `group.id` = config.getString("group.id")
val `enable.auto.commit` = config.getString("enable.auto.commit")
val `auto.commit.interval.ms` = config.getString("auto.commit.interval.ms")
val `auto.offset.reset` = config.getString("auto.offset.reset")
val `key.serializer` = config.getString("key.serializer")
val `key.deserializer` = config.getString("key.deserializer")
val `output.topic.order` = config.getString("output.topic.order")
val `output.topic.order_detail` = config.getString("output.topic.order_detail")
val `output.topic.cart` = config.getString("output.topic.cart")
val `output.topic.clicklog` = config.getString("output.topic.clicklog")
val `output.topic.goods` = config.getString("output.topic.goods")
val `output.topic.ordertimeout` = config.getString("output.topic.ordertimeout")
val `output.topic.comments` = config.getString("output.topic.comments")
val `hbase.table.orderdetail` = config.getString("hbase.table.orderdetail")
val `hbase.table.family` = config.getString("hbase.table.family")
val `redis.server.ip` = config.getString("redis.server.ip")
val `redis.server.port`: String = config.getString("redis.server.port")
val `ip.file.path` = config.getString("ip.file.path")
val `mysql.server.ip` = config.getString("mysql.server.ip")
val `mysql.server.port` = config.getString("mysql.server.port")
val `mysql.server.database` = config.getString("mysql.server.database")
val `mysql.server.username` = config.getString("mysql.server.username")
val `mysql.server.password` = config.getString("mysql.server.password")
val `input.topic.cart` = config.getString("input.topic.cart")
def main(args: Array[String]): Unit = {
println(`bootstrap.servers`)
println(`zookeeper.connect`)
println(`input.topic.canal`)
println(`input.topic.click_log`)
println(`group.id`)
println(`enable.auto.commit`)
println(`auto.commit.interval.ms`)
println(`auto.offset.reset`)
println(`output.topic.order`)
println(`hbase.table.family`)
}
}
```
### 导入 Redis 连接池工具类
导入配置文件:工具类/1.redis连接池工具类/RedisUtil.scala 到 utils 目录
### 导入 Hbase 连接池工具类
导入配置文件:工具类/2.hbase 连接池工具类到 utils 目录
### 初始化Flink流式计算程序
- 创建App单例对象,初始化Flink运行环境
- 创建main方法
- 编写代码
- 获取StreamExecutionEnvironment运行环境
- 将Flink默认的开发环境并行度设置为1
- 开启checkpoint
- 编写测试代码,测试 Flink 程序是否能够正确执行
**注意事项**
- 一定要导入 import org.apache.flink.api.scala._ 隐式转换,否则Flink程序无法执行
### 实时 etl 特质抽取
#### 定义特质
该特质主要定义统一执行ETL处理,只有一个process方法,用于数据的接入、etl、落地。
#### 根据数据来源抽取etl的抽象类
对于来自于mysql的binlog日志的数据,抽取出来mysql的基类
对于日志数据,封装来自消息队列的基类
## 编写 Flink 程序解析 Kafka 中的 ProtoBuf
### 抽取Flink整合Kafka配置
因为后续的ETL处理,都是从Kafka中拉取数据,都需要共用Kafka的配置,所以将Kafka的配置单独抽取到工具类中
# ods层接入kafka
在utils包下创建KafkaConsumerProp
```scala
import java.util.Properties
object KafkaProp {
/**
* 读取Kafka属性配置
* @return
*/
def getProperties() = {
// 1. 读取Kafka配置
val properties = new Properties()
properties.setProperty("bootstrap.servers", GlobalConfigUtil.`bootstrap.servers`)
properties.setProperty("zookeeper.connect", GlobalConfigUtil.`zookeeper.connect`)
properties.setProperty("group.id", GlobalConfigUtil.`group.id`)
properties.setProperty("enable.auto.commit", GlobalConfigUtil.`enable.auto.commit`)
properties.setProperty("auto.commit.interval.ms", GlobalConfigUtil.`auto.commit.interval.ms`)
properties.setProperty("auto.offset.reset", GlobalConfigUtil.`auto.offset.reset`)
properties.setProperty("key.serializer", GlobalConfigUtil.`key.serializer`)
properties.setProperty("key.deserializer", GlobalConfigUtil.`key.deserializer`)
properties
}
}
```
### Flink整合kafka数据源
数据来源主要是分为三个途径:
- 对于商品、订单、订单明细等等数据主要是来自于mysql的binlog日志
- 对于购物车、评论等数据主要是java后台程序直接推送到kafka集群中
- 对于点击流日志主要来自于nginx服务器使用flume采集到kafka集群中
#### 整合 Kafka 消费 binlog 消息
在抽象类MySqlBaseETL中,实现Flink整合Kafka。
操作步骤:
1、自定义 ProtoBuf 反序列化
* 因为 Canal 采集到的数据是以 ProtoBuf 形式推入到 Kafka 中的,故应该使用 ProtoBuf 来进行反序列化
2、Flink 整合 Kafka
3、创建订单实时 etl 处理类
4、编写 App 测试
##### 自定义 ProtoBuf 反序列化
反序列化主要是将Byte数组转换为之前封装在 common 工程的RowData类
##### 定义MySQL消息etl抽象类
后面不少的业务逻辑(订单、订单明细、商品等)需要共用一份Kafka数据(从一个topic中拉取),抽取抽象类的目的是共用一个FlinkKafkaConsumer,因为后面创建FlinkKafkaConsumer整合Kafka需要使用到Flink流式运行环境,需要在主构造器中传入Flink流式运行环境。该ETL抽象类需要从BaseETL继承。
##### 创建订单实时etl处理类
在etl包下创建OrderETL类,用于后续订单实时拉宽处理,此时只用来进行测试
##### 编写App测试
* 创建OrderETL实例对象,并调用process方法执行测试
* 启动canal-client客户端程序订阅binlog消息
```scala
val orderETL = new OrderETL(env)
orderETL.process()
```
#### 整合 kafka 消费字符串类型消息
##### 定义 MQBase 消息 et l抽象类
后面不少的业务逻辑(购物车、评论、点击流等)需要共用一份Kafka数据,抽取抽象类的目的是共用一个FlinkKafkaConsumer,因为后面创建 FlinkKafkaConsumer 整合 Kafka 需要使用到Flink流式运行环境,需要在主构造器中传入Flink流式运行环境。该ETL抽象类需要从BaseETL继承。
#### 整合Kafka消费购物车消息
##### 在application.conf中添加配置
```properties
# Kafka 购物车 topic名称
input.topic.cart="ods_carlos_cart"
```
##### 读取配置
```scala
val `input.topic.cart` = config.getString("input.topic.cart")
```
##### 创建CartETL类,整合Kafka
* 在etl包下创建CartETL,从MQBaseETL继承
* 实现process方法
* 实现getKafkaDS方法,在该方法中整合Kafka,并测试打印消费数据
#### 整合Kafka消费评论消息
##### 在application.conf中添加配置
```properties
# kafka 评论 topic名称
input.topic.comments="ods_carlos_shop_comments"
```
##### 读取配置
```scala
val `input.topic.comments` = config.getString("input.topic.comments")
```
##### 创建 CommentsETL 类,整合 Kafka
* 在etl包下创建CommentsETL,从MQBaseETL继承
* 实现process方法,并测试打印消费数据
```scala
/**
* 点击流处理逻辑
*/
class CommentsETL(env:StreamExecutionEnvironment) extends BaseETL[String] {
/**
* 业务处理接口
*/
override def process(): Unit = {
// 1. 整合Kafka
val commentsDS: DataStream[String] = getKafkaDataStream(GlobalConfigUtil.`input.topic.comments`)
commentsDS.print()
}
}
```
#### 整合Kafka消费点击流消息
##### 在application.conf中添加配置
```properties
# Kafka click_log topic名称
input.topic.click_log="ods_carlos_click_log"
```
##### 读取配置
```scala
val `input.topic.click_log` = config.getString("input.topic.click_log")
```
##### 创建ClickLogETL类,整合Kafka
* 在etl包下创建ClickLogETL,从MQBaseETL继承
* 实现process方法,整合Kafka,并测试打印消费数据
```scala
/**
* 点击流处理逻辑
*/
class ClickLogETL(env:StreamExecutionEnvironment) extends BaseETL[String] {
/**
* 业务处理接口
*/
override def process(): Unit = {
// 1. 整合kafka
val clickLogDS: DataStream[String] = getKafkaDataStream(GlobalConfigUtil.`input.topic.click_log`)
clickLogDS.print()
}
}
```
## Flink实时etl
### 开发环境准备
#### 维度数据全量装载
为了后续将订单、订单明细等数据进行实时ETL拉宽,需要提前将一些维度数据加载一个高性能存储中。此处,选择Redis作为商品维度、商品分类维度、门店维度、运营组织机构维度存储。先一次性将所有MySQL中的维度数据全量装载到Redis中,后续只要MySQL中的维度数据更新,马上使用Flink更新Redis中的维度数据
##### 创建样例类
* 在 com.carlos.shop.realtime.bean 的 DimEntity 类中创建以下样例类
* DimGoodsDBEntity 商品维度样例类
| 列名 | 描述 |
| ---------- | ---------- |
| goodsName | 商品名称 |
| shopId | 店铺id |
| goodsCatId | 商品分类id |
| shopPrice | 商品价格 |
| goodsId | 商品id |
* DimGoodsCatDBEntity 商品分类维度样例类
| 列名 | 描述 |
| --------- | ------------ |
| catId | 商品分类id |
| parentId | 商品分类父id |
| catName | 商品分类名称 |
| cat_level | 商品分类级别 |
* DimShopsDBEntity 店铺维度样例类
| 列名 | 描述 |
| ----------- | ------------ |
| shopId | 店铺id |
| areaId | 区域id |
| shopName | 店铺名称 |
| shopCompany | 店铺公司名称 |
* DimOrgDBEntity 机构维度样例表
| 列名 | 描述 |
| -------- | -------- |
| orgId | 机构id |
| parentId | 父机构id |
| orgName | 机构名称 |
| orgLevel | 机构级别 |
- DimShopCatsDBEntity门店商品分类维度样例表
| 列名 | 描述 |
| -------- | ---------------- |
| catId | 门店商品分类id |
| parentId | 门店商品分类父id |
| catName | 门店商品分类名称 |
| catSort | 门店商品分类级别 |
##### 在配置文件中添加 Redis 配置、MySQL 配置
```properties
# Redis配置
redis.server.ip="hadoop102"
redis.server.port=6379
# MySQL配置
mysql.server.ip="hadoop102"
mysql.server.port=3306
mysql.server.database="itcast_shop"
mysql.server.username="root"
mysql.server.password="root"
```
##### 编写配置工具类
```scala
val `redis.server.ip` = config.getString("redis.server.ip")
val `redis.server.port` = config.getString("redis.server.port")
val `mysql.server.ip` = config.getString("mysql.server.ip")
val `mysql.server.port` = config.getString("mysql.server.port")
val `mysql.server.database` = config.getString("mysql.server.database")
val `mysql.server.username` = config.getString("mysql.server.username")
val `mysql.server.password` = config.getString("mysql.server.password")
```
##### 编写Redis操作工具类
在utils类中添加RedisUtils类,使用Redis连接池操作Redis
```scala
object RedisUtil {
val config = new JedisPoolConfig()
//是否启用后进先出, 默认true
config.setLifo(true)
//最大空闲连接数, 默认8个
config.setMaxIdle(8)
//最大连接数, 默认8个
config.setMaxTotal(1000)
//获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间, 默认-1
config.setMaxWaitMillis(-1)
//逐出连接的最小空闲时间 默认1800000毫秒(30分钟)
config.setMinEvictableIdleTimeMillis(1800000)
//最小空闲连接数, 默认0
config.setMinIdle(0)
//每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3
config.setNumTestsPerEvictionRun(3)
//对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略)
config.setSoftMinEvictableIdleTimeMillis(1800000)
//在获取连接的时候检查有效性, 默认false
config.setTestOnBorrow(false)
//在空闲时检查有效性, 默认false
config.setTestWhileIdle(false)
var jedisPool: JedisPool = new JedisPool(config, GlobalConfigUtil.`redis.server.ip`, GlobalConfigUtil.`redis.server.port`.toInt)
/**
* 获取Redis连接
* @return
*/
def getResouce() = {
jedisPool.getResource
}
}
```
##### 读取MySQL商品维度数据到Redis
在 com.carlos.shop.realtime.etl.dataloader 包中创建 DimensionDataLoader 单例对象,实现装载商品维度数据
实现步骤:
1、先从MySQL的 itcast_goods 表中加载数据
2、将数据保存到键为 carlos_goods:dim_goods 的 HASH 结构中
3、关闭资源
##### 读取 MySQL店铺维度数据到Redis
实现步骤:
1、先从MySQL的 itcast_shops 表中加载数据
2、将数据保存到键为 carlos_goods:dim_shops 的 HASH 结构中
3、关闭资源
##### 读取MySQL商品分类维度数据到Redis
实现步骤:
1、先从MySQL的 itcast_goods_cats 表中加载数据
2、将数据保存到键为 carlos_shop:dim_goods_cats 的 HASH 结构中
3、关闭资源
##### 读取MySQL组织结构数据到Redis
实现步骤:
1、先从MySQL的 itcast_org 表中加载数据
2、将数据保存到键为 carlos_shop:dim_org 的 HASH 结构中
3、关闭资源
##### 读取MySQL门店商品分类维度数据到Redis
实现步骤:
1、先从MySQL的 itcast_shop_cats表中加载数据
2、将数据保存到键为 carlos_shop:dim_shop_cats 的 HASH 结构中
3、关闭资源
维度数据增量更新
#### 创建同步Redis中维度数据ETL处理类
在etl包下创建SyncDimDataETL类,继承MySqlBaseETL特质,实现process方法
```scala
/**
* Redis维度数据同步业务
*/
class SyncDimDataETL(env: StreamExecutionEnvironment) extends MySqlBaseETL(env) {
/**
* 业务处理接口
*/
override def process(): Unit = {
}
}
```
##### 在App中调用同步ETL
1、在App中创建实时同步ETL,并调用处理方法
```scala
val syncRedisDimDataETL = new SyncRedisDimDataETL(env)
syncRedisDimDataETL.process()
```
2、启动Flink程序
3、测试
* 新增MySQL中的一条表数据,测试Redis中的数据是否同步更新
* 修改MySQL中的一条表数据,测试Redis中的数据是否同步更新
* 删除MySQL中的一条表数据,测试Redis中的数据是否同步更新
查看redis中的数据
### 点击流消息实时拉宽处理
#### Apache HTTPD和NGINX访问日志解析器
这是一个Logparsing框架,旨在简化[Apache HTTPD](https://httpd.apache.org/)和[NGINX](https://nginx.org/)访问日志文件的解析。
基本思想是,您应该能够拥有一个解析器,可以通过简单地告诉该行写入了哪些配置选项来构造该解析器。这些配置选项是访问日志行的架构。
##### 导入依赖
```xml
nl.basjes.parse.httpdlog
httpdlog-parser
5.2
```
##### nginx日志样本
在nginx的conf目录下的nginx.conf文件中可以配置日志打印的格式,如下:
```
#log_format main '$remote_addr - $remote_user [$time_local] [$msec]
[$request_time] [$http_host] "$request" '
'$status $body_bytes_sent "$request_body" "$http_referer" '
'"$http_user_agent" $http_x_forwarded_for'
```
$remote_addr 对应客户端的地址
$remote_user 是请求客户端请求认证的用户名,如果没有开启认证模块的话是值为空。
$time_local 表示nginx服务器时间
$msec 访问时间与时区字符串形式
$request_time 请求开始到返回时间
$http_host 请求域名
$request 请求的url与http协议
$status 请求状态,如成功200
$body_bytes_sent 表示从服务端返回给客户端的body数据大小
$request_body 访问url时参数
$http_referer 记录从那个页面链接访问过来的
$http_user_agent 记录客户浏览器的相关信息
$http_x_forwarded_for 请求转发过来的地址
$upstream_response_time: 从 Nginx 建立连接 到 接收完数据并关闭连接
##### 定义格式化字符串
参考:https://httpd.apache.org/docs/current/mod/mod_log_config.html
```
%u %h %l %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\" \"%{Cookie}i\" \"%{Addr}i\"
## Flink实时etl
### 点击流消息实时拉宽处理
#### 拉宽点击流消息处理
#### 点击流消息实时拉宽测试
### 订单数据实时处理
#### 操作步骤
1、在etl包下创建OrderETL类,实现MySqlBaseETL
2、过滤出来表名为itcast_orders,且事件类型为 "insert" 的binlog消息
3、将RowData数据流转换为 OrderDBEntity数据源
4、为了方便落地到 Kafka,再将OrderDBEntity 转换为JSON字符串
5、将转换后的json字符串写入到kafka的**dwd_order**topic中
### 订单明细数据实时拉宽处理
#### 操作步骤
1、在etl包下创建OrderGoodsEtl类,实现MySqlBaseETl
2、先过滤出表名为 itcast_order_goods,且事件类型为 "insert" 的binlog消息
3、为了方便落地到 Kafka,再将OrderGoodsWideEntity转换为JSON字符串
4、根据以下方式拉宽订单明细数据
* 根据商品id,从redis中商品维度Hash获取商品数据
* 根据店铺id,从redis中门店维护Hash获取门店数据
* 根据商品数据的三级分类id,从redis中的商品分类Hash获取三级分类数据
* 根据三级分类数据的parentid,从redis中的商品分类Hash获取二级分类数据
* 根据二级分类数据的parentid,从redis中的商品分类Hash获取一级分类数据
* 根据门店数据的areaId,从redis中的组织机构Hash获取城市机构数据
* 根据城市机构数据的parentId,从redis中的组织机构Hash获取省份机构数据
5、将拉宽后的订单明细数据写入到hbase数据库中
### 商品消息实时拉宽处理
#### 操作步骤
1、在etl包下创建GoodsETL类,实现MySqlBaseETL
2、先过滤出表名为 itcast_goods
3、根据以下方式拉宽订单明细数据
* 根据商品id,从redis中商品维度Hash获取商品数据
* 根据店铺id,从redis中门店维护Hash获取门店数据
* 根据商品数据的三级分类id,从redis中的商品分类Hash获取三级分类数据
* 根据三级分类数据的parentid,从redis中的商品分类Hash获取二级分类数据
* 根据二级分类数据的parentid,从redis中的商品分类Hash获取一级分类数据
* 根据门店数据的areaId,从redis中的组织机构Hash获取城市机构数据
* 根据城市机构数据的parentId,从redis中的组织机构Hash获取省份机构数据
4、为了方便落地到 Kafka,再将GoodsWideBean转换为JSON字符串
#### 注册IP库为Flink分布式缓存
```scala
env.registerCachedFile(GlobalConfigUtil.`ip.file.path`, "qqwry.dat")
```
#### 拉宽购物车数据
1、将Kafka的字符串消息转换为实体类
```scala
// 将JSON转换为实体类
val cartBeanDS = kafkaDS.map(CartBean(_))
```
2、使用RichMapFunction实现,从Redis中拉取维度数据
3、解析IP地址
4、拉宽日期时间
### 评论消息实时拉宽处理
#### 操作步骤
1、在etl包下创建CommentsETL类,实现MQBaseETL
2、先过滤出表名为 itcast_goods
3、消费kafka的数据因为kafka的数据是字符串,消费出来需要转换成Comments对象
4、根据以下方式拉宽订单明细数据
* 根据商品id,从redis中商品维度Hash获取商品数据
* 根据店铺id,从redis中门店维护Hash获取门店数据
* 根据商品数据的三级分类id,从redis中的商品分类Hash获取三级分类数据
* 根据三级分类数据的parentid,从redis中的商品分类Hash获取二级分类数据
* 根据二级分类数据的parentid,从redis中的商品分类Hash获取一级分类数据
* 根据门店数据的areaId,从redis中的组织机构Hash获取城市机构数据
* 根据城市机构数据的parentId,从redis中的组织机构Hash获取省份机构数据
4、为了方便落地到 Kafka,再将CommentsWideEntity转换为JSON字符串
## 使用JDBC查询Druid中的数据
### 安装 imply-3.0.4
Imply-3.0.4 基于 apache-druid-0.15.0-Incubating
- 1、下载imply
```shell
cd /opt/module/
wget https://static.imply.io/release/imply-3.0.4.tar.gz
```
- 2、直接使用资料 imply安装包jps
```shell script
将该 `imply安装包\imply-3.0.4.tar.gz` 安装包上传到 /exports/softwares
```
- 3、解压imply-3.0.4
```shell script
tar -xvzf imply-3.0.4.tar.gz -C ../module
cd ../module/imply-3.0.4
```
- 4、配置 imply-3.0.4
mysql中创建imply相关的数据库
```sql
CREATE DATABASE `druid` DEFAULT CHARACTER SET utf8;
CREATE DATABASE `pivot` DEFAULT CHARACTER SET utf8;
```
### 修改并上传配置文件
修改 conf/druid/_common/common.runtime.properties 文件
修改zookeeper的配置
```shell script
druid.zk.service.host=hadoop102:2181,hadoop103:2181,hadoop104:2181
```
修改MySQL的配置
```shell script
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://hadoop102:3306/druid
druid.metadata.storage.connector.user=root
druid.metadata.storage.connector.password=root
```
修改 conf/pivot/config.yaml 配置文件
- 修改mysql的配置
```shell script
stateStore:
type: mysql
location: mysql
connection: 'mysql://root:root@hadoop102:3306/pivot'
```
将配置好的 imply 分发到不同节点
- 分发脚本 xsync
```shell script
#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
echo Not Enough Arguement!
exit;
fi
#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
echo ==================== $host ====================
#3. 遍历所有目录,挨个发送
for file in $@
do
#4 判断文件是否存在
if [ -e $file ]
then
#5. 获取父目录
pdir=$(cd -P $(dirname $file); pwd)
#6. 获取当前文件的名称
fname=$(basename $file)
ssh $host "mkdir -p $pdir"
rsync -av $pdir/$fname $host:$pdir
else
echo $file does not exists!
fi
done
done
```
imply 分发
```shell script
[carlos@hadoop102 ~]$ xsync imply-3.0.4
```
### 配置环境变量
在每台服务器上配置DRUID_HOME环境变量
sudo vim /etc/profile.d/my_env.sh
```shell script
# DRUID
export DRUID_HOME=/opt/module/imply-3.0.4
```
source /etc/profile.d/my_env.sh
### 启动 imply 集群
1、启动zk集群
https://blog.csdn.net/Aeve_imp/article/details/107597274
2、hadoop102节点(使用外部zk而不使用imply自带zk启动overlord和coordinator)
```shell script
# 使用外部 zk 而不使用imply自带zk启动overlord和coordinator
[carlos@hadoop102 imply-3.0.4]$ bin/supervise -c /opt/module/imply-3.0.4/conf/supervise/master-no-zk.conf
```
3、hadoop103节点(启动historical和middlemanager)
```shell script
[carlos@hadoop103 imply-3.0.4]$ bin/supervise -c /opt/module/imply-3.0.4/conf/supervise/data.conf
```
4、hadoop104节点(启动broker和router)
```shell script
[carlos@hadoop104 imply-3.0.4]$ bin/supervise -c /opt/module/imply-3.0.4/conf/supervise/query.conf
```
**注意事项**
- 如果希望imply运行在后台,在每个执行命令后面加 --daemonize
#### 访问WebUI
| **组件名** | **URL** |
| ------------------------- | ---------------------------------- |
| broker | http://hadoop104:8888 |
| coordinator、overlord | http://hadoop102:8081/index.html |
| middleManager、historical | http://hadoop102:8090/console.html |
Druid提供了JDBC接口,JavaWeb项目可以直接使用 JDBC 连接Druid进行实时数据分析。
需求:
- 获取 metrics-kafka 数据源中,不同用户的访问次数
实现步骤:
1、创建 druid_jdbc Maven模块
2、导入依赖
3、编写JDBC代码连接Druid获取数据
- 加载Druid JDBC驱动
- 获取Druid JDBC连接
- 构建SQL语句
- 构建Statement,执行SQL获取结果集
- 关闭Druid连接
# 数据可视化项目
操作步骤:
1、导入 carlos_dw_web 项目
2、修改 DashboardServiceImpl.java 中 Redis 服务器地址
3、修改 utils.DruidHelper 中Druid的 url 地址
4、修改druid连接字符串的表名:dws_od改成dws_order
5、启动 Jetty 服务器
6、打开浏览器访问 http://localhost:8080/itcast_dw_web
## Superset
### BI VS 报表工具
* 报表工具是数据展示工具,而BI(商业智能)是数据分析工具。报表工具可以制作各类数据报表、图形报表的工具,甚至还可以制作电子发票联、流程单、收据等。
* BI可以将数据进行模型构建,制作成Dashboard,相比于报表,侧重点在于分析,操作简单、数据处理量大。常常基于企业搭建的数据平台,连接数据仓库进行分析。
#### 安装教程
1. xxxx
2. xxxx
3. xxxx
#### 参与贡献
1. Fork 本仓库
2. 新建 Feat_xxx 分支
3. 提交代码
4. 新建 Pull Request