1 Star 0 Fork 3

anyine / carlos_shop_warehouse

forked from Carlos / carlos_shop_warehouse 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
Apache-2.0

carlos_shop_ warehouse

介绍

Flink 实时数仓

实现方案

JAVA 方式实现

  • 一些中小企业当中,由于数据量较小(比如核心总量小于20万条),可通过Java程序定时查询mysql实现
  • 比较简单,但是粗暴实用
  • 仅仅需要对mysql做一些优化即可,比较增加索引

通过flink方案实现

  • 数据量特别大、无法直接通过mysql查询完成,有时候根本查询不动
  • 要求实时性高,比如阿里巴巴双十一监控大屏,要求延迟不超过1秒

实时数仓项目架构 architecture

Canal介绍

简介

  • 基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

  • 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更

  • 从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务,基于日志增量订阅和消费的业务包括

    • 数据库镜像
    • 数据库实时备份
    • 索引构建和实时维护(拆分异构索引、倒排索引等)
    • 业务 cache 刷新
    • 带业务逻辑的增量数据处理
  • 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

  • github地址:https://github.com/alibaba/canal

环境部署

MySQL

  • MySQL需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,/etc/my.cnf 中配置如下

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    CREATE USER root IDENTIFIED BY 'root';  
    GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' ;
    FLUSH PRIVILEGES;

Canal安装

architecture

注意:本项目使用的版本 canal1.0.24

环境要求:

  • 安装好 ZooKeeper
  • 解压缩

    mkdir /opt/module/canal
    tar -zxvf canal.deployer-1.0.24.tar.gz  -C /opt/module/canal/
  • 解压完成后,进入 /opt/module/canal/ 目录,可以看到如下结构

    drwxr-xr-x. 2 root root 4096 2月   1 14:07 bin
    drwxr-xr-x. 4 root root 4096 2月   1 14:07 conf
    drwxr-xr-x. 2 root root 4096 2月   1 14:07 lib
    drwxrwxrwx. 2 root root 4096 4月   1 2017 logs
  • canal server的conf下有几个配置文件

    [root@hadoop102 canal]# tree conf/ 
    conf/
    ├── canal.properties
    ├── example
    │   └── instance.properties
    ├── logback.xml
    └── spring
        ├── default-instance.xml
        ├── file-instance.xml
        ├── group-instance.xml
        ├── local-instance.xml
        └── memory-instance.xml
  • 修改instance 配置文件

    vim conf/example/instance.properties

    ## mysql serverId,这里的slaveId不能和myql集群中已有的server_id一样
    canal.instance.mysql.slaveId = 1234
    
    #  按需修改成自己的数据库信息
    #################################################
    ...
    canal.instance.master.address=hadoop102:3306
    # username/password,数据库的用户名和密码
    ...
    canal.instance.dbUsername = root
    canal.instance.dbPassword = root
    #################################################
  • 启动

    sh bin/startup.sh

Canal客户端开发

<dependencies>
	<dependency>
    	<groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.0.24</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
</dependencies>

在canal_demo模块创建包结构

包名 说明
com.carlos.canal_demo 代码存放目录

开发步骤

  1. 创建 Connector
  2. 连接 Cannal 服务器,并订阅
  3. 解析 Canal 消息,并打印

Canal 消息格式

转换为 JSON 数据

  • 复制上述代码,将 binlog 日志封装在一个Map结构中,使用 fastjson 转换为 JSON 格式
Entry  
    Header  
        logfileName [binlog文件名]  
        logfileOffset [binlog position]  
        executeTime [binlog里记录变更发生的时间戳,精确到秒]  
        schemaName   
        tableName  
        eventType [insert/update/delete类型]  
    entryType   [事务头BEGIN/事务尾END/数据ROWDATA]  
    storeValue  [byte数据,可展开对应的类型为RowChange]  
RowChange
    isDdl       [是否是ddl变更操作比如create table/drop table]
    sql         [具体的ddl sql]
rowDatas    [具体insert/update/delete的变更数据可为多条1个binlog event事件可对应多条变更比如批处理]
    beforeColumns [Column类型的数组变更前的数据字段]
    afterColumns [Column类型的数组变更后的数据字段]
    Column
    index
    sqlType     [jdbc type]
    name        [column name]
    isKey       [是否为主键]
    updated     [是否发生过变更]
    isNull      [值是否为null]
    value       [具体的内容注意为string文本]
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件 log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据,以此来达到数据一致。

mysql的binlog

它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间。主要用来备份和数据同步。

binlog 有三种: STATEMENT、ROW、MIXED

  • STATEMENT 记录的是执行的sql语句
  • ROW 记录的是真实的行数据记录
  • MIXED 记录的是1+2,优先按照1的模式记录

