的批流一体存储通常 Hudi 字节基于
首先来引见一下相关背景。
传统数仓存在实时和离线两条链路,来满足业务关于时效数据的时效性和数据量的不同需求。离线会保养历史的全量视图,实时会保养增量视图,最后在服务层去启动数据的汇总,从而允许后续的在线的serving、 OLAP 查问以及看板的运行等等。
由于处置场景的差异,在实时和离线数仓的详细成功上,依赖的底层存储计算引擎基本上是齐全隔离的,实时依赖的重要是以 Flink 为代表的流式计算引擎来做计算,而离线依赖重要是以 Spark 为代表的引擎,实时重要依赖 KV 或 MQ 这样的多种存储选型。离线则经常驳回 Hive 为代表的存储引擎,传统的数仓架构,它实质上结合了流计算和 批计算的长处,经过两套代码来兼容实时数据和离线数据的长处,补偿各自的缺陷。然而这两套架构或代码也带来了两倍的资源老本,并且由于底层计算引擎的不同,关于相反算子的处置的语义不是齐全分歧的,它们的计算结果就会存在差异。所以关于研发同窗来说,这局部差异也会给数据校验等其它一些上班带来额外的累赘。总结上去,传统数仓的 Lambda 架构重要存在三个疑问要处置:第一个是一套计算逻辑,但须要写离线和实时两套代码,带来了两倍的运维老本;第二是离线实时两条链路带来了资源冗余的疑问,双倍的资源的老本;第三是两套引擎计算口径不齐全对齐,造成数据校准方面会有比拟大的艰巨。所以业内提出了宿愿经过批流一体来处置传统 Lambda 架构的疑问。基于上述疑问,再结合外部的实践场景,批流一体的诉求可以分为两种,第一种是计算的批流一体,第二种是存储的批流一体。计算的批流一体指的是宿愿经过底层一套系统,业务层的一套代码同时满足离线和实时的开发需求,从而处置两套系统带来的研发效率、人工老本、运维老本等资源老本相关的疑问。另外咱们宿愿一套系统能够对齐计算目的的口径来处置数据分歧性的疑问。
存储的流批一体包含,第一是实时和离线所在的存储一致,第二是实时和离线的数据能够复用。实时和离线存储一致是指咱们宿愿实时和离线能够经常使用一致选型的存储,这就要求存储能够满足大规模、全量和增量数据的读写诉求。批流数据复用指的是批处置能够经常使用流处置的结果数据,优化整个离线数仓的产出期间。典型的 case 就是 ODS 层的数据复用,另外流处置也能够复用批处置的数据来处置链路冷启动的时刻,须要将离线的数据回灌到实时存储中的额外老本,咱们经过 LAS 去成功了批流一体的才干。在计算层,咱们的处置思绪是对外泄露一致的 SQL,底层依据 SQL 处置的场景选用不同的口头引擎去口头,对用户屏蔽以底层口头引擎的差同性。之所以这样做的要素是咱们以为不同引擎实用于不同场景,很难找到引擎能够同时满足实时和离线,不同时效性和数据规模的要求,在 SQL 层,咱们对齐了底层口头引擎的差同性来成功计算口径的对齐,处置了上方说到的分歧性疑问。在存储层,咱们基于湖仓一体的架构,经过数据湖成功了批流一体存储的才干。除了能够允许流式的增量和批式的全量读写之外,咱们还允许了高效的 OLAP 查问才干以及维表 join 的才干。
既然提到了LAS,咱们看一下 LAS 的全体结构, LAS全称是 Lakehouse Analysis Service。湖仓一体的数据剖析服务,融合了湖与仓的长处,既能够应用湖的长处将一切的数据都存储到便宜存储中,供踊跃学习、数据剖析等场景经常使用,又能够基于数据湖构建数仓,让BI、报表等业务场景去经常使用。
LAS具有如下一些个性:首先是能够允许一致的元数据,防止在数据湖中存在数据孤岛的疑问,每个数据都是可追溯的。第二个是附丽数据湖提供ACID 的才干。第三点是机器允许企业级的权限管控。第四点是允许资源的极致弹性扩缩容,降落用户的经常使用老本。最后是引擎的内核的极致优化,提供高效的读写性能。LAS 全体架构,最上方一层是湖仓开发工具,为数据运行场景提供才干撑持。上方一层是数据剖析引擎,允许流批一体SQL,处置计算批流一体的疑问,并且允许依据 SQL 的特点去做引擎的默认选用和减速。针对 OLAP 剖析,咱们会将 SQL 路由到不同的口头引擎去口头,比如对 Ad-hoc咱们会用 Presto 去启动查问。再往下一层是一致元数据层,最后一层是基于 Hudi 去成功的流批存储层。本文集聚焦在流批一体存储的细节成功上。
咱们须要剖析现有的离线数仓和实时数仓的详细需求,来思索流批一体存储的成功方式。离线数仓的全体结构分层相对来说还是比拟明晰的,经常使用的存储也会比拟繁多,重要是Spark 加 Hive 的方式,提供高效的数据处置和吞吐才干,能够允许离线数据回溯场景下的并发降级。然而实时数仓的经常使用存储相对来说会复杂一些,普通会附丽 Kafka 或 MQ 启动每一层数据表的构建,为了允许高效的 join 的性能,在维表的存储选型上,咱们往往会依据数据量的差异选用KV、Hbase 或许是 MySQL 去启动存储。在全体的 DWS 数仓链路梳理完之后,到了数据运行层会对接 ClickHouse、Doris这样的高效的 OLAP 引擎,去对外提供计时的数据看板报表等等。数据运行层还会有一些 serving 的配置,服务层会将数据写到 KV 或许 MySQL 或许 ES 这些存储里,对外提供 serving 的服务。
在构建实时数仓新链路的时刻,关于链路冷启动,须要经常使用历史分区的数据,所以咱们须要将离线的数据回灌到实时链路的 MQ 外面,受限于 MQ 带宽的限度,全体的回溯周期或许会十分的长,并且操作很复杂。另外当计算目的有疑问,或许是全体的计算口径须要调整的时刻,也须要经常使用离线的数据去对实时数据启动回刷,雷同它也会遇到回溯周期长,操作复杂等一系列疑问。经过引见可以看出实时数据仓库全体相对比拟复杂,存储方式和构建规范没有齐全一致。为了更精细地剖析实时数据仓库关于流批一体存储的需求,咱们基于数据量提前、数据分歧性要求和计算周期等维度,将场景划分红了三类:日志计算、长周期计算和全量计算。
日志计算的场景特点在于数据量比拟大,然而可以接受大批数据失落。大局部数据要求在分钟内计算,并启动分组聚合。但该场景的痛点在于宿愿经过批流数据复用和一致以优化数据时效性和降落资源老本。关于长周期计算场景,数据量相对中等。须要对目的启动复算计算,但全体数据周期或许较长。直播类业务场景或许继续一个月,数据要求在秒级。该场景的痛点在于冷启动和回溯环节复杂、周期长、老本高。全量计算场景数据量不大,会将全量数据存储到Flink state中启动计算,要求强分歧性,时效性要求在秒级别。但该场景遇到的最大疑问在于因数据存储到Flink state中,未启动分层结构,回溯的两边结果或许不显显露,不太利于开发人员启动调试操作。总结来看,数仓关于存储的重要需求可以概括为以下几点:其一,实时存储不一致,运维复杂;其二,实时离线存储不一致,资源老本高;其三,冷启动或回溯环节复杂耗时;其四,无法查问两边数据。因此,咱们的批流一体存储方案,不只要处置上述痛点,还须要具有以下基天性力:允许离线回溯场景的分区并发降级,且数据读写吞吐量不低于Hive。关于流式场景的流批处置,须要满足低提前的要求,数据提前约为几秒钟,并能够提供高吞吐量以允许千万级RPS。此外,咱们还须要提供允许Exactly-once和At-least-once数据分歧性语义的配置。为了成功全体的流批一体的目的,还须要允许多引擎,例如Spark、Flink的读写,同时也须要允许多种OLAP引擎启动查问。
接上去看一下咱们的流批一体存储方案,结合刚刚探讨的流批一体存储目的,咱们发现现有的基于数据湖仓一体的架构,实践上曾经可以满足大少数要求了。数据湖仓一体的架构曾经允许一切数据入湖,并允许Spark、Flink引擎,同时也可以启动离线和实时的数据操作。在下游数据运行方面,数据湖仓一体的架构还允许ihook、metastore、adhoc等OLAP查问方式。在字节外部经常使用的场景中,业务会经过Flink实时将数据入湖,经常使用Spark批量回溯降级湖内的数据,并且下游会经常使用Presto查问服务来触及下游的看板。因此,咱们消息湖仓库重要经常使用了Hudi这样的开源方案。在配置方面,数据湖仓库基本合乎了实时和离线数仓关于流批一体存储的需求,而这重要是由于Hudi自身提供了事务允许,咱们在外部还启动了桶索引机制的优化以进一步提高入湖的性能,并且经过metastore的元数据服务来允许并发写入配置。此外,Hudi原生允许多引擎,因此既可以对批流启动读写消费,也可以经常使用Presto启动交互式剖析。
在外部,湖仓一体架构大规模地落地了离线的数仓场景和局部近实时的数仓场景。然而由于 Hudi 自身的或许数据库自身分钟级别的可见性,它还是没有方法做到实时数仓存储的规范方案。
为了处置时效性的疑问,提供低提前的才干,咱们外部自研了基于内存的服务,它构建于数据湖之上,构成了一套全体的高吞吐、高并发、低提前的实时数据服务方案。底层方案的全体架构如图所示。底层是耐久化数据层,会复用Hudi的才干耐久化数据,文件散布跟 Hudi分歧,经过 log 的行存文件和 base 的列存文件启动数据存储,会经过 file slice 这种基于期间戳的方式去保养数据的版本消息,经过 file group 这样的方式去对文件启动分组,相反组件的数据会存储在同文件组内。这种文件变分组的方式,再结合索引的才干,能够有效地优化数据入湖的性能和查问的性能。
下层服务层重要分为两个组件, BTS 和 Table Service Management。BTS 是基于内存构建的服务层,它重要是来处置实时场景下数据读写的时效性的疑问,经过内存去对数据读写启动减速,TSM (Table Service Management),是表优化的服务,它会异步地去口头一些表优化的操作,从而成功对查问的减速。
这里的表优化操作指的包含社区原生的紧缩聚合 clustering,以及一些索引异步构建,视图异步构建的一些操作。紧缩聚合指的是对日志文件和基础文件进一步兼并以生成新的列式存储文件,这样对全体查问效率而言更优。而 clustering 则是兼并小文件以缩小文件开支。 TSM 只允许这两种才干以及清算才干,咱们方案结合社区现有的 MDT 才干来异步构建多级索引,以优化交互式查问的性能。
表优化操作是一个齐全异步的环节。这局部是咱们自主开发的服务,由于一些社区原生成功并没有做到齐全异步。为什么要异步呢?由于 compaction 和 clustering 的口头期间比拟长,同步操作会影响数据湖的写入速度,特意是在实时场景下无法接受。而社区的异步操作仅指写入时不阻塞,然而 compaction 会共享写入资源在同一个运行程序外口头。这或许会影响写入作业的稳固性,因此咱们在外部落地环节中发现了这个疑问,最后成功了一个齐全异步的调度口头,同时不共享写入资源的服务。在详细的口头层面上,咱们还应用混部的资源以降落老本。
1、数据组织方式
基于这样的新的流批存储架构,咱们新增了两边的服务层,特意是BTS这样的实时元数据减速层,全体的数据组织方式如下图所示。数据组织方式在逻辑层分为表分区、文件组和文件大小等概念。数据写入时,先写入对应分区,再依据主键写入对应文件组。文件组的底层文件存储分为内存数据和散布式文件系统数据两种类型。在内存中,数据由块构成,而在散布式文件系统中,数据的组织方式与Hudi相似,驳回基础文件和日志文件的形式。值得留意的是,咱们引入了日志存储层来存储WAL文件,以确保数据在写入环节中的有效性,并处置内存数据失落的疑问。因此,在写入内存数据之前,咱们会先写入WAL文件,确保这局部数据已被耐久化存储,实践操作才被以为是成功的。关于内存文件块与耐久化存储文件之间的映射相关,每个块对应一个WAL文件以保证数据的容错性。每个内存中的块都不会终身存储在内存中,而是活期地刷到耐久化存储文件中。在实践操作中,多个块通常对应一个日志文件。
2、数据读写方式
再来看一下整个数据读写的交互方式。首先,做了流批复杂的分别,由于流场景和批场景关于数据的可见性和时效性的要求是不齐全分歧的。关于批量回溯的场景,用户并不是宿愿能够马上可见,只是宿愿把这局部数据做好降级和校准而已。
在批量数据降级的场景中,数据会间接写入耐久性存储中,也就是会写入到HDFS上,而不是经过内存。这种方法可以极大地提高咱们的读写吞吐量。关于流式读写的状况,会首先访问BTS之类的内存服务启动读写。这里的重要实施细节是,在写入数据时,咱们会优先写入内存中,会首先写WAL文件以保证容错。在读取数据时,由于内存中的数据不会不时存在,由于cache会活期清算,所以读取时会优先访问内存中的数据。假设发现内存中没有数据,咱们会先加载WAL文件并对其启动预加载,以尽或许地将数据优先加载到内存中,以保证流式读取的时效性。假设发现WAL文件也不存在,或许被清算了,那么咱们就会转而去读取耐久性存储中的日志。在大少数状况下,这是流式读写,整个周期相对较短。因此,在内存中,WAL文件的存储能够做到一周之内的时效性。一周之内的用户都可以反常从内存中消费这局部数据。假设用户宿愿存储长周期的数据,那么他或许须要承当更多的存储老本。咱们须要尽或许防止从HDFS上加载日志文件,这是全体的数据读写方式。
3、BTS架构
刚刚提到的流读的场景,咱们也做了读写的负载分别,会有独自的读集群去承接全体的读流量,来防止它影响写入节点的性能。咱们来看一下全体的BTS架构,BTS首先是Master-Slave架构。Master重要担任一些元数据消息的治理,它的结构与HDFS相似。Table Server以Slave的方式存在,担任数据的读写,并在其上存储由若干个block组织而成的文件。关于Master,它治理的元数据包含Table Server的消息以及block的元数据治理。由于元数据治理的须要,它肯定会引入肯定的负载平衡机制。因此,咱们目前成功了比拟便捷的负载平衡机制,旨在防止某一Table Server内存被打爆的状况。
Table Server重要提供数据读写才干,保养本地的块并活期启动块清算。它异步将某些块刷新到HDFS上,整个数据读写流程是客户端恳求master,失掉须要写入的块,而后找到对应的Table Server启动数据写入。在写入时,它优先写WAL文件,再写内存文件。当数据所有写入并ACK前往后,示意这批数据曾经成功写入,不会再失落。此时触及到数据提交疑问。这局部一致由主节点master去治理事务,其全体的事务机制与Hudi目前的成功相似,即附丽于引擎启动提交。关于Flink来说,每次checkpoint都会触发主节点启动提交。因此,当下游消费这批数据时,假设咱们须要到达秒级数据,则不太或许启动秒级消息源数据的提交。因此,在这局部下游或许会读取一些基于read on committed的数据,所以须要启动去重操作,以确保At least once的语义。以上就是全体的BTS结构的引见。接上去引见落地场景。
三、
咱们的重要落地场景是流式数据计算,它相似于离线数仓,须要启动一些ETL清算和便捷的聚算计算。左侧是全体架构方案,咱们经常使用了基于Hudi加BTS的数据湖方案来交流MQ,从而成功每一层数据表的存储。在维表上,咱们目前仍经常使用KV存储。咱们的目的是交流MQ场景。
原先离线须要将实时表从 MQ dump 成 Hive,去启动后续的离线数仓的相关上班。切换成这套方案后,dump 操作就可以省掉,能够做到流批的数据复用的才干来缩小全体的两边存储的老本。
第二个场景是多维剖析场景,其特点是实时数据荡涤后间接允许看板等实时的OLAP查问。基于批流一体方案结合 Presto 查问来满足业务侧分钟级时延诉求,和秒级查问照应诉求。咱们团队针对Presto启动了许多优化,包含native engine等相关技术,以成功高性能查问。目前,该场景也在现场落地,并取得了不错的收益。另外,由于全体的流批是数据表,存储是一致的,所以不须要额外将其转储为Hive表,也不须要保养离线存储的快照。
第三个场景就是批流复用的日志场景,普通大家直觉上会从 ODS 层切换做批流复用,字节外部,在实时场景会先对接埋点数据, Flink 端去做荡涤,落到实时的存储外面,而后对接看板等下游。在离线数仓上,也会将一切埋点消息存储上去,依据详细的业务场景落成不同的 ODS 表,再去构建离线数仓的义务。
当咱们全体把存储换成 LAS 这套方案之后,只要要保养 ODS 层的数据,就能允许离线和实时两个的场景去启动剖析。
最后是飞书数仓的场景落地,全体链路比拟明晰,分为实时和离线两个链路,这里离线实时链路重要是去做一些人事消息变卦之类的业务。离线链路重要是对一些长周期的疑问去做一些数据的批改,把这局部批改的数据回补到咱们的实时链路外面,让下游的看板数据变得更准确。
在实时数据传输和离线数据处置两个环节中,咱们都驳回了LAS之类的存储来交流底层存储。离线数据处置要求数据的处置期间在10-15分钟内成功,因此用户更习用Spark处置离线数据处置环节。针对此,咱们提供了基于Spark Thrift Server的处置方案,以缩小离线数据处置中每个环节的资源开开放支,维持常驻资源,让用户运转SQL来构建离线数据处置模块。
在实时数据传输环节中,用户原本驳回Kafka构建基础表并经常使用Hbase启动维表构建。由于Hbase对联结主键的允许并不友好,用户每次在读写时都须要去序列化和反序列化主键列,并将复合主键拼装成繁多主键,最后将其写入Hbase中,而后再从Hbase中读取。这样的流程很复杂,且期间耗时较长。
那么除了交流之外,就像之前提到的一样,咱们会交流掉MQ或Kafka这样的组件,并且咱们还用其余存储代替了Hbase。咱们的重要目的是成功基于Flink的lookup join配置,并将小表间接加载到内存中,以提高lookup join的性能。因此,在实时链路这一块,咱们曾经交流了一切的存储组件。
最后分享一下未来布局。首先,咱们会探求更多业务场景,大规模落地更多形式。其次,在技术迭代方面,咱们会对负载平衡和分别做出优化。负载分别在BTS外部(即内存服务外部)会针对一些小组件启动优化,比如将WAL文件刷入耐久化存储(如HDFS),这局部资源占用比拟高,须要分别处置。另一方面,咱们会愈加粗疏化地对读写负载分别和内存服务负载平衡启动优化。此外,咱们还会成功更精细的流批负载分别。第三点是查问优化,咱们会结合索引的才干,在内存层和全体存储方案中经过构建索引优化块的数据结构来减速查问性能,包含点查和联结查问等。最后一点是与native engine的集成,以优化全体的读写速度。在这方面,咱们须要对底层的log、block和parquet文件启意向量化处置。
五、
Q:Hudi允许流式的写入与降级,那Kafka能否可以被取代了?
A:我不确定是指社区的Hudi还是我分享的Hudi。所以我谈一下我分享的整套方案,但我以为咱们目前在某些方面还有短少。比拟艰巨的是如何成功Kafka的exactly once语义。
Q:LAS允许的组件索引和二级索引是什么样的索引结构?
A:实践上,这局部重要是社区原生成功,包含咱们外部的成功也大局部曾经奉献到社区了。就组件而言,社区成功是基于哈希去启动分桶,并同时记载一些布隆过滤器(Bloom filter)。关于二级索引,社区目前正在启动迭代,但并没有齐全合入。
Q:经常使用BTS 可以减速 Flink 写入 Hudi的性能吗?
A:会。时效性优化很多。由于第一个是咱们写内存,第二个是整个数据结构会相对 Hudi 来说会比拟轻量一些。
Q:BTS 与Hudi区分实用的运行场景。BTS 与Hudi的详细相关。
A:首先关于咱们来说,咱们全体的这套方案叫LAS, LAS 底层是基于 Hudi 去做的成功,为了允许流批一体存储,咱们在Hudi上加了一层内存缓存层BTS,结合查问引擎,一整套方案称之为 LAS。由于 BTS 是基于 Hudi 上架了一层,所以 BTS 全体的逻辑会跟 Hudi 强相关,它我两者之间的交互重要就是内存文件到耐久化存储两边的一些交互,其余的大局部的设计会沿用Hudi的局部的逻辑,比如说咱们会经过TSM, Table Service Management.去做一些比拟优化的服务,比如说 Hudi 的compaction, BTS 的 WAL 的clean,就这些操作。
Q:LAS怎样保证查问数据的准确性?
A:咱们允许的语义是 At least once,就是说首先用户能够在数据外面加业务字段来判别哪条数据是最新的At least once,然而这条数据有或许会被写入屡次,所以用户下游须要做一些去重的操作。BTS 数据怎样保证肯定是写出去的?首先咱们会写 WAL 文件,就是 WAL 文件是耐久化存储上的文件,当文件写完之后咱们才会,咱们会一边写 WAL文件一边写内存的数据,只要写到 WAL 文件才会写内存的数据,那全体都成功了之后才会前往给 client ACK,否则, client 会以为这次提交失败了,会从新去往里写,所以全体上分歧性是不会有太大的疑问的。