# carlos_shop_warehouse **Repository Path**: Carlosg_admin/carlos_shop_warehouse ## Basic Information - **Project Name**: carlos_shop_warehouse - **Description**: 公司需要大屏用于展示订单数据与用户访问数据,这里采用使用 Flink 来搭建实时计算平台 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 14 - **Forks**: 4 - **Created**: 2021-05-18 - **Last Updated**: 2025-02-05 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # carlos_shop_ warehouse #### 介绍 Flink 实时数仓 ## 实现方案 ### JAVA 方式实现 - 一些中小企业当中,由于数据量较小(比如核心总量小于20万条),可通过Java程序定时查询mysql实现 - 比较简单,但是粗暴实用 - 仅仅需要对mysql做一些优化即可,比较增加索引 ### 通过flink方案实现 - 数据量特别大、无法直接通过mysql查询完成,有时候根本查询不动 - 要求实时性高,比如阿里巴巴双十一监控大屏,要求延迟不超过1秒 实时数仓项目架构 ![architecture](images/architecture.jpg) ## Canal介绍 ### 简介 * 基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 * 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更 * 从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务,基于日志增量订阅和消费的业务包括 * 数据库镜像 * 数据库实时备份 * 索引构建和实时维护(拆分异构索引、倒排索引等) * 业务 cache 刷新 * 带业务逻辑的增量数据处理 * 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x * github地址:https://github.com/alibaba/canal ## 环境部署 ### MySQL - MySQL需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,/etc/my.cnf 中配置如下 ```properties [mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 ``` - 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant ```sql CREATE USER root IDENTIFIED BY 'root'; GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' ; FLUSH PRIVILEGES; ``` ### Canal安装 ![architecture](images/canaL.png) **注意**:本项目使用的版本 canal1.0.24 环境要求: * 安装好 ZooKeeper - 解压缩 ```shell mkdir /opt/module/canal tar -zxvf canal.deployer-1.0.24.tar.gz -C /opt/module/canal/ ``` - 解压完成后,进入 /opt/module/canal/ 目录,可以看到如下结构 ```shell drwxr-xr-x. 2 root root 4096 2月 1 14:07 bin drwxr-xr-x. 4 root root 4096 2月 1 14:07 conf drwxr-xr-x. 2 root root 4096 2月 1 14:07 lib drwxrwxrwx. 2 root root 4096 4月 1 2017 logs ``` - canal server的conf下有几个配置文件 ~~~ [root@hadoop102 canal]# tree conf/ conf/ ├── canal.properties ├── example │   └── instance.properties ├── logback.xml └── spring ├── default-instance.xml ├── file-instance.xml ├── group-instance.xml ├── local-instance.xml └── memory-instance.xml ~~~ - 修改instance 配置文件 vim conf/example/instance.properties ```properties ## mysql serverId,这里的slaveId不能和myql集群中已有的server_id一样 canal.instance.mysql.slaveId = 1234 # 按需修改成自己的数据库信息 ################################################# ... canal.instance.master.address=hadoop102:3306 # username/password,数据库的用户名和密码 ... canal.instance.dbUsername = root canal.instance.dbPassword = root ################################################# ``` - 启动 ``` sh bin/startup.sh ``` ## Canal客户端开发 ```xml com.alibaba.otter canal.client 1.0.24 com.alibaba fastjson 1.2.58 ``` ### 在canal_demo模块创建包结构 | 包名 | 说明 | | ---------------------- | ------------ | | com.carlos.canal_demo | 代码存放目录 | ### 开发步骤 1. 创建 Connector 2. 连接 Cannal 服务器,并订阅 3. 解析 Canal 消息,并打印 #### Canal 消息格式 ### 转换为 JSON 数据 * 复制上述代码,将 binlog 日志封装在一个Map结构中,使用 fastjson 转换为 JSON 格式 ```java Entry Header logfileName [binlog文件名] logfileOffset [binlog position] executeTime [binlog里记录变更发生的时间戳,精确到秒] schemaName tableName eventType [insert/update/delete类型] entryType [事务头BEGIN/事务尾END/数据ROWDATA] storeValue [byte数据,可展开,对应的类型为RowChange] RowChange isDdl [是否是ddl变更操作,比如create table/drop table] sql [具体的ddl sql] rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理] beforeColumns [Column类型的数组,变更前的数据字段] afterColumns [Column类型的数组,变更后的数据字段] Column index sqlType [jdbc type] name [column name] isKey [是否为主键] updated [是否发生过变更] isNull [值是否为null] value [具体的内容,注意为string文本] ``` - MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件 log events,可以通过 show binlog events 进行查看) - MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log) - MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据,以此来达到数据一致。 >**mysql的binlog** > >它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间。主要用来备份和数据同步。 > >binlog 有三种: STATEMENT、ROW、MIXED > >* STATEMENT 记录的是执行的sql语句 >* ROW 记录的是真实的行数据记录 >* MIXED 记录的是1+2,优先按照1的模式记录 > **名词解释**: > > *什么是中继日志* > > 从服务器I/O线程将主服务器的二进制日志读取过来记录到从服务器本地文件,然后从服务器SQL线程会读取 relay-log 日志的内容并应用到从服务器,从而使从服务器和主服务器的数据保持一致 ### canal 工作原理 ![img](images/canaL.png) - canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议 - MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ) - canal 解析 binary log 对象(原始为 byte 流) ### 架构 ![img](assets/9190482-da6ee9204d5d0f12.webp) - server 代表一个 canal 运行实例,对应于一个 jvm - instance 对应于一个数据队列 (1个 canal server 对应 1..n 个 instance ) - instance 下的子模块 - eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析 - eventSink: Parser 和 Store 链接器,进行数据过滤,加工,分发的工作 - eventStore: 数据存储 - metaManager: 增量订阅 & 消费信息管理器 EventParser在向mysql发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则**获取初始指定位置**或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(*传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功* ),传送成功之后更新Log Position。流程图如下: ![image-20200214104557452](images/canal_structure.png) - EventSink起到一个类似channel的功能,可以对数据进行*过滤、分发/路由(1:n)、归并(n:1)和加工*。EventSink是连接EventParser和EventStore的桥梁。 - EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。 - MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为: - Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:batch id[唯一标识]和entries[具体的数据对象] - void rollback(long batchId),顾名思义,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作 - void ack(long batchId),顾名思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作 ### 开发Canal客户端订阅binlog消息 #### 在canal_client模块创建包结构 | 包名 | 说明 | | ----------------------------- | ----------------------------- | | com.carlos.canal_client | 存放入口、Canal客户端核心实现 | | com.carlos.canal_client.util | 存放工具类 | | com.carlos.canal_client.kafka | 存放Kafka生产者实现 | #### 添加配置文件 ```properties # canal配置 canal.server.ip=node1 canal.server.port=11111 canal.server.destination=example canal.server.username=canal canal.server.password=canal canal.subscribe.filter=itcast_shop.* # zookeeper配置 zookeeper.server.ip=hadoop102:2181,hadoop103:2181,hadoop104:2181 # kafka配置 kafka.bootstrap_servers_config=hadoop102:9092,hadoop103:9092,hadoop104:9092 kafka.batch_size_config=1024 kafka.acks=all kafka.retries=0 kafka.client_id_config=carlos_shop_canal_click kafka.key_serializer_class_config=org.apache.kafka.common.serialization.StringSerializer kafka.value_serializer_class_config=com.carlos.canal.protobuf.ProtoBufSerializer kafka.topic=ods_carlos_shop_mysql ``` #### 编写CanalClient客户端核心实现类 #### 编写Entrance入口 ```java /** * 入口 */ public class Entrance { public static void main(String[] args) { CanalClient canalClient = new CanalClient(); canalClient.start(); } } ``` ### 使用ProtoBuf序列化binlog消息 #### 在 carlos_shop_common 模块创建包结构 | 包名 | 说明 | | ------------------------ | -------------------------- | | com.carlos.canal.bean | 存放通用的JavaBean | | com.carlos.canal.protobuf | 实现Protobuf相关接口、实现 | ### 生产ProtoBuf消息到Kafka中 #### 实现KafkaSender 该类用于生成数据到Kafka ```java /** * Kafka生产者 */ public class KafkaSender { private Properties kafkaProps = new Properties(); private KafkaProducer kafkaProducer; public KafkaSender() { kafkaProps.put("bootstrap.servers", ConfigUtil.kafkaBootstrap_servers_config()); kafkaProps.put("acks", ConfigUtil.kafkaAcks()); kafkaProps.put("retries", ConfigUtil.kafkaRetries()); kafkaProps.put("batch.size", ConfigUtil.kafkaBatch_size_config()); kafkaProps.put("key.serializer", ConfigUtil.kafkaKey_serializer_class_config()); kafkaProps.put("value.serializer", ConfigUtil.kafkaValue_serializer_class_config()); kafkaProducer = new KafkaProducer<>(kafkaProps); } public void send(RowData rowData) { kafkaProducer.send(new ProducerRecord<>(ConfigUtil.kafkaTopic(), null, rowData)); } } ``` #### CanalClient调用KafkaSender生产数据 ```java public class CanalClient { // ... private KafkaSender kafkaSender; // 开始监听 public void start() { ... RowData rowData = new RowData(binlogMsgMap); kafkaSender.send(rowData); } ``` #### 创建Kafka topic并执行测试 ```shell # 创建topic bin/kafka-topics.sh --create --zookeeper node1:2181 --topic ods_itcast_shop_mysql --replication-factor 3 --partitions 3 # 创建控制台消费者测试 bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic ods_itcast_shop_mysql --from-beginning ``` ## Flink实时ETL项目初始化 ### 拷贝配置文件 ```properties # #kafka的配置 # # Kafka集群地址 bootstrap.servers="hadoop102:9092,hadoop103:9092,hadoop104:9092" # ZooKeeper集群地址 zookeeper.connect="hadoop102:2181,hadoop103:2181,hadoop104:2181" # 消费组ID group.id="carlos" # 自动提交拉取到消费端的消息offset到kafka enable.auto.commit="true" # 自动提交offset到zookeeper的时间间隔单位(毫秒) auto.commit.interval.ms="5000" # 每次消费最新的数据 auto.offset.reset="latest" # kafka序列化器 key.serializer="org.apache.kafka.common.serialization.StringSerializer" # kafka反序列化器 key.deserializer="org.apache.kafka.common.serialization.StringDeserializer" # ip库本地文件路径 ip.file.path="E:/2021-05-10/carlos_shop_parent/data/qqwry.dat" # Redis配置 redis.server.ip="hadoop102" redis.server.port=6379 # MySQL配置 mysql.server.ip="hadoop102" mysql.server.port=3306 mysql.server.database="itcast_shop" mysql.server.username="root" mysql.server.password="root" # Kafka Topic 名称 input.topic.canal="ods_carlos_shop_mysql" # Kafka click_log topic 名称 input.topic.click_log="ods_carlos_click_log" # Kafka 购物车 topic 名称 input.topic.cart="ods_carlos_cart" # kafka 评论 topic 名称 input.topic.comments="ods_carlos_comments" # Druid Kafka 数据源 topic名称 output.topic.order="dwd_order" output.topic.order_detail="dwd_order_detail" output.topic.cart="dwd_cart" output.topic.clicklog="dwd_click_log" output.topic.goods="dwd_goods" output.topic.ordertimeout="dwd_order_timeout" output.topic.comments="dwd_comments" # HBase订单明细表配置 hbase.table.orderdetail="dwd_order_detail" hbase.table.family="detail" ``` 拷贝以下内容到resources/log4j.properties ```properties log4j.rootLogger=warn,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n ``` 拷贝一下内容到resources/hbase-site.xml ### 在scala目录中创建以下包结构: | 包名 | 说明 | | -------------------------------------- | ---------------- | | com.carlos.shop.realtime.etl.app | 程序入口 | | com.carlos.shop.realtime.etl.async | 异步IO相关 | | com.carlos.shop.realtime.etl.bean | 实体类 | | com.carlos.shop.realtime.etl.utils | 工具类 | | com.carlos.shop.realtime.etl.process | 实时ETL处理 | | com.carlos.shop.realtime.etl.dataloader | 维度数据离线同步 | ### 编写工具类加载配置文件 - 在 util 包下创建 GlobalConfigUtil 单例对象 - 编写代码 - 使用 ConfigFactory.load 获取配置对象 - 调用config.getString方法加载 application.conf 配置 - 添加一个main方法测试,工具类是否能够正确读取出配置项 **参考代码** ```scala import com.typesafe.config.{Config, ConfigFactory} object GlobalConfigUtil { private val config: Config = ConfigFactory.load() val `bootstrap.servers` = config.getString("bootstrap.servers") val `zookeeper.connect` = config.getString("zookeeper.connect") val `input.topic.canal` = config.getString("input.topic.canal") val `input.topic.click_log` = config.getString("input.topic.click_log") val `input.topic.comments` = config.getString("input.topic.comments") val `group.id` = config.getString("group.id") val `enable.auto.commit` = config.getString("enable.auto.commit") val `auto.commit.interval.ms` = config.getString("auto.commit.interval.ms") val `auto.offset.reset` = config.getString("auto.offset.reset") val `key.serializer` = config.getString("key.serializer") val `key.deserializer` = config.getString("key.deserializer") val `output.topic.order` = config.getString("output.topic.order") val `output.topic.order_detail` = config.getString("output.topic.order_detail") val `output.topic.cart` = config.getString("output.topic.cart") val `output.topic.clicklog` = config.getString("output.topic.clicklog") val `output.topic.goods` = config.getString("output.topic.goods") val `output.topic.ordertimeout` = config.getString("output.topic.ordertimeout") val `output.topic.comments` = config.getString("output.topic.comments") val `hbase.table.orderdetail` = config.getString("hbase.table.orderdetail") val `hbase.table.family` = config.getString("hbase.table.family") val `redis.server.ip` = config.getString("redis.server.ip") val `redis.server.port`: String = config.getString("redis.server.port") val `ip.file.path` = config.getString("ip.file.path") val `mysql.server.ip` = config.getString("mysql.server.ip") val `mysql.server.port` = config.getString("mysql.server.port") val `mysql.server.database` = config.getString("mysql.server.database") val `mysql.server.username` = config.getString("mysql.server.username") val `mysql.server.password` = config.getString("mysql.server.password") val `input.topic.cart` = config.getString("input.topic.cart") def main(args: Array[String]): Unit = { println(`bootstrap.servers`) println(`zookeeper.connect`) println(`input.topic.canal`) println(`input.topic.click_log`) println(`group.id`) println(`enable.auto.commit`) println(`auto.commit.interval.ms`) println(`auto.offset.reset`) println(`output.topic.order`) println(`hbase.table.family`) } } ``` ### 导入 Redis 连接池工具类 导入配置文件:工具类/1.redis连接池工具类/RedisUtil.scala 到 utils 目录 ### 导入 Hbase 连接池工具类 导入配置文件:工具类/2.hbase 连接池工具类到 utils 目录 ### 初始化Flink流式计算程序 - 创建App单例对象,初始化Flink运行环境 - 创建main方法 - 编写代码 - 获取StreamExecutionEnvironment运行环境 - 将Flink默认的开发环境并行度设置为1 - 开启checkpoint - 编写测试代码,测试 Flink 程序是否能够正确执行 **注意事项** - 一定要导入 import org.apache.flink.api.scala._ 隐式转换,否则Flink程序无法执行 ### 实时 etl 特质抽取 #### 定义特质 该特质主要定义统一执行ETL处理,只有一个process方法,用于数据的接入、etl、落地。 #### 根据数据来源抽取etl的抽象类 对于来自于mysql的binlog日志的数据,抽取出来mysql的基类 对于日志数据,封装来自消息队列的基类 ## 编写 Flink 程序解析 Kafka 中的 ProtoBuf ### 抽取Flink整合Kafka配置 因为后续的ETL处理,都是从Kafka中拉取数据,都需要共用Kafka的配置,所以将Kafka的配置单独抽取到工具类中 # ods层接入kafka 在utils包下创建KafkaConsumerProp ```scala import java.util.Properties object KafkaProp { /** * 读取Kafka属性配置 * @return */ def getProperties() = { // 1. 读取Kafka配置 val properties = new Properties() properties.setProperty("bootstrap.servers", GlobalConfigUtil.`bootstrap.servers`) properties.setProperty("zookeeper.connect", GlobalConfigUtil.`zookeeper.connect`) properties.setProperty("group.id", GlobalConfigUtil.`group.id`) properties.setProperty("enable.auto.commit", GlobalConfigUtil.`enable.auto.commit`) properties.setProperty("auto.commit.interval.ms", GlobalConfigUtil.`auto.commit.interval.ms`) properties.setProperty("auto.offset.reset", GlobalConfigUtil.`auto.offset.reset`) properties.setProperty("key.serializer", GlobalConfigUtil.`key.serializer`) properties.setProperty("key.deserializer", GlobalConfigUtil.`key.deserializer`) properties } } ``` ### Flink整合kafka数据源 数据来源主要是分为三个途径: - 对于商品、订单、订单明细等等数据主要是来自于mysql的binlog日志 - 对于购物车、评论等数据主要是java后台程序直接推送到kafka集群中 - 对于点击流日志主要来自于nginx服务器使用flume采集到kafka集群中 #### 整合 Kafka 消费 binlog 消息 在抽象类MySqlBaseETL中,实现Flink整合Kafka。 操作步骤: 1、自定义 ProtoBuf 反序列化 * 因为 Canal 采集到的数据是以 ProtoBuf 形式推入到 Kafka 中的,故应该使用 ProtoBuf 来进行反序列化 2、Flink 整合 Kafka 3、创建订单实时 etl 处理类 4、编写 App 测试 ##### 自定义 ProtoBuf 反序列化 反序列化主要是将Byte数组转换为之前封装在 common 工程的RowData类 ##### 定义MySQL消息etl抽象类 后面不少的业务逻辑(订单、订单明细、商品等)需要共用一份Kafka数据(从一个topic中拉取),抽取抽象类的目的是共用一个FlinkKafkaConsumer,因为后面创建FlinkKafkaConsumer整合Kafka需要使用到Flink流式运行环境,需要在主构造器中传入Flink流式运行环境。该ETL抽象类需要从BaseETL继承。 ##### 创建订单实时etl处理类 在etl包下创建OrderETL类,用于后续订单实时拉宽处理,此时只用来进行测试 ##### 编写App测试 * 创建OrderETL实例对象,并调用process方法执行测试 * 启动canal-client客户端程序订阅binlog消息 ```scala val orderETL = new OrderETL(env) orderETL.process() ``` #### 整合 kafka 消费字符串类型消息 ##### 定义 MQBase 消息 et l抽象类 后面不少的业务逻辑(购物车、评论、点击流等)需要共用一份Kafka数据,抽取抽象类的目的是共用一个FlinkKafkaConsumer,因为后面创建 FlinkKafkaConsumer 整合 Kafka 需要使用到Flink流式运行环境,需要在主构造器中传入Flink流式运行环境。该ETL抽象类需要从BaseETL继承。 #### 整合Kafka消费购物车消息 ##### 在application.conf中添加配置 ```properties # Kafka 购物车 topic名称 input.topic.cart="ods_carlos_cart" ``` ##### 读取配置 ```scala val `input.topic.cart` = config.getString("input.topic.cart") ``` ##### 创建CartETL类,整合Kafka * 在etl包下创建CartETL,从MQBaseETL继承 * 实现process方法 * 实现getKafkaDS方法,在该方法中整合Kafka,并测试打印消费数据 #### 整合Kafka消费评论消息 ##### 在application.conf中添加配置 ```properties # kafka 评论 topic名称 input.topic.comments="ods_carlos_shop_comments" ``` ##### 读取配置 ```scala val `input.topic.comments` = config.getString("input.topic.comments") ``` ##### 创建 CommentsETL 类,整合 Kafka * 在etl包下创建CommentsETL,从MQBaseETL继承 * 实现process方法,并测试打印消费数据 ```scala /** * 点击流处理逻辑 */ class CommentsETL(env:StreamExecutionEnvironment) extends BaseETL[String] { /** * 业务处理接口 */ override def process(): Unit = { // 1. 整合Kafka val commentsDS: DataStream[String] = getKafkaDataStream(GlobalConfigUtil.`input.topic.comments`) commentsDS.print() } } ``` #### 整合Kafka消费点击流消息 ##### 在application.conf中添加配置 ```properties # Kafka click_log topic名称 input.topic.click_log="ods_carlos_click_log" ``` ##### 读取配置 ```scala val `input.topic.click_log` = config.getString("input.topic.click_log") ``` ##### 创建ClickLogETL类,整合Kafka * 在etl包下创建ClickLogETL,从MQBaseETL继承 * 实现process方法,整合Kafka,并测试打印消费数据 ```scala /** * 点击流处理逻辑 */ class ClickLogETL(env:StreamExecutionEnvironment) extends BaseETL[String] { /** * 业务处理接口 */ override def process(): Unit = { // 1. 整合kafka val clickLogDS: DataStream[String] = getKafkaDataStream(GlobalConfigUtil.`input.topic.click_log`) clickLogDS.print() } } ``` ## Flink实时etl ### 开发环境准备 #### 维度数据全量装载 为了后续将订单、订单明细等数据进行实时ETL拉宽,需要提前将一些维度数据加载一个高性能存储中。此处,选择Redis作为商品维度、商品分类维度、门店维度、运营组织机构维度存储。先一次性将所有MySQL中的维度数据全量装载到Redis中,后续只要MySQL中的维度数据更新,马上使用Flink更新Redis中的维度数据 ##### 创建样例类 * 在 com.carlos.shop.realtime.bean 的 DimEntity 类中创建以下样例类 * DimGoodsDBEntity 商品维度样例类 | 列名 | 描述 | | ---------- | ---------- | | goodsName | 商品名称 | | shopId | 店铺id | | goodsCatId | 商品分类id | | shopPrice | 商品价格 | | goodsId | 商品id | * DimGoodsCatDBEntity 商品分类维度样例类 | 列名 | 描述 | | --------- | ------------ | | catId | 商品分类id | | parentId | 商品分类父id | | catName | 商品分类名称 | | cat_level | 商品分类级别 | * DimShopsDBEntity 店铺维度样例类 | 列名 | 描述 | | ----------- | ------------ | | shopId | 店铺id | | areaId | 区域id | | shopName | 店铺名称 | | shopCompany | 店铺公司名称 | * DimOrgDBEntity 机构维度样例表 | 列名 | 描述 | | -------- | -------- | | orgId | 机构id | | parentId | 父机构id | | orgName | 机构名称 | | orgLevel | 机构级别 | - DimShopCatsDBEntity门店商品分类维度样例表 | 列名 | 描述 | | -------- | ---------------- | | catId | 门店商品分类id | | parentId | 门店商品分类父id | | catName | 门店商品分类名称 | | catSort | 门店商品分类级别 | ##### 在配置文件中添加 Redis 配置、MySQL 配置 ```properties # Redis配置 redis.server.ip="hadoop102" redis.server.port=6379 # MySQL配置 mysql.server.ip="hadoop102" mysql.server.port=3306 mysql.server.database="itcast_shop" mysql.server.username="root" mysql.server.password="root" ``` ##### 编写配置工具类 ```scala val `redis.server.ip` = config.getString("redis.server.ip") val `redis.server.port` = config.getString("redis.server.port") val `mysql.server.ip` = config.getString("mysql.server.ip") val `mysql.server.port` = config.getString("mysql.server.port") val `mysql.server.database` = config.getString("mysql.server.database") val `mysql.server.username` = config.getString("mysql.server.username") val `mysql.server.password` = config.getString("mysql.server.password") ``` ##### 编写Redis操作工具类 在utils类中添加RedisUtils类,使用Redis连接池操作Redis ```scala object RedisUtil { val config = new JedisPoolConfig() //是否启用后进先出, 默认true config.setLifo(true) //最大空闲连接数, 默认8个 config.setMaxIdle(8) //最大连接数, 默认8个 config.setMaxTotal(1000) //获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间, 默认-1 config.setMaxWaitMillis(-1) //逐出连接的最小空闲时间 默认1800000毫秒(30分钟) config.setMinEvictableIdleTimeMillis(1800000) //最小空闲连接数, 默认0 config.setMinIdle(0) //每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3 config.setNumTestsPerEvictionRun(3) //对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略) config.setSoftMinEvictableIdleTimeMillis(1800000) //在获取连接的时候检查有效性, 默认false config.setTestOnBorrow(false) //在空闲时检查有效性, 默认false config.setTestWhileIdle(false) var jedisPool: JedisPool = new JedisPool(config, GlobalConfigUtil.`redis.server.ip`, GlobalConfigUtil.`redis.server.port`.toInt) /** * 获取Redis连接 * @return */ def getResouce() = { jedisPool.getResource } } ``` ##### 读取MySQL商品维度数据到Redis 在 com.carlos.shop.realtime.etl.dataloader 包中创建 DimensionDataLoader 单例对象,实现装载商品维度数据 实现步骤: 1、先从MySQL的 itcast_goods 表中加载数据 2、将数据保存到键为 carlos_goods:dim_goods 的 HASH 结构中 3、关闭资源 ##### 读取 MySQL店铺维度数据到Redis 实现步骤: 1、先从MySQL的 itcast_shops 表中加载数据 2、将数据保存到键为 carlos_goods:dim_shops 的 HASH 结构中 3、关闭资源 ##### 读取MySQL商品分类维度数据到Redis 实现步骤: 1、先从MySQL的 itcast_goods_cats 表中加载数据 2、将数据保存到键为 carlos_shop:dim_goods_cats 的 HASH 结构中 3、关闭资源 ##### 读取MySQL组织结构数据到Redis 实现步骤: 1、先从MySQL的 itcast_org 表中加载数据 2、将数据保存到键为 carlos_shop:dim_org 的 HASH 结构中 3、关闭资源 ##### 读取MySQL门店商品分类维度数据到Redis 实现步骤: 1、先从MySQL的 itcast_shop_cats表中加载数据 2、将数据保存到键为 carlos_shop:dim_shop_cats 的 HASH 结构中 3、关闭资源 维度数据增量更新 #### 创建同步Redis中维度数据ETL处理类 在etl包下创建SyncDimDataETL类,继承MySqlBaseETL特质,实现process方法 ```scala /** * Redis维度数据同步业务 */ class SyncDimDataETL(env: StreamExecutionEnvironment) extends MySqlBaseETL(env) { /** * 业务处理接口 */ override def process(): Unit = { } } ``` ##### 在App中调用同步ETL 1、在App中创建实时同步ETL,并调用处理方法 ```scala val syncRedisDimDataETL = new SyncRedisDimDataETL(env) syncRedisDimDataETL.process() ``` 2、启动Flink程序 3、测试 * 新增MySQL中的一条表数据,测试Redis中的数据是否同步更新 * 修改MySQL中的一条表数据,测试Redis中的数据是否同步更新 * 删除MySQL中的一条表数据,测试Redis中的数据是否同步更新 查看redis中的数据 ### 点击流消息实时拉宽处理 #### Apache HTTPD和NGINX访问日志解析器 这是一个Logparsing框架,旨在简化[Apache HTTPD](https://httpd.apache.org/)和[NGINX](https://nginx.org/)访问日志文件的解析。 基本思想是,您应该能够拥有一个解析器,可以通过简单地告诉该行写入了哪些配置选项来构造该解析器。这些配置选项是访问日志行的架构。 ##### 导入依赖 ```xml nl.basjes.parse.httpdlog httpdlog-parser 5.2 ``` ##### nginx日志样本 在nginx的conf目录下的nginx.conf文件中可以配置日志打印的格式,如下: ``` #log_format main  '$remote_addr - $remote_user [$time_local] [$msec] [$request_time] [$http_host] "$request" '                     '$status $body_bytes_sent "$request_body" "$http_referer" '                     '"$http_user_agent" $http_x_forwarded_for' ``` $remote_addr 对应客户端的地址 $remote_user 是请求客户端请求认证的用户名,如果没有开启认证模块的话是值为空。 $time_local 表示nginx服务器时间 $msec 访问时间与时区字符串形式 $request_time 请求开始到返回时间 $http_host 请求域名 $request 请求的url与http协议 $status 请求状态,如成功200 $body_bytes_sent 表示从服务端返回给客户端的body数据大小 $request_body 访问url时参数 $http_referer 记录从那个页面链接访问过来的 $http_user_agent 记录客户浏览器的相关信息 $http_x_forwarded_for 请求转发过来的地址 $upstream_response_time: 从 Nginx 建立连接 到 接收完数据并关闭连接 ##### 定义格式化字符串 参考:https://httpd.apache.org/docs/current/mod/mod_log_config.html ``` %u %h %l %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\" \"%{Cookie}i\" \"%{Addr}i\" ## Flink实时etl ### 点击流消息实时拉宽处理 #### 拉宽点击流消息处理 #### 点击流消息实时拉宽测试 ### 订单数据实时处理 #### 操作步骤 1、在etl包下创建OrderETL类,实现MySqlBaseETL 2、过滤出来表名为itcast_orders,且事件类型为 "insert" 的binlog消息 3、将RowData数据流转换为 OrderDBEntity数据源 4、为了方便落地到 Kafka,再将OrderDBEntity 转换为JSON字符串 5、将转换后的json字符串写入到kafka的**dwd_order**topic中 ### 订单明细数据实时拉宽处理 #### 操作步骤 1、在etl包下创建OrderGoodsEtl类,实现MySqlBaseETl 2、先过滤出表名为 itcast_order_goods,且事件类型为 "insert" 的binlog消息 3、为了方便落地到 Kafka,再将OrderGoodsWideEntity转换为JSON字符串 4、根据以下方式拉宽订单明细数据 * 根据商品id,从redis中商品维度Hash获取商品数据 * 根据店铺id,从redis中门店维护Hash获取门店数据 * 根据商品数据的三级分类id,从redis中的商品分类Hash获取三级分类数据 * 根据三级分类数据的parentid,从redis中的商品分类Hash获取二级分类数据 * 根据二级分类数据的parentid,从redis中的商品分类Hash获取一级分类数据 * 根据门店数据的areaId,从redis中的组织机构Hash获取城市机构数据 * 根据城市机构数据的parentId,从redis中的组织机构Hash获取省份机构数据 5、将拉宽后的订单明细数据写入到hbase数据库中 ### 商品消息实时拉宽处理 #### 操作步骤 1、在etl包下创建GoodsETL类,实现MySqlBaseETL 2、先过滤出表名为 itcast_goods 3、根据以下方式拉宽订单明细数据 * 根据商品id,从redis中商品维度Hash获取商品数据 * 根据店铺id,从redis中门店维护Hash获取门店数据 * 根据商品数据的三级分类id,从redis中的商品分类Hash获取三级分类数据 * 根据三级分类数据的parentid,从redis中的商品分类Hash获取二级分类数据 * 根据二级分类数据的parentid,从redis中的商品分类Hash获取一级分类数据 * 根据门店数据的areaId,从redis中的组织机构Hash获取城市机构数据 * 根据城市机构数据的parentId,从redis中的组织机构Hash获取省份机构数据 4、为了方便落地到 Kafka,再将GoodsWideBean转换为JSON字符串 #### 注册IP库为Flink分布式缓存 ```scala env.registerCachedFile(GlobalConfigUtil.`ip.file.path`, "qqwry.dat") ``` #### 拉宽购物车数据 1、将Kafka的字符串消息转换为实体类 ```scala // 将JSON转换为实体类 val cartBeanDS = kafkaDS.map(CartBean(_)) ``` 2、使用RichMapFunction实现,从Redis中拉取维度数据 3、解析IP地址 4、拉宽日期时间 ### 评论消息实时拉宽处理 #### 操作步骤 1、在etl包下创建CommentsETL类,实现MQBaseETL 2、先过滤出表名为 itcast_goods 3、消费kafka的数据因为kafka的数据是字符串,消费出来需要转换成Comments对象 4、根据以下方式拉宽订单明细数据 * 根据商品id,从redis中商品维度Hash获取商品数据 * 根据店铺id,从redis中门店维护Hash获取门店数据 * 根据商品数据的三级分类id,从redis中的商品分类Hash获取三级分类数据 * 根据三级分类数据的parentid,从redis中的商品分类Hash获取二级分类数据 * 根据二级分类数据的parentid,从redis中的商品分类Hash获取一级分类数据 * 根据门店数据的areaId,从redis中的组织机构Hash获取城市机构数据 * 根据城市机构数据的parentId,从redis中的组织机构Hash获取省份机构数据 4、为了方便落地到 Kafka,再将CommentsWideEntity转换为JSON字符串 ## 使用JDBC查询Druid中的数据 ### 安装 imply-3.0.4 Imply-3.0.4 基于 apache-druid-0.15.0-Incubating - 1、下载imply ```shell cd /opt/module/ wget https://static.imply.io/release/imply-3.0.4.tar.gz ``` - 2、直接使用资料 imply安装包jps ```shell script 将该 `imply安装包\imply-3.0.4.tar.gz` 安装包上传到 /exports/softwares ``` - 3、解压imply-3.0.4 ```shell script tar -xvzf imply-3.0.4.tar.gz -C ../module cd ../module/imply-3.0.4 ``` - 4、配置 imply-3.0.4 mysql中创建imply相关的数据库 ```sql CREATE DATABASE `druid` DEFAULT CHARACTER SET utf8; CREATE DATABASE `pivot` DEFAULT CHARACTER SET utf8; ``` ### 修改并上传配置文件 修改 conf/druid/_common/common.runtime.properties 文件 修改zookeeper的配置 ```shell script druid.zk.service.host=hadoop102:2181,hadoop103:2181,hadoop104:2181 ``` 修改MySQL的配置 ```shell script druid.metadata.storage.type=mysql druid.metadata.storage.connector.connectURI=jdbc:mysql://hadoop102:3306/druid druid.metadata.storage.connector.user=root druid.metadata.storage.connector.password=root ``` 修改 conf/pivot/config.yaml 配置文件 - 修改mysql的配置 ```shell script stateStore: type: mysql location: mysql connection: 'mysql://root:root@hadoop102:3306/pivot' ``` 将配置好的 imply 分发到不同节点 - 分发脚本 xsync ```shell script #!/bin/bash #1. 判断参数个数 if [ $# -lt 1 ] then echo Not Enough Arguement! exit; fi #2. 遍历集群所有机器 for host in hadoop102 hadoop103 hadoop104 do echo ==================== $host ==================== #3. 遍历所有目录,挨个发送 for file in $@ do #4 判断文件是否存在 if [ -e $file ] then #5. 获取父目录 pdir=$(cd -P $(dirname $file); pwd) #6. 获取当前文件的名称 fname=$(basename $file) ssh $host "mkdir -p $pdir" rsync -av $pdir/$fname $host:$pdir else echo $file does not exists! fi done done ``` imply 分发 ```shell script [carlos@hadoop102 ~]$ xsync imply-3.0.4 ``` ### 配置环境变量 在每台服务器上配置DRUID_HOME环境变量 sudo vim /etc/profile.d/my_env.sh ```shell script # DRUID export DRUID_HOME=/opt/module/imply-3.0.4 ``` source /etc/profile.d/my_env.sh ### 启动 imply 集群 1、启动zk集群 https://blog.csdn.net/Aeve_imp/article/details/107597274 2、hadoop102节点(使用外部zk而不使用imply自带zk启动overlord和coordinator) ```shell script # 使用外部 zk 而不使用imply自带zk启动overlord和coordinator [carlos@hadoop102 imply-3.0.4]$ bin/supervise -c /opt/module/imply-3.0.4/conf/supervise/master-no-zk.conf ``` 3、hadoop103节点(启动historical和middlemanager) ```shell script [carlos@hadoop103 imply-3.0.4]$ bin/supervise -c /opt/module/imply-3.0.4/conf/supervise/data.conf ``` 4、hadoop104节点(启动broker和router) ```shell script [carlos@hadoop104 imply-3.0.4]$ bin/supervise -c /opt/module/imply-3.0.4/conf/supervise/query.conf ``` **注意事项** - 如果希望imply运行在后台,在每个执行命令后面加 --daemonize #### 访问WebUI | **组件名** | **URL** | | ------------------------- | ---------------------------------- | | broker | http://hadoop104:8888 | | coordinator、overlord | http://hadoop102:8081/index.html | | middleManager、historical | http://hadoop102:8090/console.html | Druid提供了JDBC接口,JavaWeb项目可以直接使用 JDBC 连接Druid进行实时数据分析。 需求: - 获取 metrics-kafka 数据源中,不同用户的访问次数 实现步骤: 1、创建 druid_jdbc Maven模块 2、导入依赖 3、编写JDBC代码连接Druid获取数据 - 加载Druid JDBC驱动 - 获取Druid JDBC连接 - 构建SQL语句 - 构建Statement,执行SQL获取结果集 - 关闭Druid连接 # 数据可视化项目 操作步骤: 1、导入 carlos_dw_web 项目 2、修改 DashboardServiceImpl.java 中 Redis 服务器地址 3、修改 utils.DruidHelper 中Druid的 url 地址 4、修改druid连接字符串的表名:dws_od改成dws_order 5、启动 Jetty 服务器 6、打开浏览器访问 http://localhost:8080/itcast_dw_web ## Superset ### BI VS 报表工具 * 报表工具是数据展示工具,而BI(商业智能)是数据分析工具。报表工具可以制作各类数据报表、图形报表的工具,甚至还可以制作电子发票联、流程单、收据等。 * BI可以将数据进行模型构建,制作成Dashboard,相比于报表,侧重点在于分析,操作简单、数据处理量大。常常基于企业搭建的数据平台,连接数据仓库进行分析。 #### 安装教程 1. xxxx 2. xxxx 3. xxxx #### 参与贡献 1. Fork 本仓库 2. 新建 Feat_xxx 分支 3. 提交代码 4. 新建 Pull Request