欢迎来到 榆林市某某通信设备专卖店
全国咨询热线:020-123456789
联系我们

地址:联系地址联系地址联系地址

电话:020-123456789

传真:020-123456789

邮箱:admin@aa.com

新闻中心
应用实践 | 海量数据,秒级分析!Flink+Doris 构建实时数仓方案
  来源:榆林市某某通信设备专卖店  更新时间:2024-05-04 08:55:06

应用实践 | 海量数据,秒级分析!Flink+Doris 构建实时数仓方案

编者荐语 :随着领创集团的应用快速发展 ,为了满足十亿级数据量的实践数据实时数仓实时报表统计与决策分析,领创集团选择了 Flink + Doris 海量的实时数仓方案 。本篇文章详尽了介绍了此方案的秒级实践过程。

以下文章来源于领创集团Advance Group,分析方案 作者苏浩

原文链接:海量数据 !构建秒级分析!应用Flink+Doris构建实时数仓方案

业务背景

Advance Intelligence Group(领创集团)成立于 2016 年,实践数据实时数仓是海量一家以 AI 技术驱动的科技集团 ,致力于通过科技创新的秒级本地化应用 ,改造和重塑金融和零售行业 ,分析方案以多元化的构建业务布局打造一个服务于消费者 、企业和商户的应用生态圈 。集团旗下包含企业业务和消费者业务两大板块 ,实践数据实时数仓企业业务包含 ADVANCE.AI 和 Ginee ,海量分别为银行、金融、金融科技、零售和电商行业客户提供基于 AI 技术的数字身份验证  、风险管理产品和全渠道电商服务解决方案;消费者业务 Atome Financial 包括亚洲领先的先享后付平台 Atome 和数字金融服务。

2021 年 9 月 ,领创集团宣布完成超 4 亿美元 D 轮融资 ,融资完成后领创集团估值已超 20 亿美元,成为新加坡最大的独立科技创业公司之一。业务覆盖新加坡 、印度尼西亚、中国大陆、印度、越南等 17 个国家与地区,服务了 15 万以上的商户和 2000 万消费者  。

随着集团业务的快速发展 ,为满足十亿级数据量的实时报表统计与决策分析,我们选择基于 Apache Flink + Apache Doris 构建了实时数仓的系统方案。

Doris 基本原理

Apache Doris 基本架构非常简单 ,只有 FE(Frontend) 、BE(Backend) 两种角色 ,不依赖任何外部组件,对部署和运维非常友好 。架构图如下 :

应用实践 | 海量数据,秒级分析!Flink+Doris 构建实时数仓方案

FE(Frontend)以 Java 语言为主 。

主要功能职责:

  • 接收用户连接请求(MySQL 协议层)
  • 元数据存储与管理
  • 查询语句的解析与执行计划下发
  • 集群管控

FE 主要有有两种角色,一个是 Follower ,还有一个 Observer ,Leader 是经过选举推选出的特殊 Follower 。Follower 主要是用来达到元数据的高可用,保证单节点宕机的情况下  ,元数据能够实时地在线恢复,而不影响整个服务 。

BE(Backend) 以 C++ 语言为主。

主要功能职责:

  • 数据存储与管理
  • 查询计划的执行

技术架构

整体数据链路如下图:

应用实践 | 海量数据
,秒级分析!Flink+Doris 构建实时数仓方案
  1. 通过 FlinkCDC 采集 MySQL Binlog 到 Kafka 中的 Topic1
  1. 开发 Flink 任务消费上述 Binlog 生成相关主题的宽表 ,写入 Topic2
  1. 配置 Doris Routine Load 任务,将 Topic2 的数据导入 Doris

应用实践

关于步骤1和步骤2的实践 ,“基于 Flink-CDC 数据同步⽅案” 的文章中已有说明,本文将对步骤3展开详细的说明  。

建表

因业务数据经常伴随有 UPDATE ,DELETE 等操作,为了保持实时数仓的数据粒度与业务库一致 ,所以选择 Doris Unique 模型(数据模型在下文有重点介绍)具体建表语句如下 :

