实时处理技术,是强调当前处理状态的一门技术,所以当两个对立的方案重叠在一起的时候,它注定不是用来解决一个比较广泛问题的一种方案。于是,我们把实时数仓建设的目的定位为解决由于传统数据仓库数据时效性解决不了的问题。
由于这个特点,我们给定了两个原则:
- 传统数仓能解决的问题,实时数仓就不解决了。比如上个月的一些历史的统计,这些数据是不会用实时数仓来建设的。
- 问题本身就不太适合用数仓来解决,也不用实时数仓解决。比如业务性很强的需求,或者是对时效性要求特别高的需求。这些需求我们也不建议通过实时数仓这种方式来进行解决。
离线数仓和实时数仓的区别在于离线数据仓库是一个保存历史累积的数据,而我们在建设实时数仓的时候,我们只保留上一次批处理到当前的数据。这个说法非常的拗口,但是实际上操作起来还是蛮轻松的。
通常来讲解决方案是保留大概三天的数据,因为保留三天的数据的话,可以稳定地保证两天完整的数据,这样就能保证,在批处理流程还没有处理完昨天的数据的这段间隙,依然能够提供一个完整的数据服务。
- 实时olap分析
OLAP 分析本身就非常适合用数仓去解决的一类问题,我们通过实时数仓的扩展,把数仓的时效性能力进行提升。甚至可能在分析层面上都不用再做太多改造,就可以使原有的 OLAP 分析工具具有分析实时数据的能力。
- 实时数据看板
这种场景比较容易接受,比如天猫双11的实时大屏滚动展示核心数据的变化。实际上对于我们公司来讲,更多用于BI层面,支持市场人员以及领导层的决策。
- 实时特征
实时特征指通过汇总指标的运算来对某一行为结果或者用户标记上一些特征。比如多次购买商品的用户后台会判定为优质用户。 广告投入带来的流量,在做实时精准推广方面也是急需要支持。
- 实时业务监控
同时也会对一些核心业务指标进行监控,比如说当线上出现一些问题的时候,可能会导致某些业务指标下降,我们可以通过监控尽早发现这些问题,进而来减少损失。
- 编程方式
离线开发最常见的方案就是采用 Hive SQL 进行开发,然后加上一些扩展的 udf 。映射到实时数仓里来,我们会使用 Spark SQL ,同样也是配合 udf 来进行开发。
- 作业执行层面
离线处理的执行层面一般是 MapReduce 或者 Spark Job ,对应到实时数仓就是一个持续不断运行的 Spark Structure Streaming 的程序。
- 数仓对象层面
离线数仓实际上就是在使用 Hive 表。 对于实时数仓来讲,我们对表的抽象是使用 Stream Table 来进行抽象。
- 物理存储
离线数仓,我们多数情况下会使用 HDFS 进行存储。 实时数仓会分成两部分,数据量比较大的存储到Kudu,数据量比较小的则会采用像 Kafka 这样的消息队列来进行数据的存储。
实时数仓的数据架构会跟离线数仓有很多类似的地方。比如分层结构;比如说 ODS 层,明细层、汇总层,乃至应用层,它们命名的模式可能都是一样的。尽管如此,实时数仓和离线数仓还是有很多的区别的。
(1) 跟离线数仓主要不一样的地方,就是实时数仓的层次更少一些
以建设离线数仓的经验来看,数仓的第二层远远不止这么简单,一般都会有一些轻度汇总层这样的概念,其实第二层会包含很多层。另外一个就是应用层,以往建设数仓的时候,应用层其实是在仓库内部的。在应用层建设好后,会建同步任务,把数据同步到应用系统的数据库里。
在实时数仓里面,所谓 APP 层的应用表,实际上就已经在应用系统的数据库里了。上图,虽然画了 APP 层,但它其实并不算是数仓里的表,这些数据本质上已经存过去了。
- 为什么主题层次要少一些?
是因为在实时处理数据的时候,每建一个层次,数据必然会产生一定的延迟。
- 为什么汇总层也会尽量少建?
是因为在汇总统计的时候,往往为了容忍一部分数据的延迟,可能会人为的制造一些延迟来保证数据的准确。比如,统计事件中的数据时,可能会等到 10:00:05 或者 10:00:10再统计,确保 10:00 前的数据已经全部接受到位了,再进行统计。所以,汇总层的层次太多的话,就会更大的加重人为造成的数据延迟。
所以建议尽量减少层次,特别是汇总层一定要减少,最好不要超过两层。明细层可能多一点层次还好,会有这种系统明细的设计概念。
(2) 第二个比较大的不同点就是在于数据源的存储
建设离线数仓的时候,可能整个数仓都全部是建立在 Hive 表上,都是跑在 Hadoop 上。但是,在建设实时数仓的时候,同一份表,我们甚至可能会使用不同的方式进行存储。
比如常见的情况下,可能绝大多数的明细数据或者汇总数据都会存在 Kafka或者kudu 里面,但是像维度数据,可能会存在像 HBase 这样的 kv 存储的系统中,实际上可能汇总数据也会存进去。
- 数据来源尽可能统一
- 利用分区保证数据局部有序
- 确保针对流式小文件的异步合并
首先第一个建设要点就是 ODS 层,其实 ODS 层建设可能跟仓库不一定有必然的关系,只要使用 Spark Streaming 开发程序,就必然都要有实时的数据源。目前主要的实时数据源是消息队列,如 Kafka。而我们目前接触到的数据源,主要还是以 binlog、SDK数据和埋点日志为主。
以总结的经验来看,有三点需要强调:
首先就是数据源要统一
- 第一个是实时的数据源要自己统一,比如交易数据要不从binlog里来接,要不从SDK发送过来,不能存在两部分发送的情况。要不然对数仓数据的质量存在着潜在的威胁。
- 第二个统一是指实时和离线的统一。不管离线还是实时,计算逻辑和数据来源要完全一致,不能让使用方对数据产生误解。
另外一点数据乱序的问题
在采集数据的时候会有一个比较大的问题,可能同一条数据,由于分区的存在,这条数据先发生的状态后消费到,后发生的状态先消费到。解决起来很简单,利用kafka的分区局部有序的机制就可以了。
最后一点是流式写HDFS时的小文件问题
Spark Structure Streaming准确讲是微批次处理,最初是直接写到HDFS上的,结果就是会不断产生小文件。那么要保证有程序在不断异步的去处理这些小文件,合并同时移动到表中。处理的时候要注意避开正在写的文件,否则直接导致实时任务的失败。
解决原始数据中数据存在噪声、不完整和数据形式不统一的情况。形成规范,脱敏,统一的数据源。如果可能的话尽可能和离线保持一致。
模型规范化
在我看来,dw层最主要就是一个模型规范化问题。规范每个公司或者团队都有,但是否能落实到实际开发中是一个重点。
在实时的数仓建设当中,我们要特别强调模型的规范化,是因为实施数仓有一个特点,就是本身实时作业是一个 24 小时调度的状态,所以当修改一个字段的时候,可能要付出的运维代价会很高。 在离线数仓中,可能改了某一个表,只要一天之内把下游的作业也改了,就不会出什么问题。但是实时数仓就不一样了,只要改了上游的表结构,下游作业必须是能够正确解析上游数据的情况下才可以。
另外使用像 kafka 这样的系统,它本身并不是结构化的存储,没有元数据的概念,也不可能像改表一样,直接把之前不规范的表名、表类型改规范。要在事后进行规范代价会很大。所以建议一定要在建设之初就尽快把这些模型的规范化落地,避免后续要投入非常大的代价进行治理。
数据漂移问题
这个问题是由上面异步合并移动小文件的程序导致的。因为小文件移动肯定是按照物理时间,并且程序调度有时间间隔,出现的问题就是昨天临近24点的业务数据可能会被移动到今天,也就是所谓的数据漂移。那这个数据的处理就是在dw层的解析逻辑上,分区范围扩大,选择业务时间为昨天,这样就能把漂移的数据过滤掉。
将实时数据写入 Hive,使用离线数据持续验证实时数据的准确性。
当建设完一个数仓之后,尤其是第一次建立之后,一定会非常怀疑自己数据到底准不准。在此之前的验证方式就是人为的去仓库里去查,然后来看数据对不对。在后续的建设过程中我们发现每天这样人为去对比太累了,而且耗时。
我们就采取了一个方案,把中间层的表写到 Hive 里面去,然后利用离线数据丰富的质量验证工具去对比离线和实时同一模型的数据差异,最后根据设定的阈值进行监控报警。这个方案虽然并不能及时的发现实时数据的问题,但是可以帮助你在上线前了解实时模型的准确程度。然后进行任务的改造,不断提高数据的准确率。另外这个方案还可以检验离线数据的准确性。