名词解释

什么是中继日志

从服务器I/O线程将主服务器的二进制日志读取过来记录到从服务器本地文件,然后从服务器SQL线程会读取 relay-log 日志的内容并应用到从服务器,从而使从服务器和主服务器的数据保持一致

canal 工作原理

img

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

架构

img

  • server 代表一个 canal 运行实例,对应于一个 jvm
  • instance 对应于一个数据队列 (1个 canal server 对应 1..n 个 instance )
  • instance 下的子模块
    • eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析
    • eventSink: Parser 和 Store 链接器,进行数据过滤,加工,分发的工作
    • eventStore: 数据存储
    • metaManager: 增量订阅 & 消费信息管理器

EventParser在向mysql发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功 ),传送成功之后更新Log Position。流程图如下:

image-20200214104557452

  • EventSink起到一个类似channel的功能,可以对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink是连接EventParser和EventStore的桥梁。
  • EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。
  • MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:
    • Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:batch id[唯一标识]和entries[具体的数据对象]
    • void rollback(long batchId),顾名思义,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
    • void ack(long batchId),顾名思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作

开发Canal客户端订阅binlog消息

在canal_client模块创建包结构

包名 说明
com.carlos.canal_client 存放入口、Canal客户端核心实现
com.carlos.canal_client.util 存放工具类
com.carlos.canal_client.kafka 存放Kafka生产者实现

添加配置文件

# canal配置
canal.server.ip=node1
canal.server.port=11111
canal.server.destination=example
canal.server.username=canal
canal.server.password=canal
canal.subscribe.filter=itcast_shop.*

# zookeeper配置
zookeeper.server.ip=hadoop102:2181,hadoop103:2181,hadoop104:2181

# kafka配置
kafka.bootstrap_servers_config=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka.batch_size_config=1024
kafka.acks=all
kafka.retries=0
kafka.client_id_config=carlos_shop_canal_click
kafka.key_serializer_class_config=org.apache.kafka.common.serialization.StringSerializer
kafka.value_serializer_class_config=com.carlos.canal.protobuf.ProtoBufSerializer
kafka.topic=ods_carlos_shop_mysql

编写CanalClient客户端核心实现类

编写Entrance入口

/**
 * 入口
 */
public class Entrance {
    public static void main(String[] args) {
        CanalClient canalClient = new CanalClient();
        canalClient.start();
    }
}

使用ProtoBuf序列化binlog消息

在 carlos_shop_common 模块创建包结构

包名 说明
com.carlos.canal.bean 存放通用的JavaBean
com.carlos.canal.protobuf 实现Protobuf相关接口、实现

生产ProtoBuf消息到Kafka中

实现KafkaSender

该类用于生成数据到Kafka

/**
 * Kafka生产者
 */
public class KafkaSender {
    private Properties kafkaProps = new Properties();
    private KafkaProducer<String, RowData> kafkaProducer;

    public KafkaSender() {
        kafkaProps.put("bootstrap.servers", ConfigUtil.kafkaBootstrap_servers_config());
        kafkaProps.put("acks", ConfigUtil.kafkaAcks());
        kafkaProps.put("retries", ConfigUtil.kafkaRetries());
        kafkaProps.put("batch.size", ConfigUtil.kafkaBatch_size_config());
        kafkaProps.put("key.serializer", ConfigUtil.kafkaKey_serializer_class_config());
        kafkaProps.put("value.serializer", ConfigUtil.kafkaValue_serializer_class_config());

        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public void send(RowData rowData) {
        kafkaProducer.send(new ProducerRecord<>(ConfigUtil.kafkaTopic(), null, rowData));
    }
}

CanalClient调用KafkaSender生产数据

public class CanalClient {
	// ...
    private KafkaSender kafkaSender;
    
