Flink的RocksDB状态后端在vivo应用中,如何优化处理?

摘要:本文简要介绍了特征拼接在实时推荐中的重要作用,并讲述了vivo实时推荐系统中特征拼接模块的架构演进过程以及采用现有的“基于RocksDB的大状态解决方案”的原因,重点叙述了该方案所遇到的一系列问题,并给出了这些问题的现象以及解决方案。
作者: 互联网大数据团队- Chen Rui 本文简要介绍了特征拼接在实时推荐中的重要作用,并讲述了vivo实时推荐系统中特征拼接模块的架构演进过程以及采用现有的“基于RocksDB的大状态解决方案”的原因,重点叙述了该方案所遇到的一系列问题,包括TM Lost、RocksDB性能调优门槛高、TM初始化慢、状态远程存储HDFS RPC飙高等,并给出了这些问题的现象以及解决方案。 1分钟看图掌握核心观点👇 一、背景 在推荐系统中,样本拼接是衔接在线服务与算法模型的重要一个环节,主要职责是样本拼接和业务相关的ETL处理等,模块位置如下图红框所示。 推荐系统通过学习埋点数据来达到个性化精准推荐的目的,因此需要知道服务端推荐下发的内容,是否有一系列的行为(曝光,点击,播放,点赞,收藏,加购等等),把被推荐内容的埋点数据与当下的特征拼接起来的过程,一般称为样本拼接,一个简化的流程如下: 推荐的过程可以检验概括为以下几点: 后台服务rank 推荐内容给app客户端,同时把内容对应的特征快照保存起来; app接收到内容后,埋点日志被上报到消息中间件; 样本拼接负责将特征与埋点日志拼接起来,定义正负样本,格式转换; 模型接收样本训练,将使用最新的模型做推荐。 为了保证较高的拼接率和稳定性,我们的拼接架构也经过了长时间的迭代,这篇文章我将给大家介绍vivo特征拼接架构的发展历程、当前方案、当前方案遇到的问题和解决方案,以及未来的规划和展望,希望能帮助到业内的同学。 二、拼接方案选型 2.1 小时粒度拼接 小时拼接是将埋点日志和特征快照都保存到Hive并以小时分区,每小时调度一个Spark任务来处理两个表相应分区的数据做拼接,由于是小时拼接,实时性较低,Spark作业本身也依赖于上游Hive表小时分区生成,每个小时末尾的请求埋点有可能是落在当前小时,也有可能落在下个小时。举个例子:19点50分下发了一个视频,客户端在19:59分点击了,但是视频播放却是在20点03分完成的,这个时候就会存在拼接不上的问题。 2.2 基于 Redis 的流式拼接 为了提升拼接率,且达到实时拼接,节点故障容灾,完备监控等特性,Flink是一个很好的替代方案,也是最近几年比较主流的实现。最初在实时推荐场景中,Kafka中的特征快照通过Flink任务写入到Redis,另一个Flink任务消费曝光埋点数据和点击埋点数据并读取存在Redis中的特征快照数据做拼接,拼接后的数据作为拼接特征被写入到下游的Kafka中,提供给后续的算法做模型的训练,架构图如下: 经过一段时间实践,以上的方案出现了两个痛点: Redis中存储了几十T的数据,Redis的成本高; 业务数据流量会波动,经常需要DBA对Redis集群进行扩容,涉及大量数据的迁移,运维成本高。 2.3 基于 RocksDB 大状态流式拼接 为了解决基于Redis的作为中间数据的存储存在的问题,我们采用Flink状态来存储特征快照,整个架构中不再需要外部的Redis,由于我们需要存储的数据量达几十T,这里我们选用适合大数据量存储的RocksDB类型的状态后端,调整后架构更加简洁,如下图所示: 流程如下: 首先将曝光流点点击流以及特征在Flink 任务中做union并做keyby; 在processElement方法中如果接收到曝光流就将数据保存到state中,如果接收到曝光流就将数据保存到state中,如果接收到特征就去state中查询相应的曝光和点击数据; 如果能找到就发送到下游并将状态数据清理掉,没找到就将特征保存到state中,并注册一个定时器; 定时器触发时去state中查询相应的曝光和点击数据,如果找到就发到下游,并将状态数据清理掉。 由于RocksDB可以同时利用内存和磁盘来存储数据,所以对于内存的使用量大幅下降,由于RocksDB是嵌入式的数据库,每个TM上的RocksDB数据库只存储shuffe到该TM上的数据,无需再关注扩缩容的问题。当然随着数据上涨,Flink流式拼接在实际的生产过程中也遇到了一系列的问题,为了保证业务的可用性,我们花了较长的时间对这些问题进行攻克,目前任务稳定性达到99.99% ,拼接率长期稳定在99%以上,对拼接效果提升较大。下面我将列举我们遇到的问题和解决方案,希望能够帮助到业内的其他团队。 三、问题及解决方案 3.1 TM Lost问题 3.1.1 现象 在方案实施之初,我们发现这些特征拼接的任务频繁出现TM was Lost异常导致任务重启,我们看了日志,发现都是TM内存超出了YARN的内存限制被kill。
阅读全文