同步操作将从 Carlos/carlos_shop_warehouse 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
Flink 实时数仓
实时数仓项目架构
基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更
从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务,基于日志增量订阅和消费的业务包括
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
github地址:https://github.com/alibaba/canal
MySQL需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,/etc/my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER root IDENTIFIED BY 'root';
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' ;
FLUSH PRIVILEGES;
注意:本项目使用的版本 canal1.0.24
环境要求:
解压缩
mkdir /opt/module/canal
tar -zxvf canal.deployer-1.0.24.tar.gz -C /opt/module/canal/
解压完成后,进入 /opt/module/canal/ 目录,可以看到如下结构
drwxr-xr-x. 2 root root 4096 2月 1 14:07 bin
drwxr-xr-x. 4 root root 4096 2月 1 14:07 conf
drwxr-xr-x. 2 root root 4096 2月 1 14:07 lib
drwxrwxrwx. 2 root root 4096 4月 1 2017 logs
canal server的conf下有几个配置文件
[root@hadoop102 canal]# tree conf/
conf/
├── canal.properties
├── example
│ └── instance.properties
├── logback.xml
└── spring
├── default-instance.xml
├── file-instance.xml
├── group-instance.xml
├── local-instance.xml
└── memory-instance.xml
修改instance 配置文件
vim conf/example/instance.properties
## mysql serverId,这里的slaveId不能和myql集群中已有的server_id一样
canal.instance.mysql.slaveId = 1234
# 按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=hadoop102:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = root
canal.instance.dbPassword = root
#################################################
启动
sh bin/startup.sh
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.24</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
</dependencies>
包名 | 说明 |
---|---|
com.carlos.canal_demo | 代码存放目录 |
Entry
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [binlog里记录变更发生的时间戳,精确到秒]
schemaName
tableName
eventType [insert/update/delete类型]
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange
isDdl [是否是ddl变更操作,比如create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组,变更前的数据字段]
afterColumns [Column类型的数组,变更后的数据字段]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否为主键]
updated [是否发生过变更]
isNull [值是否为null]
value [具体的内容,注意为string文本]
mysql的binlog
它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间。主要用来备份和数据同步。
binlog 有三种: STATEMENT、ROW、MIXED
- STATEMENT 记录的是执行的sql语句
- ROW 记录的是真实的行数据记录
- MIXED 记录的是1+2,优先按照1的模式记录
名词解释:
什么是中继日志
从服务器I/O线程将主服务器的二进制日志读取过来记录到从服务器本地文件,然后从服务器SQL线程会读取 relay-log 日志的内容并应用到从服务器,从而使从服务器和主服务器的数据保持一致
EventParser在向mysql发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功 ),传送成功之后更新Log Position。流程图如下:
包名 | 说明 |
---|---|
com.carlos.canal_client | 存放入口、Canal客户端核心实现 |
com.carlos.canal_client.util | 存放工具类 |
com.carlos.canal_client.kafka | 存放Kafka生产者实现 |
# canal配置
canal.server.ip=node1
canal.server.port=11111
canal.server.destination=example
canal.server.username=canal
canal.server.password=canal
canal.subscribe.filter=itcast_shop.*
# zookeeper配置
zookeeper.server.ip=hadoop102:2181,hadoop103:2181,hadoop104:2181
# kafka配置
kafka.bootstrap_servers_config=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka.batch_size_config=1024
kafka.acks=all
kafka.retries=0
kafka.client_id_config=carlos_shop_canal_click
kafka.key_serializer_class_config=org.apache.kafka.common.serialization.StringSerializer
kafka.value_serializer_class_config=com.carlos.canal.protobuf.ProtoBufSerializer
kafka.topic=ods_carlos_shop_mysql
/**
* 入口
*/
public class Entrance {
public static void main(String[] args) {
CanalClient canalClient = new CanalClient();
canalClient.start();
}
}
包名 | 说明 |
---|---|
com.carlos.canal.bean | 存放通用的JavaBean |
com.carlos.canal.protobuf | 实现Protobuf相关接口、实现 |
该类用于生成数据到Kafka
/**
* Kafka生产者
*/
public class KafkaSender {
private Properties kafkaProps = new Properties();
private KafkaProducer<String, RowData> kafkaProducer;
public KafkaSender() {
kafkaProps.put("bootstrap.servers", ConfigUtil.kafkaBootstrap_servers_config());
kafkaProps.put("acks", ConfigUtil.kafkaAcks());
kafkaProps.put("retries", ConfigUtil.kafkaRetries());
kafkaProps.put("batch.size", ConfigUtil.kafkaBatch_size_config());
kafkaProps.put("key.serializer", ConfigUtil.kafkaKey_serializer_class_config());
kafkaProps.put("value.serializer", ConfigUtil.kafkaValue_serializer_class_config());
kafkaProducer = new KafkaProducer<>(kafkaProps);
}
public void send(RowData rowData) {
kafkaProducer.send(new ProducerRecord<>(ConfigUtil.kafkaTopic(), null, rowData));
}
}
public class CanalClient {
// ...
private KafkaSender kafkaSender;
// 开始监听
public void start() {
...
RowData rowData = new RowData(binlogMsgMap);
kafkaSender.send(rowData);
}
# 创建topic
bin/kafka-topics.sh --create --zookeeper node1:2181 --topic ods_itcast_shop_mysql --replication-factor 3 --partitions 3
# 创建控制台消费者测试
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic ods_itcast_shop_mysql --from-beginning
#
#kafka的配置
#
# Kafka集群地址
bootstrap.servers="hadoop102:9092,hadoop103:9092,hadoop104:9092"
# ZooKeeper集群地址
zookeeper.connect="hadoop102:2181,hadoop103:2181,hadoop104:2181"
# 消费组ID
group.id="carlos"
# 自动提交拉取到消费端的消息offset到kafka
enable.auto.commit="true"
# 自动提交offset到zookeeper的时间间隔单位(毫秒)
auto.commit.interval.ms="5000"
# 每次消费最新的数据
auto.offset.reset="latest"
# kafka序列化器
key.serializer="org.apache.kafka.common.serialization.StringSerializer"
# kafka反序列化器
key.deserializer="org.apache.kafka.common.serialization.StringDeserializer"
# ip库本地文件路径
ip.file.path="E:/2021-05-10/carlos_shop_parent/data/qqwry.dat"
# Redis配置
redis.server.ip="hadoop102"
redis.server.port=6379
# MySQL配置
mysql.server.ip="hadoop102"
mysql.server.port=3306
mysql.server.database="itcast_shop"
mysql.server.username="root"
mysql.server.password="root"
# Kafka Topic 名称
input.topic.canal="ods_carlos_shop_mysql"
# Kafka click_log topic 名称
input.topic.click_log="ods_carlos_click_log"
# Kafka 购物车 topic 名称
input.topic.cart="ods_carlos_cart"
# kafka 评论 topic 名称
input.topic.comments="ods_carlos_comments"
# Druid Kafka 数据源 topic名称
output.topic.order="dwd_order"
output.topic.order_detail="dwd_order_detail"
output.topic.cart="dwd_cart"
output.topic.clicklog="dwd_click_log"
output.topic.goods="dwd_goods"
output.topic.ordertimeout="dwd_order_timeout"
output.topic.comments="dwd_comments"
# HBase订单明细表配置
hbase.table.orderdetail="dwd_order_detail"
hbase.table.family="detail"
拷贝以下内容到resources/log4j.properties
log4j.rootLogger=warn,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n
拷贝一下内容到resources/hbase-site.xml
在scala目录中创建以下包结构:
包名 | 说明 |
---|---|
com.carlos.shop.realtime.etl.app | 程序入口 |
com.carlos.shop.realtime.etl.async | 异步IO相关 |
com.carlos.shop.realtime.etl.bean | 实体类 |
com.carlos.shop.realtime.etl.utils | 工具类 |
com.carlos.shop.realtime.etl.process | 实时ETL处理 |
com.carlos.shop.realtime.etl.dataloader | 维度数据离线同步 |
import com.typesafe.config.{Config, ConfigFactory}
object GlobalConfigUtil {
private val config: Config = ConfigFactory.load()
val `bootstrap.servers` = config.getString("bootstrap.servers")
val `zookeeper.connect` = config.getString("zookeeper.connect")
val `input.topic.canal` = config.getString("input.topic.canal")
val `input.topic.click_log` = config.getString("input.topic.click_log")
val `input.topic.comments` = config.getString("input.topic.comments")
val `group.id` = config.getString("group.id")
val `enable.auto.commit` = config.getString("enable.auto.commit")
val `auto.commit.interval.ms` = config.getString("auto.commit.interval.ms")
val `auto.offset.reset` = config.getString("auto.offset.reset")
val `key.serializer` = config.getString("key.serializer")
val `key.deserializer` = config.getString("key.deserializer")
val `output.topic.order` = config.getString("output.topic.order")
val `output.topic.order_detail` = config.getString("output.topic.order_detail")
val `output.topic.cart` = config.getString("output.topic.cart")
val `output.topic.clicklog` = config.getString("output.topic.clicklog")
val `output.topic.goods` = config.getString("output.topic.goods")
val `output.topic.ordertimeout` = config.getString("output.topic.ordertimeout")
val `output.topic.comments` = config.getString("output.topic.comments")
val `hbase.table.orderdetail` = config.getString("hbase.table.orderdetail")
val `hbase.table.family` = config.getString("hbase.table.family")
val `redis.server.ip` = config.getString("redis.server.ip")
val `redis.server.port`: String = config.getString("redis.server.port")
val `ip.file.path` = config.getString("ip.file.path")
val `mysql.server.ip` = config.getString("mysql.server.ip")
val `mysql.server.port` = config.getString("mysql.server.port")
val `mysql.server.database` = config.getString("mysql.server.database")
val `mysql.server.username` = config.getString("mysql.server.username")
val `mysql.server.password` = config.getString("mysql.server.password")
val `input.topic.cart` = config.getString("input.topic.cart")
def main(args: Array[String]): Unit = {
println(`bootstrap.servers`)
println(`zookeeper.connect`)
println(`input.topic.canal`)
println(`input.topic.click_log`)
println(`group.id`)
println(`enable.auto.commit`)
println(`auto.commit.interval.ms`)
println(`auto.offset.reset`)
println(`output.topic.order`)
println(`hbase.table.family`)
}
}
导入配置文件:工具类/1.redis连接池工具类/RedisUtil.scala 到 utils 目录
导入配置文件:工具类/2.hbase 连接池工具类到 utils 目录
注意事项
该特质主要定义统一执行ETL处理,只有一个process方法,用于数据的接入、etl、落地。
对于来自于mysql的binlog日志的数据,抽取出来mysql的基类
对于日志数据,封装来自消息队列的基类
因为后续的ETL处理,都是从Kafka中拉取数据,都需要共用Kafka的配置,所以将Kafka的配置单独抽取到工具类中
在utils包下创建KafkaConsumerProp
import java.util.Properties
object KafkaProp {
/**
* 读取Kafka属性配置
* @return
*/
def getProperties() = {
// 1. 读取Kafka配置
val properties = new Properties()
properties.setProperty("bootstrap.servers", GlobalConfigUtil.`bootstrap.servers`)
properties.setProperty("zookeeper.connect", GlobalConfigUtil.`zookeeper.connect`)
properties.setProperty("group.id", GlobalConfigUtil.`group.id`)
properties.setProperty("enable.auto.commit", GlobalConfigUtil.`enable.auto.commit`)
properties.setProperty("auto.commit.interval.ms", GlobalConfigUtil.`auto.commit.interval.ms`)
properties.setProperty("auto.offset.reset", GlobalConfigUtil.`auto.offset.reset`)
properties.setProperty("key.serializer", GlobalConfigUtil.`key.serializer`)
properties.setProperty("key.deserializer", GlobalConfigUtil.`key.deserializer`)
properties
}
}
数据来源主要是分为三个途径:
在抽象类MySqlBaseETL中,实现Flink整合Kafka。
操作步骤:
1、自定义 ProtoBuf 反序列化
2、Flink 整合 Kafka
3、创建订单实时 etl 处理类
4、编写 App 测试
反序列化主要是将Byte数组转换为之前封装在 common 工程的RowData类
后面不少的业务逻辑(订单、订单明细、商品等)需要共用一份Kafka数据(从一个topic中拉取),抽取抽象类的目的是共用一个FlinkKafkaConsumer,因为后面创建FlinkKafkaConsumer整合Kafka需要使用到Flink流式运行环境,需要在主构造器中传入Flink流式运行环境。该ETL抽象类需要从BaseETL继承。
在etl包下创建OrderETL类,用于后续订单实时拉宽处理,此时只用来进行测试
val orderETL = new OrderETL(env)
orderETL.process()
后面不少的业务逻辑(购物车、评论、点击流等)需要共用一份Kafka数据,抽取抽象类的目的是共用一个FlinkKafkaConsumer,因为后面创建 FlinkKafkaConsumer 整合 Kafka 需要使用到Flink流式运行环境,需要在主构造器中传入Flink流式运行环境。该ETL抽象类需要从BaseETL继承。
# Kafka 购物车 topic名称
input.topic.cart="ods_carlos_cart"
val `input.topic.cart` = config.getString("input.topic.cart")
# kafka 评论 topic名称
input.topic.comments="ods_carlos_shop_comments"
val `input.topic.comments` = config.getString("input.topic.comments")
/**
* 点击流处理逻辑
*/
class CommentsETL(env:StreamExecutionEnvironment) extends BaseETL[String] {
/**
* 业务处理接口
*/
override def process(): Unit = {
// 1. 整合Kafka
val commentsDS: DataStream[String] = getKafkaDataStream(GlobalConfigUtil.`input.topic.comments`)
commentsDS.print()
}
}
# Kafka click_log topic名称
input.topic.click_log="ods_carlos_click_log"
val `input.topic.click_log` = config.getString("input.topic.click_log")
/**
* 点击流处理逻辑
*/
class ClickLogETL(env:StreamExecutionEnvironment) extends BaseETL[String] {
/**
* 业务处理接口
*/
override def process(): Unit = {
// 1. 整合kafka
val clickLogDS: DataStream[String] = getKafkaDataStream(GlobalConfigUtil.`input.topic.click_log`)
clickLogDS.print()
}
}
为了后续将订单、订单明细等数据进行实时ETL拉宽,需要提前将一些维度数据加载一个高性能存储中。此处,选择Redis作为商品维度、商品分类维度、门店维度、运营组织机构维度存储。先一次性将所有MySQL中的维度数据全量装载到Redis中,后续只要MySQL中的维度数据更新,马上使用Flink更新Redis中的维度数据
在 com.carlos.shop.realtime.bean 的 DimEntity 类中创建以下样例类
DimGoodsDBEntity 商品维度样例类
列名 | 描述 |
---|---|
goodsName | 商品名称 |
shopId | 店铺id |
goodsCatId | 商品分类id |
shopPrice | 商品价格 |
goodsId | 商品id |
列名 | 描述 |
---|---|
catId | 商品分类id |
parentId | 商品分类父id |
catName | 商品分类名称 |
cat_level | 商品分类级别 |
列名 | 描述 |
---|---|
shopId | 店铺id |
areaId | 区域id |
shopName | 店铺名称 |
shopCompany | 店铺公司名称 |
列名 | 描述 |
---|---|
orgId | 机构id |
parentId | 父机构id |
orgName | 机构名称 |
orgLevel | 机构级别 |
列名 | 描述 |
---|---|
catId | 门店商品分类id |
parentId | 门店商品分类父id |
catName | 门店商品分类名称 |
catSort | 门店商品分类级别 |
# Redis配置
redis.server.ip="hadoop102"
redis.server.port=6379
# MySQL配置
mysql.server.ip="hadoop102"
mysql.server.port=3306
mysql.server.database="itcast_shop"
mysql.server.username="root"
mysql.server.password="root"
val `redis.server.ip` = config.getString("redis.server.ip")
val `redis.server.port` = config.getString("redis.server.port")
val `mysql.server.ip` = config.getString("mysql.server.ip")
val `mysql.server.port` = config.getString("mysql.server.port")
val `mysql.server.database` = config.getString("mysql.server.database")
val `mysql.server.username` = config.getString("mysql.server.username")
val `mysql.server.password` = config.getString("mysql.server.password")
在utils类中添加RedisUtils类,使用Redis连接池操作Redis
object RedisUtil {
val config = new JedisPoolConfig()
//是否启用后进先出, 默认true
config.setLifo(true)
//最大空闲连接数, 默认8个
config.setMaxIdle(8)
//最大连接数, 默认8个
config.setMaxTotal(1000)
//获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间, 默认-1
config.setMaxWaitMillis(-1)
//逐出连接的最小空闲时间 默认1800000毫秒(30分钟)
config.setMinEvictableIdleTimeMillis(1800000)
//最小空闲连接数, 默认0
config.setMinIdle(0)
//每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3
config.setNumTestsPerEvictionRun(3)
//对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略)
config.setSoftMinEvictableIdleTimeMillis(1800000)
//在获取连接的时候检查有效性, 默认false
config.setTestOnBorrow(false)
//在空闲时检查有效性, 默认false
config.setTestWhileIdle(false)
var jedisPool: JedisPool = new JedisPool(config, GlobalConfigUtil.`redis.server.ip`, GlobalConfigUtil.`redis.server.port`.toInt)
/**
* 获取Redis连接
* @return
*/
def getResouce() = {
jedisPool.getResource
}
}
在 com.carlos.shop.realtime.etl.dataloader 包中创建 DimensionDataLoader 单例对象,实现装载商品维度数据
实现步骤:
1、先从MySQL的 itcast_goods 表中加载数据
2、将数据保存到键为 carlos_goods:dim_goods 的 HASH 结构中
3、关闭资源
实现步骤:
1、先从MySQL的 itcast_shops 表中加载数据
2、将数据保存到键为 carlos_goods:dim_shops 的 HASH 结构中
3、关闭资源
实现步骤:
1、先从MySQL的 itcast_goods_cats 表中加载数据
2、将数据保存到键为 carlos_shop:dim_goods_cats 的 HASH 结构中
3、关闭资源
实现步骤:
1、先从MySQL的 itcast_org 表中加载数据
2、将数据保存到键为 carlos_shop:dim_org 的 HASH 结构中
3、关闭资源
实现步骤:
1、先从MySQL的 itcast_shop_cats表中加载数据
2、将数据保存到键为 carlos_shop:dim_shop_cats 的 HASH 结构中
3、关闭资源
维度数据增量更新
在etl包下创建SyncDimDataETL类,继承MySqlBaseETL特质,实现process方法
/**
* Redis维度数据同步业务
*/
class SyncDimDataETL(env: StreamExecutionEnvironment) extends MySqlBaseETL(env) {
/**
* 业务处理接口
*/
override def process(): Unit = {
}
}
1、在App中创建实时同步ETL,并调用处理方法
val syncRedisDimDataETL = new SyncRedisDimDataETL(env)
syncRedisDimDataETL.process()
2、启动Flink程序
3、测试
新增MySQL中的一条表数据,测试Redis中的数据是否同步更新
修改MySQL中的一条表数据,测试Redis中的数据是否同步更新
删除MySQL中的一条表数据,测试Redis中的数据是否同步更新
查看redis中的数据
这是一个Logparsing框架,旨在简化Apache HTTPD和NGINX访问日志文件的解析。
基本思想是,您应该能够拥有一个解析器,可以通过简单地告诉该行写入了哪些配置选项来构造该解析器。这些配置选项是访问日志行的架构。
<dependency>
<groupId>nl.basjes.parse.httpdlog</groupId>
<artifactId>httpdlog-parser</artifactId>
<version>5.2</version>
</dependency>
在nginx的conf目录下的nginx.conf文件中可以配置日志打印的格式,如下:
#log_format main '$remote_addr - $remote_user [$time_local] [$msec]
[$request_time] [$http_host] "$request" '
'$status $body_bytes_sent "$request_body" "$http_referer" '
'"$http_user_agent" $http_x_forwarded_for'
$remote_addr 对应客户端的地址
$remote_user 是请求客户端请求认证的用户名,如果没有开启认证模块的话是值为空。
$time_local 表示nginx服务器时间
$msec 访问时间与时区字符串形式
$request_time 请求开始到返回时间
$http_host 请求域名
$request 请求的url与http协议
$status 请求状态,如成功200
$body_bytes_sent 表示从服务端返回给客户端的body数据大小
$request_body 访问url时参数
$http_referer 记录从那个页面链接访问过来的
$http_user_agent 记录客户浏览器的相关信息
$http_x_forwarded_for 请求转发过来的地址
$upstream_response_time: 从 Nginx 建立连接 到 接收完数据并关闭连接
参考:https://httpd.apache.org/docs/current/mod/mod_log_config.html
%u %h %l %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\" \"%{Cookie}i\" \"%{Addr}i\"
## Flink实时etl
### 点击流消息实时拉宽处理
#### 拉宽点击流消息处理
#### 点击流消息实时拉宽测试
### 订单数据实时处理
#### 操作步骤
1、在etl包下创建OrderETL类,实现MySqlBaseETL
2、过滤出来表名为itcast_orders,且事件类型为 "insert" 的binlog消息
3、将RowData数据流转换为 OrderDBEntity数据源
4、为了方便落地到 Kafka,再将OrderDBEntity 转换为JSON字符串
5、将转换后的json字符串写入到kafka的**dwd_order**topic中
### 订单明细数据实时拉宽处理
#### 操作步骤
1、在etl包下创建OrderGoodsEtl类,实现MySqlBaseETl
2、先过滤出表名为 itcast_order_goods,且事件类型为 "insert" 的binlog消息
3、为了方便落地到 Kafka,再将OrderGoodsWideEntity转换为JSON字符串
4、根据以下方式拉宽订单明细数据
* 根据商品id,从redis中商品维度Hash获取商品数据
* 根据店铺id,从redis中门店维护Hash获取门店数据
* 根据商品数据的三级分类id,从redis中的商品分类Hash获取三级分类数据
* 根据三级分类数据的parentid,从redis中的商品分类Hash获取二级分类数据
* 根据二级分类数据的parentid,从redis中的商品分类Hash获取一级分类数据
* 根据门店数据的areaId,从redis中的组织机构Hash获取城市机构数据
* 根据城市机构数据的parentId,从redis中的组织机构Hash获取省份机构数据
5、将拉宽后的订单明细数据写入到hbase数据库中
### 商品消息实时拉宽处理
#### 操作步骤
1、在etl包下创建GoodsETL类,实现MySqlBaseETL
2、先过滤出表名为 itcast_goods
3、根据以下方式拉宽订单明细数据
* 根据商品id,从redis中商品维度Hash获取商品数据
* 根据店铺id,从redis中门店维护Hash获取门店数据
* 根据商品数据的三级分类id,从redis中的商品分类Hash获取三级分类数据
* 根据三级分类数据的parentid,从redis中的商品分类Hash获取二级分类数据
* 根据二级分类数据的parentid,从redis中的商品分类Hash获取一级分类数据
* 根据门店数据的areaId,从redis中的组织机构Hash获取城市机构数据
* 根据城市机构数据的parentId,从redis中的组织机构Hash获取省份机构数据
4、为了方便落地到 Kafka,再将GoodsWideBean转换为JSON字符串
#### 注册IP库为Flink分布式缓存
```scala
env.registerCachedFile(GlobalConfigUtil.`ip.file.path`, "qqwry.dat")
1、将Kafka的字符串消息转换为实体类
// 将JSON转换为实体类
val cartBeanDS = kafkaDS.map(CartBean(_))
2、使用RichMapFunction实现,从Redis中拉取维度数据
3、解析IP地址
4、拉宽日期时间
1、在etl包下创建CommentsETL类,实现MQBaseETL
2、先过滤出表名为 itcast_goods
3、消费kafka的数据因为kafka的数据是字符串,消费出来需要转换成Comments对象
4、根据以下方式拉宽订单明细数据
4、为了方便落地到 Kafka,再将CommentsWideEntity转换为JSON字符串
Imply-3.0.4 基于 apache-druid-0.15.0-Incubating
cd /opt/module/
wget https://static.imply.io/release/imply-3.0.4.tar.gz
将该 `imply安装包\imply-3.0.4.tar.gz` 安装包上传到 /exports/softwares
tar -xvzf imply-3.0.4.tar.gz -C ../module
cd ../module/imply-3.0.4
CREATE DATABASE `druid` DEFAULT CHARACTER SET utf8;
CREATE DATABASE `pivot` DEFAULT CHARACTER SET utf8;
修改 conf/druid/_common/common.runtime.properties 文件
修改zookeeper的配置
druid.zk.service.host=hadoop102:2181,hadoop103:2181,hadoop104:2181
修改MySQL的配置
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://hadoop102:3306/druid
druid.metadata.storage.connector.user=root
druid.metadata.storage.connector.password=root
修改 conf/pivot/config.yaml 配置文件
stateStore:
type: mysql
location: mysql
connection: 'mysql://root:root@hadoop102:3306/pivot'
将配置好的 imply 分发到不同节点
#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
echo Not Enough Arguement!
exit;
fi
#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
echo ==================== $host ====================
#3. 遍历所有目录,挨个发送
for file in $@
do
#4 判断文件是否存在
if [ -e $file ]
then
#5. 获取父目录
pdir=$(cd -P $(dirname $file); pwd)
#6. 获取当前文件的名称
fname=$(basename $file)
ssh $host "mkdir -p $pdir"
rsync -av $pdir/$fname $host:$pdir
else
echo $file does not exists!
fi
done
done
imply 分发
[carlos@hadoop102 ~]$ xsync imply-3.0.4
在每台服务器上配置DRUID_HOME环境变量
sudo vim /etc/profile.d/my_env.sh
# DRUID
export DRUID_HOME=/opt/module/imply-3.0.4
source /etc/profile.d/my_env.sh
1、启动zk集群 https://blog.csdn.net/Aeve_imp/article/details/107597274 2、hadoop102节点(使用外部zk而不使用imply自带zk启动overlord和coordinator)
# 使用外部 zk 而不使用imply自带zk启动overlord和coordinator
[carlos@hadoop102 imply-3.0.4]$ bin/supervise -c /opt/module/imply-3.0.4/conf/supervise/master-no-zk.conf
3、hadoop103节点(启动historical和middlemanager)
[carlos@hadoop103 imply-3.0.4]$ bin/supervise -c /opt/module/imply-3.0.4/conf/supervise/data.conf
4、hadoop104节点(启动broker和router)
[carlos@hadoop104 imply-3.0.4]$ bin/supervise -c /opt/module/imply-3.0.4/conf/supervise/query.conf
注意事项
组件名 | URL |
---|---|
broker | http://hadoop104:8888 |
coordinator、overlord | http://hadoop102:8081/index.html |
middleManager、historical | http://hadoop102:8090/console.html |
Druid提供了JDBC接口,JavaWeb项目可以直接使用 JDBC 连接Druid进行实时数据分析。
需求:
实现步骤:
1、创建 druid_jdbc Maven模块
2、导入依赖
3、编写JDBC代码连接Druid获取数据
操作步骤:
1、导入 carlos_dw_web 项目
2、修改 DashboardServiceImpl.java 中 Redis 服务器地址
3、修改 utils.DruidHelper 中Druid的 url 地址
4、修改druid连接字符串的表名:dws_od改成dws_order
5、启动 Jetty 服务器
6、打开浏览器访问 http://localhost:8080/itcast_dw_web
报表工具是数据展示工具,而BI(商业智能)是数据分析工具。报表工具可以制作各类数据报表、图形报表的工具,甚至还可以制作电子发票联、流程单、收据等。
BI可以将数据进行模型构建,制作成Dashboard,相比于报表,侧重点在于分析,操作简单、数据处理量大。常常基于企业搭建的数据平台,连接数据仓库进行分析。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。