# 实时计算-电商大屏数据看板 **Repository Path**: chuming/bulletin-board ## Basic Information - **Project Name**: 实时计算-电商大屏数据看板 - **Description**: 基于kafka,storm,redis,flask,echarts构建的实时数据看板,每秒刷新订单频率多线程可达数百。 - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 4 - **Created**: 2022-04-30 - **Last Updated**: 2024-02-26 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 实时计算-电商大屏数据看板 #### 介绍 某在线电子图书商城当前注册会员数有近千万会员,遍布于全国多个省份,日常在线业务量较大。为了提高商城的用户知名度,该公司拟计划开展大型促销活动(类似于双十一的电商购物活动)。为了准确、实时统计当前销售数量,公司拟委托你开发一个促销活动的大数据看板,用于当日活动启动后销售订单实时数据的变化情况。 #### 功能需求 运用实时计算技术、Web 技术构建一个图书商城的促销活动大数据看板,实现以下功能: (1) 界面每 3 秒刷新一次,如果能做到 2 秒或 1 秒刷新一次,可加分; (2) 展示当前订单的已处理速度,单位为“条/秒”; (3) 大数据看板界面应简洁大气,有特点; (4) 展示截止当前时间的总销售金额、订单数量、下单客户数基本信息; (5) 展示截止当前时间销量排名前 10 的图书排行榜; (6) 展示截止当前时间销量排名前 10 的出版社排行榜; (7) 展示全国各地下单客户的累计数量(按省份),在地图上直观展示; (8) 数据统计误差(数据丢失、统计错误)不超过 1%,应设计实验计算数据误差率; (9) 展示的数据延迟应不超过 30 秒,每次刷新时应显示获取的数据最新时间; (10) 测试出系统的最大承载负荷量,即最大并发线程数; #### 软件架构 使用python API(即pymysql-replication 中的BinLogStreamReader)读取mysql中的binlog日志(以latest增量读取)递送给kafka消息队列,再由kafka分发给flink处理,将处理好的数据存入redis,通过flask和echarts实现可视化web界面。 #### 单元实现 1. 数据采集 首先需要使mysql数据库开启binlog日志的服务,同时对创建的用户赋予相关权限,然后通过pymysql-replication监控MySQL的binlog变动,对于binlog日志中新增的写操作(即增、删、改)进行流处理,最后使用kafka的producer将binlogstreamreader读取到的数据send到kafka的消息队列中以供后续的分发和订阅。 2. 数据分发和订阅 Kafka集群通过三个节点来构建,一个用于producer,两个用于consumer,从而实现消息的分发,flink集群通过flink中的datastream流处理,利用flinkkafkaconsumer可以使用两个流同时订阅kafka两个topic中的消息。分发订阅过程见图11。 3. 数据统计和计算 Flink订阅到kafka中的消息后,对两个流采用不同的算子同时执行不同的统计和计算,最终以自增的方式写入redis数据库,流处理过程见图12。 #### 实验设计与结果分析 |参数项 |系统的延迟时间(s)|能处理的数据量(个)|CPU使用率(%)|内存占用率(%)|并发任务数(个)| |---|---|---|---|---|---| |线程数1|0.1|30|50|87|3| |线程数10|0.1|70-80|65|90|3| |线程数20|1|100-120|75|93|3| |线程数50|1|180-210|95|97|3| #### 项目特色 1. 在数据提取部分使用了python API读取mysql数据,相较于JAVA以及MAXWELL、CANAL等工具,python API在性能相差不大的基础上大大降低了环境搭建的复杂程度,有利于项目的开发和维护。 2. 在实时流计算处理方面,相较于Strom单/多线程吞吐量及各项性能指标对比,Flink各方面都占有较大优势。 3. 在实时面板显示方面,并没有套用网上已存在的模板,而是通过自写的flask-ajax-echarts脚本实现了整个web页面的设计,因而看板显示并不复杂冗余,简单明了的实现了各项任务点。 #### 主要问题 flume集群搭建问题 1.java.io.FileNotFoundException: /tmp/flume/status/sqlSource.status (No such file or directory) 原因分析:没有创建日志文件的目录 解决方法:新建文件夹/tmp/flume/status 2. flume采集数据报错 :java.lang.OutOfMemoryError: GC overhead limit exceeded 原因分析:flume接受消息过多导致gc 解决方法:进入flume bin目录下,修改flume-ngJAVA_OPTS=’-Xmx10240m’ kafka集群搭建问题 1.Recovery is suppressed by NoRestartBackoffTimeStrategy 原因分析: kafka配置文件中的listeners默认参数是localhost 解决方法:改为虚拟机对应的ip地址 2. Unable to read additional data from server sessionid 0x0 原因分析:kafka版本更换与zk不符 解决方法:重启下zookeeper leader 3. kafka服务宕机:org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) 原因分析:kafka版本问题 解决方法:将kafka更换至与scala相匹配的1.0之后的版本 4. “锁定文件失败打不开磁盘“XXX.vmdk”或它所依赖的某个快照磁盘模块“Disk”启动失败” 原因分析:使用VMWare虚拟机完后,没有退出就关机了 解决方法:找到虚拟机文件夹下面删除.lck文件,关机重新运行虚拟机 pyflink环境依赖问题 1.error:curl: (35) SSL connect error 原因分析:curl版本较低 解决方法:yum -y update curl 2.vmvare中python版本过低 原因分析:虚拟机自带的python只有2.6,而pyflink所需python版本必须为3.5、3.6、3.7中的一种 解决方法:更新虚拟机的python kafka消息生产问题 1.The last packet sent successfully to the server was 0 milliseconds ago 原因分析:由于数据库回收了连接,而系统的缓冲池不知道,继续使用被回收的连接所致的。 解决方法: 将mysql回收空闲连接的时间变长 调整my.ini配置。 [mysqld] wait_timeout=86400 2.KafkaAdminClient not found 原因分析:kafka0-1版本(刚开始用的0.9.1)不支持adminclient。 解决方法:将kafka版本替换至1.0.0。 3. producer发出的信息为unicode编码格式 原因分析:默认编码格式及unicode。 解决方法: value_serializer=lambda v: json.dumps(v,ensure_ascii=False).encode('utf-8') flink流处理相关问题 1.FlinkKafkaConsumer 版本问题 原因分析:flinkkafkaconsumer根据flink的版本不同可以调用09,010之类的指定版本consumer,但是无法使用反序列化优化器simplestringschema,报错显示没有这个构造函数。 解决方法:使用flinkkafkaconsumer基类(基于flink1.12.1)。 2.Pom.xml依赖报错无法构建环境 原因分析:idea maven环境配置问题。 解决方法:需要下载apache的maven包并添加阿里云的镜像环境。 3. JRedisUtil对象无法实例化 原因分析:非静态方法无法实例化。 解决方法:在类中定义redis对象,同时能提升很多速度。 public static JRedisUtil conn = new JRedisUtil("127.0.0.1",6379); 4.Datastream流映射关系 原因分析:flink的datastream对象提取数据。 解决方法:使用map将message映射给字符数组。 5.分词紊乱 原因分析:脚本生成的图书订单中有书名里包含有英文逗号的书,无法通过split(‘,’)获取准确分词。 解决方法:可通过try,cathe将错误捕获,也可以在字符数组分词后判断其长度是否符合正确的分词结果进行筛选。 6.自增插入incrby无法统计排行类数据 原因分析:incrby排行类数据会在redis里 产生多个key,影响查询速度的同时也不利于查询统计数据。 解决方法:可以使用zincrby将排行类或者是同一类数据存入一个有序集合,通过zrange获取有序集合中的数据。 7.double型数据无法直接存储到redis 原因分析:如book_num,total等数据为double类型无法直接zincrby 。 解决方法:可通过重写构造函数对double类型进行修改。 8. java.lang.Error: Unresolved compilation problems 原因分析:编译等级跟jdk不一致 。 解决方法:更换jdk1.8。 9. Caused by: java.lang.Exception: Could not create actor system 原因分析: kafak对应的scala版本应和集群中的不一致。 解决方法:修改maven依赖中的scala对应版本。 10. java.lang.ClassNotFoundException: redis.clients.jedis.Jedis 原因分析:redis版本问题。 解决方法:将redis jar包换成2.9,redis使用6.2.1。 mysql相关问题 1.无法访问脚本run.exe产生的数据库表 原因分析:order是mysql的保留字符,无法直接使用 解决方法:使用`order`。 2.mysql遍历查询速度较慢问题 原因分析:每次执行查询时都需要遍历整个mysql数据库,不满足实时计算的需求。 解决方法:对mysql的binlog实现增量读取。 3.虚拟机无法访问本地mysql 原因分析:用户没有相对应的远程访问权限 解决方法: 创建远程用户 create user root@'192.168.133.140' identified by '123456'; 赋予远程访问权限 grant all privileges on *.* to root@'192.168.133.140' with grant option; ALTER USER 'root'@'192.168.133.140' IDENTIFIED BY '123456' PASSWORD EXPIRE NEVER; ALTER USER 'root'@'192.168.133.140' IDENTIFIED WITH mysql_native_password BY '123456'; FLUSH PRIVILEGES; 4. java读文件时,明明文件存在,却报错java.io.FileNotFoundException 原因分析:更改文件名之后,流程比OS处理的快,缓存暂时没有同步过来 解决方法:在读文件的之前加入Thread.sleep(500) 5.mysql未开启binlog服务 原因分析:mysql未开启binlog服务 解决方法:修改配置文件/etc/my.cnf [mysqld] server_id=10000 log_bin = mysql-bin binlog_format = ROW 重启mysql服务 6. 虚拟机无法访问本地(ping不通本机) 原因分析:防火墙不允许连接 解决方法:需要修改本地防火墙的入站连接为允许 详情见:https://blog.csdn.net/qq_42500577/article/details/95954948 maxwell相关问题 1.无法识别insert 原因分析:maxwell的schema_database数据库和表是不监控的,在这个数据库里插入或删除数据不会被kafka消费 解决方法:更换插件