        // 开始监听
    public void start() {
        	...
            RowData rowData = new RowData(binlogMsgMap);
            kafkaSender.send(rowData);
    }

创建Kafka topic并执行测试

# 创建topic
bin/kafka-topics.sh --create --zookeeper node1:2181 --topic ods_itcast_shop_mysql --replication-factor 3 --partitions 3
# 创建控制台消费者测试
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic ods_itcast_shop_mysql --from-beginning

Flink实时ETL项目初始化

拷贝配置文件

#
#kafka的配置
#
# Kafka集群地址
bootstrap.servers="hadoop102:9092,hadoop103:9092,hadoop104:9092"
# ZooKeeper集群地址
zookeeper.connect="hadoop102:2181,hadoop103:2181,hadoop104:2181"
# 消费组ID
group.id="carlos"
# 自动提交拉取到消费端的消息offset到kafka
enable.auto.commit="true"
# 自动提交offset到zookeeper的时间间隔单位(毫秒)
auto.commit.interval.ms="5000"
# 每次消费最新的数据
auto.offset.reset="latest"
# kafka序列化器
key.serializer="org.apache.kafka.common.serialization.StringSerializer"
# kafka反序列化器
key.deserializer="org.apache.kafka.common.serialization.StringDeserializer"

# ip库本地文件路径
ip.file.path="E:/2021-05-10/carlos_shop_parent/data/qqwry.dat"

# Redis配置
redis.server.ip="hadoop102"
redis.server.port=6379

# MySQL配置
mysql.server.ip="hadoop102"
mysql.server.port=3306
mysql.server.database="itcast_shop"
mysql.server.username="root"
mysql.server.password="root"

# Kafka Topic 名称
input.topic.canal="ods_carlos_shop_mysql"
# Kafka click_log topic 名称
input.topic.click_log="ods_carlos_click_log"
# Kafka 购物车 topic 名称
input.topic.cart="ods_carlos_cart"
# kafka 评论 topic 名称
input.topic.comments="ods_carlos_comments"

# Druid Kafka 数据源 topic名称
output.topic.order="dwd_order"
output.topic.order_detail="dwd_order_detail"
output.topic.cart="dwd_cart"
output.topic.clicklog="dwd_click_log"
output.topic.goods="dwd_goods"
output.topic.ordertimeout="dwd_order_timeout"
output.topic.comments="dwd_comments"

# HBase订单明细表配置
hbase.table.orderdetail="dwd_order_detail"
hbase.table.family="detail"

拷贝以下内容到resources/log4j.properties

log4j.rootLogger=warn,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n

拷贝一下内容到resources/hbase-site.xml

在scala目录中创建以下包结构:

包名 说明
com.carlos.shop.realtime.etl.app 程序入口
com.carlos.shop.realtime.etl.async 异步IO相关
com.carlos.shop.realtime.etl.bean 实体类
com.carlos.shop.realtime.etl.utils 工具类
com.carlos.shop.realtime.etl.process 实时ETL处理
com.carlos.shop.realtime.etl.dataloader 维度数据离线同步

编写工具类加载配置文件

  • 在 util 包下创建 GlobalConfigUtil 单例对象
  • 编写代码
    • 使用 ConfigFactory.load 获取配置对象
    • 调用config.getString方法加载 application.conf 配置
    • 添加一个main方法测试,工具类是否能够正确读取出配置项 参考代码
import com.typesafe.config.{Config, ConfigFactory}

object GlobalConfigUtil {
  private val config: Config = ConfigFactory.load()

  val `bootstrap.servers` = config.getString("bootstrap.servers")
  val `zookeeper.connect` = config.getString("zookeeper.connect")
  val `input.topic.canal` = config.getString("input.topic.canal")
  val `input.topic.click_log` = config.getString("input.topic.click_log")
  val `input.topic.comments` = config.getString("input.topic.comments")
  val `group.id` = config.getString("group.id")
  val `enable.auto.commit` = config.getString("enable.auto.commit")
  val `auto.commit.interval.ms` = config.getString("auto.commit.interval.ms")
  val `auto.offset.reset` = config.getString("auto.offset.reset")
  val `key.serializer` = config.getString("key.serializer")
  val `key.deserializer` = config.getString("key.deserializer")
  val `output.topic.order` = config.getString("output.topic.order")
  val `output.topic.order_detail` = config.getString("output.topic.order_detail")
  val `output.topic.cart` = config.getString("output.topic.cart")
  val `output.topic.clicklog` = config.getString("output.topic.clicklog")
  val `output.topic.goods` = config.getString("output.topic.goods")
  val `output.topic.ordertimeout` = config.getString("output.topic.ordertimeout")
  val `output.topic.comments` =  config.getString("output.topic.comments")
  val `hbase.table.orderdetail` = config.getString("hbase.table.orderdetail")
  val `hbase.table.family` = config.getString("hbase.table.family")
  val `redis.server.ip` = config.getString("redis.server.ip")
  val `redis.server.port`: String = config.getString("redis.server.port")
  val `ip.file.path` = config.getString("ip.file.path")
  val `mysql.server.ip` = config.getString("mysql.server.ip")
  val `mysql.server.port` = config.getString("mysql.server.port")
  val `mysql.server.database` = config.getString("mysql.server.database")
  val `mysql.server.username` = config.getString("mysql.server.username")
  val `mysql.server.password` = config.getString("mysql.server.password")
  val `input.topic.cart` = config.getString("input.topic.cart")