CREATE TABLE IF NOT EXISTS table_1n(nkey1 varchar(32),nkey2 varchar(32),nkey3 varchar(32),nvalue1 int,nvalue2 varchar(128),nvalue3 Decimal(20, 6),ndata_deal_datetime DateTime COMMENT '数据处理时间',ndata_status INT COMMENT '数据是否删除 ,1表示正常 ,-1表示数据已经删除'n) nENGINE=OLAPnUNIQUE KEY(`key1`,`key2`,`key3`)nCOMMENT "xxx"nDISTRIBUTED BY HASH(`key2`) BUCKETS 32nPROPERTIES (n"storage_type"="column",n"replication_num" = "3",n"function_column.sequence_type" = 'DateTime'n);

可以看到,表结构中有两个字段分别是 data_deal_datetime ,data_status。

  • data_deal_datetime 主要是相同 key 情况下数据覆盖的判断依据
  • data_status 用来兼容业务库对数据的删除操作

数据导入任务

Doris 提供了主动拉取 Kafka 数据的功能  ,配置如下 :

CREATE ROUTINE LOAD database.table1 ON table1nCOLUMNS(key1,key2,key3,value1,value2,value3,data_deal_datetime,data_status),nORDER BY data_deal_datetimenPROPERTIESn(n"desired_concurrent_number"="3",n"max_batch_interval" = "10",n"max_batch_rows" = "500000",n"max_batch_size" = "209715200",n"format" = "json",n"json_root" = "$.data",n"jsonpaths"="["$.key1","$.key2","$.key3","$.value1","$.value2",n "$.value3","$.data_deal_datetime","$.data_status"]"n)FROM KAFKAn(n"kafka_broker_list"="broker1_ip:port1,broker2_ip:port2,broker3_ip:port3",n"kafka_topic"="topic_name",n"property.group.id"="group_id",n"property.kafka_default_offsets"="OFFSET_BEGINNING"n);

导入语句中:

  • ORDER BY data_deal_datetime 表示根据 data_deal_datetime 字段去覆盖 key 相同的数据
  • desired_concurrent_number 表示期望的并发度。

max_batch_interval/max_batch_rows/max_batch_size 这 3 个参数分别表示:

  • 每个子任务最大执行时间 。
  • 每个子任务最多读取的行数 。
  • 每个子任务最多读取的字节数。

任务监控与报警

Doris routine load 如果遇到脏数据会导致任务暂停,所以需要定时监控数据导入任务的状态并且自动恢复失败任务 。并且将错误信息发至指定的 lark 群 。具体脚本如下 :

import pymysql #导入 pymysqlnimport requests,jsonnnn#打开数据库连接ndb= pymysql.connect(host="host",user="user",n password="passwd",db="database",port=port)nn# 使用cursor()方法获取操作游标ncur = db.cursor()nn#1.查询操作n# 编写sql 查询语句 nsql = "show routine load"ncur.execute(sql) #执行sql语句nresults = cur.fetchall() #获取查询的所有记录nfor row in results :n name = row[1]n state = row[7]n if state != 'RUNNING':n err_log_urls = row[16]n reason_state_changed = row[15]n msg = "doris 数据导入任务异常:n name=%s n state=%s n reason_state_changed=%s n err_log_urls=%s n即将自动恢复 ,请检查错误信息" % (name, state,nreason_state_changed, err_log_urls)n payload_message = { n "msg_type": "text",n "content": { n "text": msgn }n}n url = 'lark 报警url'n s = json.dumps(payload_message)n r = requests.post(url, data=s)n cur.execute("resume routine load for " + name)nncur.close()ndb.close()

现在线上配置的监控 1 分钟执行一次,如果遇到任务暂停 ,会自动恢复导入任务 ,但是导致任务失败的脏数据会跳过  ,此时需要人工排查失败原因 ,修复后重新触发该条数据的导入 。

数据模型

Doris 内部表中,主要有 3 种数据模型 ,分别是 Aggregate  、Unique 、Duplicate。在介绍数据模型之前 ,先解释一下 Column:在 Doris 中 ,Column 可以分为两大类 :Key 和 Value 。从业务角度看 ,Key 和 Value 分别对应维度列和指标列。

