storm应用从数据源读取数据

有赞使用storm已经有将近3年时间,稳定支撑着实时统计、数据同步、对账、监控、风控等业务。订单实时统计是其中一个典型的业务,对数据准确性、性能等方面都有较高要求,也是上线时间最久的一个实时计算应用。通过订单实时统计,描述使用storm时,遇到的准确性、性能、可靠性等方面的问题。

 

订单实时统计的演进

 

第一版:流程走通

 

在使用storm之前,显示实时统计数据一般有两种方案:

 

在数据库里执行count、sum等聚合查询,是简单快速的实现方案,但容易出现慢查询。

 

在业务代码里对统计指标做累加,可以满足指标的快速查询,但统计逻辑耦合到业务代码,维护不方便,而且错误数据定位和修正不方便。

 

既要解耦业务和统计,也要满足指标快速查询,基于storm的实时计算方案可以满足这两点需求。

 

一个storm应用的基本结构有三部分:数据源、storm应用、结果集。storm应用从数据源读取数据,经过计算后,把结果持久化或发送消息给其他应用。

 

第一版的订单实时统计结构如下图。在数据源方面,最早尝试在业务代码里打日志的方式,但总有业务分支无法覆盖,采集的数据不全。我们的业务数据库是mysql,随后尝试基于mysql binlog的数据源,采用了阿里开源的canal,可以做到完整的收集业务数据变更。

 

在结果数据的处理上,我们把统计结果持久化到了mysql,并通过另一个后台应用的RESTful API对外提供服务,一个mysql就可以满足数据的读写需求。

 

为了提升实时统计应用吞吐量,需要提升消息的并发度。spout里设置了消息缓冲区,只要消息缓冲区不满,就会源源不断从消息源canal拉取数据,并把分发到多个bolt处理。

 

第二版:性能提升

 

第一版的性能瓶颈在统计结果持久化上。为了确保数据的准确性,把所有的统计指标持久化放在一个数据库事务里。一笔订单状态更新后,会在一个事务里有两类操作:

 

订单的历史状态也在数据库里存着,要与历史状态对比决定统计逻辑,并把最新的状态持久化。storm的应用本身是无状态的,需要使用存储设备记录状态信息

 

当大家知道实时计算好用后,各产品都希望有实时数据,统计逻辑越来越复杂。店铺、商品、用户等多个指标的写操作都是在一个事务里commit,这一简单粗暴的方式早期很好满足的统计需求,但是对于update操作持有锁时间过长,严重影响了并发能力。

 

为此做了数据库事务的瘦身:

 

去除历史状态的mysql持久化,而是通过单条binlog消息的前后状态对比,决定统计逻辑,这样就做到了统计逻辑上的无状态。但又产生了新问题,如何保证消息有且只有处理一次,为此引入了一个redis用于保存最近24小时内已成功处理的消息binlog偏移量,而storm的消息分发机制又可以保证相同消息总是能分配到一个bolt,避免线程安全问题。

 

统计业务拆分,先是线上业务和公司内部业务分离,随后又把线上业务按不同产品拆分。这个不仅仅是bolt级别的拆分,而是在spout就完全分开

 

随着统计应用拆分,在canal和storm应用之间加上消息队列。canal不支持多消费者,而实时统计业务也不用关系数据库底层迁移、主从切换等维护工作,加上消息队列能把底层数据的维护和性能优化交给更专业的团队来做。

 

热点数据在mysql里做了分桶。比如,通常一个店铺天级别的统计指标在mysql里是一行数据。如果这个店铺有突发的大量订单,会出现多个bolt同时去update这行数据,出现数据热点,mysql里该行数据的锁竞争异常激烈。我们把这样的热点数据做了分桶,实验证明在特定场景下可以有一个数量级吞吐量提升。

 

最终,第二版的订单实时统计结构如下,主要变化在于引入了MQ,并使用redis作为消息状态的存储。而且由最初的一个应用,被拆成了多个应用。