  def main(args: Array[String]): Unit = {
    println(`bootstrap.servers`)
    println(`zookeeper.connect`)
    println(`input.topic.canal`)
    println(`input.topic.click_log`)
    println(`group.id`)
    println(`enable.auto.commit`)
    println(`auto.commit.interval.ms`)
    println(`auto.offset.reset`)
    println(`output.topic.order`)
    println(`hbase.table.family`)
  }
}

导入 Redis 连接池工具类

导入配置文件:工具类/1.redis连接池工具类/RedisUtil.scala 到 utils 目录

导入 Hbase 连接池工具类

导入配置文件:工具类/2.hbase 连接池工具类到 utils 目录

初始化Flink流式计算程序

  • 创建App单例对象,初始化Flink运行环境
  • 创建main方法
  • 编写代码
    • 获取StreamExecutionEnvironment运行环境
    • 将Flink默认的开发环境并行度设置为1
    • 开启checkpoint
    • 编写测试代码,测试 Flink 程序是否能够正确执行

注意事项

  • 一定要导入 import org.apache.flink.api.scala._ 隐式转换,否则Flink程序无法执行

实时 etl 特质抽取

定义特质

该特质主要定义统一执行ETL处理,只有一个process方法,用于数据的接入、etl、落地。

根据数据来源抽取etl的抽象类

对于来自于mysql的binlog日志的数据,抽取出来mysql的基类

对于日志数据,封装来自消息队列的基类

编写 Flink 程序解析 Kafka 中的 ProtoBuf

抽取Flink整合Kafka配置

因为后续的ETL处理,都是从Kafka中拉取数据,都需要共用Kafka的配置,所以将Kafka的配置单独抽取到工具类中

ods层接入kafka

在utils包下创建KafkaConsumerProp

import java.util.Properties

object KafkaProp {
  /**
    * 读取Kafka属性配置
    * @return
    */
  def getProperties() = {
    // 1. 读取Kafka配置
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", GlobalConfigUtil.`bootstrap.servers`)
    properties.setProperty("zookeeper.connect", GlobalConfigUtil.`zookeeper.connect`)
    properties.setProperty("group.id", GlobalConfigUtil.`group.id`)
    properties.setProperty("enable.auto.commit", GlobalConfigUtil.`enable.auto.commit`)
    properties.setProperty("auto.commit.interval.ms", GlobalConfigUtil.`auto.commit.interval.ms`)
    properties.setProperty("auto.offset.reset", GlobalConfigUtil.`auto.offset.reset`)
    properties.setProperty("key.serializer", GlobalConfigUtil.`key.serializer`)
    properties.setProperty("key.deserializer", GlobalConfigUtil.`key.deserializer`)

    properties
  }
}

Flink整合kafka数据源

数据来源主要是分为三个途径:

  • 对于商品、订单、订单明细等等数据主要是来自于mysql的binlog日志
  • 对于购物车、评论等数据主要是java后台程序直接推送到kafka集群中
  • 对于点击流日志主要来自于nginx服务器使用flume采集到kafka集群中

整合 Kafka 消费 binlog 消息

在抽象类MySqlBaseETL中,实现Flink整合Kafka。

操作步骤:

1、自定义 ProtoBuf 反序列化

  • 因为 Canal 采集到的数据是以 ProtoBuf 形式推入到 Kafka 中的,故应该使用 ProtoBuf 来进行反序列化

2、Flink 整合 Kafka

3、创建订单实时 etl 处理类

4、编写 App 测试

自定义 ProtoBuf 反序列化

反序列化主要是将Byte数组转换为之前封装在 common 工程的RowData类

定义MySQL消息etl抽象类

后面不少的业务逻辑(订单、订单明细、商品等)需要共用一份Kafka数据(从一个topic中拉取),抽取抽象类的目的是共用一个FlinkKafkaConsumer,因为后面创建FlinkKafkaConsumer整合Kafka需要使用到Flink流式运行环境,需要在主构造器中传入Flink流式运行环境。该ETL抽象类需要从BaseETL继承。

创建订单实时etl处理类

在etl包下创建OrderETL类,用于后续订单实时拉宽处理,此时只用来进行测试

编写App测试
  • 创建OrderETL实例对象,并调用process方法执行测试
  • 启动canal-client客户端程序订阅binlog消息
val orderETL = new OrderETL(env)
orderETL.process()

整合 kafka 消费字符串类型消息

定义 MQBase 消息 et l抽象类

后面不少的业务逻辑(购物车、评论、点击流等)需要共用一份Kafka数据,抽取抽象类的目的是共用一个FlinkKafkaConsumer,因为后面创建 FlinkKafkaConsumer 整合 Kafka 需要使用到Flink流式运行环境,需要在主构造器中传入Flink流式运行环境。该ETL抽象类需要从BaseETL继承。

整合Kafka消费购物车消息

在application.conf中添加配置
# Kafka 购物车 topic名称
input.topic.cart="ods_carlos_cart"
读取配置
val `input.topic.cart` = config.getString("input.topic.cart")
创建CartETL类,整合Kafka
  • 在etl包下创建CartETL,从MQBaseETL继承
  • 实现process方法
  • 实现getKafkaDS方法,在该方法中整合Kafka,并测试打印消费数据

整合Kafka消费评论消息

在application.conf中添加配置
# kafka 评论 topic名称
input.topic.comments="ods_carlos_shop_comments"
读取配置
val `input.topic.comments` = config.getString("input.topic.comments")
创建 CommentsETL 类,整合 Kafka
  • 在etl包下创建CommentsETL,从MQBaseETL继承
  • 实现process方法,并测试打印消费数据
/**
  * 点击流处理逻辑
  */
class CommentsETL(env:StreamExecutionEnvironment) extends BaseETL[String] {