Aggregate

简单来说 ,Aggregate 模型就是预聚合模型,类似于 MOLAP ,通过提前定义 Key 列及 Value 列的聚合方式 ,在数据导入的时候已经将 Key 列相同的数据按照 value 列的聚合方式聚合在一起,即最终表里 Key 相同的数据只保留一条 ,Value 按照相应的规则计算 。下面举例说明 。

表结构如下 :

CREATE TABLE tmp_table_1n (n user_id varchar(64) COMMENT "用户id",n channel varchar(64) COMMENT "用户来源渠道",n city_code varchar(64) COMMENT "用户所在城市编码",n last_visit_date DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",n total_cost BIGINT SUM DEFAULT "0" COMMENT "用户总消费"n )nENGINE=OLAPnAGGREGATE KEY(user_id, channel, city_code)nDISTRIBUTED BY HASH(user_id) BUCKETS 6n PROPERTIES("storage_type"="column","replication_num" = "1"):

表结构中,Key 列分别是 user_id、channel 、city_code ,Value 列是 last_visit_date 、total_cost ,他们的聚合方式分别为 REPLACE、SUM。

现在 ,向该表中插入一批数据:

insert into tmp_table_1 values('suh_001','JD','001','2022-01-01 00:00:01','57');ninsert into tmp_table_1 values('suh_001','JD','001','2022-02-01 00:00:01','76');ninsert into tmp_table_1 values('suh_001','JD','001','2022-03-01 00:00:01','107');

按照我们的理解,现在 tmp_table_1 中虽然我们插入了 3 条数据 ,但是这 3 条数据的 Key 都是一致的 ,那么最终表中应该只有一条数据 ,并且 last_visit_date 的值应为"2022-03-01 00:00:01" ,total_cost 的值应为 240。下面我们验证一下:

应用实践 | 海量数据,秒级分析
!Flink+Doris 构建实时数仓方案

可以看到,结果与我们预期⼀致 。

Unique 模型

正如本次建设的实时数仓那样 ,我们更加关注的是如何保证主键的唯⼀性 ,即如何获得 Primary Key 唯⼀性约束 。⼤家可以参考上⾯建表的例⼦ ,在这⾥不再举例说明 。

Duplicate 模型

在某些多维分析场景下 ,数据既没有主键 ,也没有聚合需求  。因此引⼊ Duplicate 数据模型来满⾜这类需求 。举例说明。

表结构如下:

CREATE TABLE tmp_table_2n (n user_id varchar(64) COMMENT "用户id",n channel varchar(64) COMMENT "用户来源渠道",n city_code varchar(64) COMMENT "用户所在城市编码",n visit_date DATETIME COMMENT "用户登陆时间",ncost BIGINT COMMENT "用户消费金额"n )nENGINE=OLAPnDUPLICATE KEY(user_id, channel, city_code)nDISTRIBUTED BY HASH(user_id) BUCKETS 6n PROPERTIES("storage_type"="column","replication_num" = "1");

插入数据:

insert into tmp_table_2 values('suh_001','JD','001','2022-01-01 00:00:01','57');ninsert into tmp_table_2 values('suh_001','JD','001','2022-02-01 00:00:01','76');ninsert into tmp_table_2 values('suh_001','JD','001','2022-03-01 00:00:01','107');

因为此时数据是 Duplicate 模型,不会进行任何处理,查询应该能查到 3 条数据

应用实践 | 海量数据,秒级分析!Flink+Doris 构建实时数仓方案

数据模型的选择建议

因为数据模型在建表时就已经确定 ,且无法修改 。所以 ,选择一个合适的数据模型非常重要 。

Aggregate 模型可以通过预聚合 ,极大地降低聚合查询时所需扫描的数据量和查询的计算量 ,非常适合有固定模式的报表类查询场景 。但是该模型对 count(*) 查询很不友好。同时因为固定了 Value 列上的聚合方式,在进行其他类型的聚合查询时 ,需要考虑语意正确性 。

Unique 模型针对需要唯一主键约束的场景  ,可以保证主键唯一性约束,但是无法利用 ROLLUP 等预聚合带来的查询优势。

Duplicate 适合任意维度的 Ad-hoc 查询 ,虽然同样无法利用预聚合的特性 ,但是不受聚合模型的约束  ,可以发挥列存模型的优势。

总结

Flink + Doris 构建的实时数仓上线后 ,报表接口相应速度得到了明显提高 ,单表 10 亿级聚合查询响应速度 TP95 为 0.79 秒 ,TP99 为 5.03 秒。到目前为止,整套数仓体系已平稳运行 8 个多月。

应用实践 | 海量数据,秒级分析!Flink+Doris 构建实时数仓方案

SelectDB 是一家开源技术公司,致力于为 Apache Doris 社区提供一个由全职工程师 、产品经理和支持工程师组成的团队,繁荣开源社区生态,打造实时分析型数据库领域的国际工业界标准 。基于 Apache Doris 研发的新一代云原生实时数仓 SelectDB ,运行于多家云上 ,为用户和客户提供开箱即用的能力 。


友情链接英雄联盟手游永恒梦魇攻略 魔腾技能天赋出装阵容搭配推荐英雄联盟下棋攻略(英雄联盟下棋技巧教学)LOL斯特拉克的挑战护手详解 LOL国服5.16新装备解析盘点魔兽世界那些紫色品质的宠物,你都拥有了没?七天学堂梦幻西游五开攻略,低等级五开性价比的法宠介绍(69-109)路在何方!魔兽世界6.0术士竞技场PvP心得指南dnf碳结晶体有什么作用?碳结晶体获取方式/用处一览P3阶段海加尔峰掉落哪些神器(海加尔峰掉落神器全汇总)星辰变银色北伐军日常怎么开启(银色北伐军声望开启任务流程)阴阳师SP式神是什么意思_SP式神介绍问道手游做装备哪里有白装,问道套装怎样做用白装备广州黄埔区南岗河成为都市中的“小鸟天堂”dnf正义审判者装备搭配 地下城正义审判者技能选择刀剑神域无限时刻(含金手指)gm版手游平台十大推荐 2023最火的gm手游平台排行榜为什么dnf启动不了,dnf启动没反应这个四年级的女孩,喜欢用文字和人拥抱。金赞娱乐登录拳皇2002网页游戏,拳皇2002怎么最快的提升技术啊我中指和无名指按键不灵活,而且老想着怎么连到实战中什么招都发不出来永劫无间手游金色皮肤怎么获得 金色皮肤获取方法说明红米redmi pad给你性价比极至的享受金铲铲之战开启内测,网友没资格也能进,云顶之弈S1羁绊回归王者荣耀S11赛季结束时间 S11赛季什么时候结束英雄联盟星光纪念册隐藏卡怎么获取 星光纪念册隐藏卡获取攻略天涯明月刀祈年结束 各区的贡献以及人数统计在《火影忍者》漫画粉丝的眼中,《博人传》的故事大概是个什么水准?DNF泰波尔斯详细打法攻略 DNF泰波尔斯副本过关解析英雄联盟手游s10什么时候结束 s11赛季开始时间赛尔号精灵最新特性配方大全王者荣耀:S22要出定制装备?属性完美契合,相当于有2个大性格S7暗夜猎手 薇恩加点问道1.46鬼宠成长、技能、携带等级、宠物轮回.doc梦境药剂图纸配方(魔兽世界wlk药剂有什么作用)赛尔号:新旧融合的区别,前者原理是吞噬,后者是单纯的合体!DNF:21号商城大动作!旧版魔盒删除,“龙盒”长期上架问道手游做装备哪里有白装,问道套装怎样做用白装备极品飞车 Devil❼Whisper阴阳师正式服11月22日更新公告
联系我们

地址:联系地址联系地址联系地址

电话:020-123456789

传真:020-123456789

邮箱:admin@aa.com

0.1949

Copyright © 2024 Powered by 榆林市某某通信设备专卖店   sitemap