  /**
   * 业务处理接口
   */
  override def process(): Unit = {
    // 1. 整合Kafka
    val commentsDS: DataStream[String] = getKafkaDataStream(GlobalConfigUtil.`input.topic.comments`)
    commentsDS.print()
  }
}

整合Kafka消费点击流消息

在application.conf中添加配置
# Kafka click_log topic名称
input.topic.click_log="ods_carlos_click_log"
读取配置
val `input.topic.click_log` = config.getString("input.topic.click_log")
创建ClickLogETL类,整合Kafka
  • 在etl包下创建ClickLogETL,从MQBaseETL继承
  • 实现process方法,整合Kafka,并测试打印消费数据
/**
  * 点击流处理逻辑
  */
class ClickLogETL(env:StreamExecutionEnvironment) extends BaseETL[String] {
  /**
    * 业务处理接口
    */
  override def process(): Unit = {
      // 1. 整合kafka
    val clickLogDS: DataStream[String] = getKafkaDataStream(GlobalConfigUtil.`input.topic.click_log`)
    clickLogDS.print()
  }
}

Flink实时etl

开发环境准备

维度数据全量装载

为了后续将订单、订单明细等数据进行实时ETL拉宽,需要提前将一些维度数据加载一个高性能存储中。此处,选择Redis作为商品维度、商品分类维度、门店维度、运营组织机构维度存储。先一次性将所有MySQL中的维度数据全量装载到Redis中,后续只要MySQL中的维度数据更新,马上使用Flink更新Redis中的维度数据

创建样例类
  • 在 com.carlos.shop.realtime.bean 的 DimEntity 类中创建以下样例类

  • DimGoodsDBEntity 商品维度样例类

列名 描述
goodsName 商品名称
shopId 店铺id
goodsCatId 商品分类id
shopPrice 商品价格
goodsId 商品id
  • DimGoodsCatDBEntity 商品分类维度样例类
列名 描述
catId 商品分类id
parentId 商品分类父id
catName 商品分类名称
cat_level 商品分类级别
  • DimShopsDBEntity 店铺维度样例类
列名 描述
shopId 店铺id
areaId 区域id
shopName 店铺名称
shopCompany 店铺公司名称
  • DimOrgDBEntity 机构维度样例表
列名 描述
orgId 机构id
parentId 父机构id
orgName 机构名称
orgLevel 机构级别
  • DimShopCatsDBEntity门店商品分类维度样例表
列名 描述
catId 门店商品分类id
parentId 门店商品分类父id
catName 门店商品分类名称
catSort 门店商品分类级别
在配置文件中添加 Redis 配置、MySQL 配置
# Redis配置
redis.server.ip="hadoop102"
redis.server.port=6379

# MySQL配置
mysql.server.ip="hadoop102"
mysql.server.port=3306
mysql.server.database="itcast_shop"
mysql.server.username="root"
mysql.server.password="root"
编写配置工具类
val `redis.server.ip` = config.getString("redis.server.ip")
val `redis.server.port` = config.getString("redis.server.port")
val `mysql.server.ip` = config.getString("mysql.server.ip")
val `mysql.server.port` = config.getString("mysql.server.port")
val `mysql.server.database` = config.getString("mysql.server.database")
val `mysql.server.username` = config.getString("mysql.server.username")
val `mysql.server.password` = config.getString("mysql.server.password")
编写Redis操作工具类

在utils类中添加RedisUtils类,使用Redis连接池操作Redis

object RedisUtil {
  val config = new JedisPoolConfig()

  //是否启用后进先出, 默认true
  config.setLifo(true)
  //最大空闲连接数, 默认8个
  config.setMaxIdle(8)
  //最大连接数, 默认8个
  config.setMaxTotal(1000)
  //获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间,  默认-1
  config.setMaxWaitMillis(-1)
  //逐出连接的最小空闲时间 默认1800000毫秒(30分钟)
  config.setMinEvictableIdleTimeMillis(1800000)
  //最小空闲连接数, 默认0
  config.setMinIdle(0)
  //每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3
  config.setNumTestsPerEvictionRun(3)
  //对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断  (默认逐出策略)
  config.setSoftMinEvictableIdleTimeMillis(1800000)
  //在获取连接的时候检查有效性, 默认false
  config.setTestOnBorrow(false)
  //在空闲时检查有效性, 默认false
  config.setTestWhileIdle(false)

  var jedisPool: JedisPool = new JedisPool(config, GlobalConfigUtil.`redis.server.ip`, GlobalConfigUtil.`redis.server.port`.toInt)

  /**
    * 获取Redis连接
    * @return
    */
  def getResouce() = {
    jedisPool.getResource
  }
}
读取MySQL商品维度数据到Redis

在 com.carlos.shop.realtime.etl.dataloader 包中创建 DimensionDataLoader 单例对象,实现装载商品维度数据

实现步骤:

1、先从MySQL的 itcast_goods 表中加载数据

2、将数据保存到键为 carlos_goods:dim_goods 的 HASH 结构中

3、关闭资源

读取 MySQL店铺维度数据到Redis

实现步骤:

1、先从MySQL的 itcast_shops 表中加载数据

2、将数据保存到键为 carlos_goods:dim_shops 的 HASH 结构中

3、关闭资源

读取MySQL商品分类维度数据到Redis

实现步骤:

1、先从MySQL的 itcast_goods_cats 表中加载数据

2、将数据保存到键为 carlos_shop:dim_goods_cats 的 HASH 结构中

3、关闭资源

读取MySQL组织结构数据到Redis

实现步骤:

1、先从MySQL的 itcast_org 表中加载数据

2、将数据保存到键为 carlos_shop:dim_org 的 HASH 结构中

3、关闭资源

读取MySQL门店商品分类维度数据到Redis

实现步骤:

1、先从MySQL的 itcast_shop_cats表中加载数据

2、将数据保存到键为 carlos_shop:dim_shop_cats 的 HASH 结构中

3、关闭资源

维度数据增量更新

创建同步Redis中维度数据ETL处理类

在etl包下创建SyncDimDataETL类,继承MySqlBaseETL特质,实现process方法

/**
  * Redis维度数据同步业务
  */
class SyncDimDataETL(env: StreamExecutionEnvironment) extends MySqlBaseETL(env) {
  /**
    * 业务处理接口
    */
  override def process(): Unit = {
    
  }
}
在App中调用同步ETL

1、在App中创建实时同步ETL,并调用处理方法

    val syncRedisDimDataETL = new SyncRedisDimDataETL(env)
    syncRedisDimDataETL.process()

2、启动Flink程序

3、测试

  • 新增MySQL中的一条表数据,测试Redis中的数据是否同步更新

  • 修改MySQL中的一条表数据,测试Redis中的数据是否同步更新

  • 删除MySQL中的一条表数据,测试Redis中的数据是否同步更新

查看redis中的数据

点击流消息实时拉宽处理

Apache HTTPD和NGINX访问日志解析器

这是一个Logparsing框架,旨在简化Apache HTTPDNGINX访问日志文件的解析。

基本思想是,您应该能够拥有一个解析器,可以通过简单地告诉该行写入了哪些配置选项来构造该解析器。这些配置选项是访问日志行的架构。

导入依赖
<dependency>
    <groupId>nl.basjes.parse.httpdlog</groupId>
    <artifactId>httpdlog-parser</artifactId>
    <version>5.2</version>
</dependency>
nginx日志样本

在nginx的conf目录下的nginx.conf文件中可以配置日志打印的格式,如下:

 #log_format  main   '$remote_addr - $remote_user [$time_local] [$msec] 	
 						[$request_time] [$http_host] "$request" '
                    '$status $body_bytes_sent "$request_body" "$http_referer" '
                    '"$http_user_agent" $http_x_forwarded_for'

$remote_addr 对应客户端的地址

$remote_user 是请求客户端请求认证的用户名,如果没有开启认证模块的话是值为空。

$time_local 表示nginx服务器时间

$msec 访问时间与时区字符串形式

$request_time 请求开始到返回时间

$http_host 请求域名

$request 请求的url与http协议

$status 请求状态,如成功200

$body_bytes_sent 表示从服务端返回给客户端的body数据大小

$request_body 访问url时参数

$http_referer 记录从那个页面链接访问过来的

$http_user_agent 记录客户浏览器的相关信息

$http_x_forwarded_for 请求转发过来的地址

$upstream_response_time: 从 Nginx 建立连接 到 接收完数据并关闭连接

定义格式化字符串

参考:https://httpd.apache.org/docs/current/mod/mod_log_config.html

%u %h %l %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\" \"%{Cookie}i\" \"%{Addr}i\"

## Flink实时etl

### 点击流消息实时拉宽处理

#### 拉宽点击流消息处理

#### 点击流消息实时拉宽测试

### 订单数据实时处理

#### 操作步骤

1、在etl包下创建OrderETL类,实现MySqlBaseETL

2、过滤出来表名为itcast_orders,且事件类型为 "insert" 的binlog消息

3、将RowData数据流转换为 OrderDBEntity数据源

4、为了方便落地到 Kafka,再将OrderDBEntity 转换为JSON字符串

5、将转换后的json字符串写入到kafka的**dwd_order**topic中


### 订单明细数据实时拉宽处理

#### 操作步骤

1、在etl包下创建OrderGoodsEtl类,实现MySqlBaseETl

2、先过滤出表名为 itcast_order_goods,且事件类型为 "insert" 的binlog消息

3、为了方便落地到 Kafka,再将OrderGoodsWideEntity转换为JSON字符串

4、根据以下方式拉宽订单明细数据

* 根据商品id,从redis中商品维度Hash获取商品数据
* 根据店铺id,从redis中门店维护Hash获取门店数据
* 根据商品数据的三级分类id,从redis中的商品分类Hash获取三级分类数据
* 根据三级分类数据的parentid,从redis中的商品分类Hash获取二级分类数据
* 根据二级分类数据的parentid,从redis中的商品分类Hash获取一级分类数据
* 根据门店数据的areaId,从redis中的组织机构Hash获取城市机构数据
* 根据城市机构数据的parentId,从redis中的组织机构Hash获取省份机构数据

5、将拉宽后的订单明细数据写入到hbase数据库中


### 商品消息实时拉宽处理

#### 操作步骤

1、在etl包下创建GoodsETL类,实现MySqlBaseETL

2、先过滤出表名为 itcast_goods

3、根据以下方式拉宽订单明细数据

* 根据商品id,从redis中商品维度Hash获取商品数据
* 根据店铺id,从redis中门店维护Hash获取门店数据
* 根据商品数据的三级分类id,从redis中的商品分类Hash获取三级分类数据
* 根据三级分类数据的parentid,从redis中的商品分类Hash获取二级分类数据
* 根据二级分类数据的parentid,从redis中的商品分类Hash获取一级分类数据
* 根据门店数据的areaId,从redis中的组织机构Hash获取城市机构数据
* 根据城市机构数据的parentId,从redis中的组织机构Hash获取省份机构数据

4、为了方便落地到 Kafka,再将GoodsWideBean转换为JSON字符串

#### 注册IP库为Flink分布式缓存

```scala
env.registerCachedFile(GlobalConfigUtil.`ip.file.path`, "qqwry.dat")

拉宽购物车数据

1、将Kafka的字符串消息转换为实体类

    // 将JSON转换为实体类
    val cartBeanDS = kafkaDS.map(CartBean(_))

2、使用RichMapFunction实现,从Redis中拉取维度数据

3、解析IP地址

4、拉宽日期时间

评论消息实时拉宽处理

操作步骤

1、在etl包下创建CommentsETL类,实现MQBaseETL

2、先过滤出表名为 itcast_goods

3、消费kafka的数据因为kafka的数据是字符串,消费出来需要转换成Comments对象

4、根据以下方式拉宽订单明细数据

  • 根据商品id,从redis中商品维度Hash获取商品数据
  • 根据店铺id,从redis中门店维护Hash获取门店数据
  • 根据商品数据的三级分类id,从redis中的商品分类Hash获取三级分类数据
  • 根据三级分类数据的parentid,从redis中的商品分类Hash获取二级分类数据
  • 根据二级分类数据的parentid,从redis中的商品分类Hash获取一级分类数据
  • 根据门店数据的areaId,从redis中的组织机构Hash获取城市机构数据
  • 根据城市机构数据的parentId,从redis中的组织机构Hash获取省份机构数据

4、为了方便落地到 Kafka,再将CommentsWideEntity转换为JSON字符串

使用JDBC查询Druid中的数据

安装 imply-3.0.4

Imply-3.0.4 基于 apache-druid-0.15.0-Incubating

  • 1、下载imply
cd /opt/module/
wget https://static.imply.io/release/imply-3.0.4.tar.gz
  • 2、直接使用资料 imply安装包jps
将该 `imply安装包\imply-3.0.4.tar.gz` 安装包上传到 /exports/softwares
  • 3、解压imply-3.0.4
tar -xvzf imply-3.0.4.tar.gz -C ../module
cd ../module/imply-3.0.4
  • 4、配置 imply-3.0.4 mysql中创建imply相关的数据库
CREATE DATABASE `druid` DEFAULT CHARACTER SET utf8;
CREATE DATABASE `pivot` DEFAULT CHARACTER SET utf8;

修改并上传配置文件

修改 conf/druid/_common/common.runtime.properties 文件

修改zookeeper的配置

druid.zk.service.host=hadoop102:2181,hadoop103:2181,hadoop104:2181

修改MySQL的配置

druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://hadoop102:3306/druid
druid.metadata.storage.connector.user=root
druid.metadata.storage.connector.password=root

修改 conf/pivot/config.yaml 配置文件

  • 修改mysql的配置
stateStore:
  type: mysql
  location: mysql
  connection: 'mysql://root:root@hadoop102:3306/pivot'

将配置好的 imply 分发到不同节点

  • 分发脚本 xsync
#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
  echo Not Enough Arguement!
  exit;
fi
#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
  echo ====================  $host  ====================
  #3. 遍历所有目录,挨个发送
  for file in $@
  do
    #4 判断文件是否存在
    if [ -e $file ]
    then
      #5. 获取父目录
      pdir=$(cd -P $(dirname $file); pwd)
      #6. 获取当前文件的名称
      fname=$(basename $file)
      ssh $host "mkdir -p $pdir"
      rsync -av $pdir/$fname $host:$pdir
    else
      echo $file does not exists!
    fi
  done
done

imply 分发

[carlos@hadoop102 ~]$ xsync imply-3.0.4

配置环境变量

在每台服务器上配置DRUID_HOME环境变量

sudo vim /etc/profile.d/my_env.sh

# DRUID
export DRUID_HOME=/opt/module/imply-3.0.4

source /etc/profile.d/my_env.sh

启动 imply 集群

1、启动zk集群 https://blog.csdn.net/Aeve_imp/article/details/107597274 2、hadoop102节点(使用外部zk而不使用imply自带zk启动overlord和coordinator)

# 使用外部 zk 而不使用imply自带zk启动overlord和coordinator
[carlos@hadoop102 imply-3.0.4]$ bin/supervise -c /opt/module/imply-3.0.4/conf/supervise/master-no-zk.conf

3、hadoop103节点(启动historical和middlemanager)


[carlos@hadoop103 imply-3.0.4]$ bin/supervise -c /opt/module/imply-3.0.4/conf/supervise/data.conf

4、hadoop104节点(启动broker和router)

[carlos@hadoop104 imply-3.0.4]$ bin/supervise -c /opt/module/imply-3.0.4/conf/supervise/query.conf

注意事项

  • 如果希望imply运行在后台,在每个执行命令后面加 --daemonize

访问WebUI

组件名 URL
broker http://hadoop104:8888
coordinator、overlord http://hadoop102:8081/index.html
middleManager、historical http://hadoop102:8090/console.html

Druid提供了JDBC接口,JavaWeb项目可以直接使用 JDBC 连接Druid进行实时数据分析。

需求:

  • 获取 metrics-kafka 数据源中,不同用户的访问次数

实现步骤:

1、创建 druid_jdbc Maven模块

2、导入依赖

3、编写JDBC代码连接Druid获取数据

  • 加载Druid JDBC驱动
  • 获取Druid JDBC连接
  • 构建SQL语句
  • 构建Statement,执行SQL获取结果集
  • 关闭Druid连接

数据可视化项目

操作步骤:

1、导入 carlos_dw_web 项目

2、修改 DashboardServiceImpl.java 中 Redis 服务器地址

3、修改 utils.DruidHelper 中Druid的 url 地址

4、修改druid连接字符串的表名:dws_od改成dws_order

5、启动 Jetty 服务器

6、打开浏览器访问 http://localhost:8080/itcast_dw_web

Superset

BI VS 报表工具

  • 报表工具是数据展示工具,而BI(商业智能)是数据分析工具。报表工具可以制作各类数据报表、图形报表的工具,甚至还可以制作电子发票联、流程单、收据等。

  • BI可以将数据进行模型构建,制作成Dashboard,相比于报表,侧重点在于分析,操作简单、数据处理量大。常常基于企业搭建的数据平台,连接数据仓库进行分析。

安装教程

  1. xxxx
  2. xxxx
  3. xxxx

参与贡献

  1. Fork 本仓库
  2. 新建 Feat_xxx 分支
  3. 提交代码
  4. 新建 Pull Request
Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

简介

公司需要大屏用于展示订单数据与用户访问数据,这里采用使用 Flink 来搭建实时计算平台 展开 收起
Java
Apache-2.0
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/anyine/carlos_shop_warehouse.git
git@gitee.com:anyine/carlos_shop_warehouse.git
anyine
carlos_shop_warehouse
carlos_shop_warehouse
master

搜索帮助

53164aa7 5694891 3bd8fe86 5694891