城的灯

海量数据下的精准营销工程实践

背景

精准营销从被提出到广泛运用,伴随着互联网技术飞速发展,尤其是大数据和人工智能技术的日趋成熟。精准营销也成为每一家互联网企业必然绕不开的课题,围绕它展开的业务不胜枚举。通常营销过程包含如下三部分:
① 营销活动前:确定营销的目标,
② 营销活动中:效果监控和跟踪,
③ 营销活动后:效果评估和优化。
本文主要分享笔者在精准营销工程化方面做的一些尝试和实践,侧重点在营销目标的确定。


现状

营销目标就是我们传统意义的圈人,筛选出一批跟营销计划匹配的人。最初圈人逻辑是通过标签圈人,后续又加入了算法圈人,这两种模式的数据核心都基于离线数仓,按天更新。这两种方式在初期能够满足业务需求,但是随着业务发展及精细化运营的需求,该模式短板越来越明显,例如转化率的提升越来越难、实时性不高、灵活度不够等。希望有一套既能兼容现有离线链路,同时又能引入实时特征的方案。


挑战

离线链路侧,每个签约品牌每天会基于MaxCompute,通过协同过滤的方式从经济体的数亿用户中召回和品牌群像具有相似度的人,粗召回人数大品牌数亿中小品牌数千万。通过粗召回的数据量乘以签约品牌数,就可以很轻易的预估出离线召回的数据量在百亿级别,并且随着签约品牌的新增该数据量还会增加。实时链路侧,用户的实时特征,例如交易特征、LBS特征、商品加购特征、店铺点击特征等,每天特征总量也维持在几十亿级别,并且特征会随着业务的发展越来越多。
面对如此庞大的数据量,每条数据多耗时一毫秒,那么在百亿量级下就会放大到数千小时;一条数据多1bit,百亿就会放大到1G。所以特征数据如何传递、处理、存储都需要精心设计,否则带来的不但是程序性能上的急剧下降,而且还会导致计算和存储成本的飞速飙升,这便是我们在整个系统建设过程中遇到最大挑战。


方案

通常召回策略有很多种,但是最常用的是基于内容匹配的召回和基于协同过滤的召回。基于内容配置的召回通常分为基于特征匹配和基于知识匹配,通常这种召回率较高,准确率低,适合冷启动环境。基于协同过滤的召回通常分为基于人、物、模型三种,通常这种召回准确度较高,但是存在冷启动问题。
我们的定位是离线协同过滤(模型)+离线特征+实时特征的多维度圈人,这样既整合了已有的成果,又引入了在线实时特征。由于离线协同过滤已经完成了基于品牌维度人的粗召回,再结合离线特征和实时特征便可以实现在线精召回。如果遇到冷启动问题,我们也可以利用已有的离线特征&实时特征并结合品牌信息通过知识匹配来解决。
整体的方案详见下图:
promodws
上图不但描述了整个数据流程,而且也包含了部分技术选型,每个部分的职责如下。
离线模块基于MaxCompute,绝大部分算法逻辑和所有的模型训练都在此完成,所有的离线特征都在此产出,数据通过OSS文件传递给上游。实时模块基于BLink,负责过滤和粗加工流式数据,数据分为平台数据和私域数据,所有的实时特征都在此产出,数据通过Kafka消息传递给上游。数据服务模块基于Java工程,其一是将离线和实时产出的特征写入Elasticsearch,其二是根据召回策略从Elasticsearch中查出符合规则的人再经过逻辑处理(算法二次打分/排序过滤)之后返回以供投放。效果分析模块复用底层的离线和实时能力,不仅仅是对投放效果跟踪分析,而且还为上层的投放决策和A/B Test提供数据支撑。决策服务是一个虚拟层,投放平台将营销活动的召回策略和投放策略告诉给数据服务,数据服务将召回的人按照投放决策来进行投放,可能会先试投一批人,如果转化率不够理想,投放决策就会停止后续的投放,从而调整召回决策或者算法逻辑然后再次尝试,直至达到预期目标。投放平台是整个产品的门面,商户运营人员都工作在此。


细节

多租户

SaaS离不开租户(品牌)的概念,不论是资源的分配、数据的安全,还是技术层面的分库分表。营销是需要成本的,特别是精准营销,需要大量的算力和存储来支撑;其次特定租户的营销具有低频、周期性的特点,让租户按需申请和结算,这样就降低了使用成本。如果租户没有营销活动时,我们还占用大量的资源去计算和存储,那么肯定会增加租户的沉没成本。当某一个品牌确定好营销时间段,我们会在营销活动前两天开始预热数据,Elasticsearch、Lindorm、Redis、MaxCompute、OSS、BLink等资源才开始真正被使用,这样就能做到按算力和存储的实际使用情况来收费。

OLAP引擎

OLAP引擎我们选择了ES(Elasticsearch),Elastic Stack在大数据领域的优异表现大家有目共睹,选择Elasticsearch作为我们多维OLAP引擎最主要的原因分如下几点。

Index templates

由于多租户的特性,商户营销前系统会为其分配好ES,这个可以理解为分配库。库分配好之后还需要表,表对应的就是ES的索引,这个时候就需要有一套自动建表逻辑,这个自动建表逻辑就可以通过Index templates来实现。Template一般会包含如下信息,表分多少张(shards)、数据保存几份(replicas)、表的生命周期(lifecycle)、表中字段以及字段类型等诸多信息(dynamic field mapping)。有了Index templates就将索引创建逻辑简化到一行代码,创建索引的时候ES会自动根据需要创建的索引名称选择匹配template来创建索引。

Dynamic field mapping

Schema-free的爽,大家或多或少都应该感受过。随着业务发展,表需要加字段这种情况估计大家都已经麻木了,尤其在数据分析领域,加字段不是一个两个的加而是几十几十的加。ES原生支持字段的自动创建及类型映射,字段类型是通过字段名来完成匹配。通过Dynamic field mapping + Index templates,便可以完全动态化的实现建表和加字段,极大提升了生产力和灵活度。

Nested datatype

传统的数据库里面,数据关系无外乎就三种,一对一、一对多和多对多。如果有数据有关联关系,通常通过主外键来建立数据联系,查询的时候通过join来获取完整信息。传统的关系型数据库中,很难做到一行记录中存储某个实体以及附属的信息。但是ES通过Nested数据类型提供了这种能力,它是NoSQL的一种高级特性。不但可以查询实体的时候直接带出附属信息,而是还可以在多个附属信息中查询满足条件的附属信息然后带出实体信息。例如用户(实体)一天内会有多个不同时间点的Geo Point(附属信息),一个签约品牌需要针对旗下的某几个门店做一场营销活动,需要选出最近3小时活跃在这些门店周边N公里的用户,不同的门店的筛选距离可能还不一样,nested type就能很完满的解决该问题。


特征存储

离线特征为什么不直接通过数据集成输出到ES?

首先是算法产出特征名具有业务意义所以很长,特征字段名需要映射为正确类型的短字段名;其次是字段值也需要转化,需要转化为符合字段类型的值;最后是复杂的索引管理,数据处理完成之后还有各种针对索引的操作(alias管理、index切换、replica设置等)。这样做可以以最低成本来存储及传输,因为任何小的优化在海量数据下都会被放大,而且通过代码可以更加灵活的控制处理逻辑。

离线特征产出存储我们选择了OSS,而不是输出到Lindorm/ADB等?

不选择Lindorm和ADB等DB的原因,首先是不论写入还是读取所付出的性能损耗还是存储成本都要比OSS大的多,毕竟数据库需要维护自己特有的一套数据结构来支持数据管理;其次是业务并不需要查询,需要的是每一个Brand前一个周期算好的全量离线特征。所以我们直接通过MaxCompute的输出到外表(OSS),BrandId+DateTime作为分区输出即可。需要注意的是品牌太多可能会导致分区数超过MaxCompute的分区限制,可以通过每次跑一部分品牌,多次输出来解决。MaxCompute在输出OSS的时候,数据格式建议使用orc/parquet,tsv/csv半结构化数据膨胀非常厉害,它们之间存储大小的差异是数量级的。如果业务需要用tsv/csv,可以采用gzip压缩,从而来优化文件大小。

实时特征为什么需要输出到Kafka?

主要原因就是Kafka相比RocketMQ/RabbbitMQ更适合大批量数据的写入,在大批量数据写入方面RocketMQ/RabbitMQ性能是不及Kafka的,RocketMQ/RabbitMQ更多的是关注在消息的安全/事务等特性。


数据服务

存储服务

职责
  • 将离线/实时特征索引到ES
  • 存储最近24小时的实时特征
索引切换流程

为了简化索引生命周期管理以及提升索引性能,每当新的离线特征生成之时,我们会用新的离线特征生成一份新的索引,最后当新索引完成之后替换掉老索引。为了实现整个数据处理过程和索引切换过程业务方式无感知,ES别名(Alias)起到了关键性的作用,正是由于它的存在才让这一切变的成本低廉。为了清晰的阐述整个数据索引流程,我们拿某一个品牌的分片索引切换来说明,见下图:
index_switch_flow
user_feature_brand1 业务查询所用别名
user_feature_brandId1_partition1_u 实时特征更新更新所用别名
user_feature_brandId1_partition1_i 离线特征写入和备份实时特征更新所用别名

流程如何编排?

MaxCompute离线特征生成之后,存储服务会查询出每个ES集群所支持的品牌,并根据其优先级排好序。然后每个ES集群只为优先级最高的品牌开启一个ScheduleX网格计算任务,每个ScheduleX任务会根据品牌的OSS文件来划分子任务,上一个做完再做下一个。一个集群只跑一个品牌,主要是防止ES压力过大导致在线服务的不可用。使用ScheduleX作为计算框架是为了简化子任务的分发、重试、并发控制等逻辑,这样不但可以充分利用分布式的算力而且还让我们从复杂的状态管理中抽身而出,聚焦在主任务的编排上。

为什么要保存最近24小时的实时特征?

虽然实时特征会实时写入到了ES之中,但是每次新的离线特征产出之后会触发新索引创建,新的索引中是没有实时特征的,所以我们需要使用备份的实时特征来更新到新的索引中。

为什么需要每次都重建索引?

不拿新的离线特征直接去更新已有的索引呢,其一是因为更新索引比重建新索引的要慢很多;其二要更新就需要对比新老数据,从而知道哪些数据变化了、哪些数据删除了、哪些数据新增了,会让MaxCompute任务逻辑非常复杂。

如何控制实时特征如何保存/如何控制生命周期?

海量实时特征的存储,Lindorm是最佳的存储介质。它相较于传统DB,一是存储/扩容成本的优势,二是宽表/动态列的支持,三是数据精细的生命周期控制。任何事情都不是完美的,使用时特别要注意rowkey的设计,否则会产生数据倾斜、数据热点和查询受限等诸多问题。

如何实现新旧实时特征融合及乐观锁控制?

用备份的旧实时特征去更新新索引,但新的实时特征却还在不断的流入;其次消息乱序,新数据先到老数据后到;如何保证老数据不会覆盖新数据?该问题分两个维度来讨论,首先是ES层面的乐观锁控制,其实是Lindorm层面的乐观锁控制。ES层面,在开始读取备份实时特征之前,首先通过ES的Alias来开启新时特征新老索引的双写,然后再读取备份的实时特征去更新新的索引,也就是说此时新老数据都在同时更新新的索引。非时间型的实时特征在ES中会保存为两个字段,一个是特征值一个是特征时间,通过特征时间来做乐观锁控制,通过ES的Update By Query API来更新。Lindorm层面的乐观锁控制也比较简单,通过指定时间戳写入便可以实现乐观锁控制,老的时间戳是不能覆盖新的时间戳数据的。

Update By Query命中率不高,很多无效的Query如何解决?

为了高效的插入,我们没有做_id层面的去重,所以更新的时候需要先查再更新。Update By Query相较于纯Update会多一次查询,别小看多的这一次查询,因为哪怕每条记录更新一毫秒,当乘以十亿的时候这就是相当恐怖的一个延迟了,并且这还是不考虑到一个人属于多个品牌的情况。这还不是问题的关键,问题的关键是不知道某个用户是否在某个品牌下,只有做一次查询才知道,如果存在就更新,如果不在就忽略,忽略的这次查询就是无效成本。例如当某个用户的经纬度发生了变化时,需要更新所有品牌下该用户的经纬度特征。
由于我们并不知道该用户是否在某个品牌下,所以对于中小品牌Update By Query会发现大量的Query少量的Update,出现这种情况是由于用户被算法粗筛排除在该品牌之外。大量无效的查询加重了ES的负载,是否存在成本更为低廉、性能更优的方式来替代ES查询,在更新之前就知道该用户是否存在于该品牌之下呢?布隆过滤器理所当然的成为了首选方案,为了解决分布式的需求,我们选择了阿里云的Redis布隆过滤器实现,不过需要注意的是受限于Redis单机4G内存的限制,如果存在空间需求大于2G的布隆过滤器,最好提前做好分片逻辑规划。有了布隆过滤器,在用品牌的离线数据创建索引的时候将用户的ID写入到品牌的布隆过滤器中,在后续的更新时,先去布隆过滤器中判断该用户是否存在,存在的话我们再去做Update By Query。

其它

存储服务是整个设计中难点所在。存储服务最大的瓶颈又在ES,为了获得极致的性能,当然离不开ES常规的性能优化,例如调大refresh_interval/translog .flush_threshold_size/translog.sync_interval、调低number_of_replicas、translog.durability同步改异步、bulk操作(client版本需大于7.5.0,否则死锁)等。性能只是众多技术细节中的一小部分,还有品牌优先级控制、索引模板设计、索引管理、布隆过滤器管理、Lindorm表结构设计、ES算力水位控制、数据生命周期管理等,都需要细致的考虑和设计。

召回&排序

该服务主要分为查询场景管理和查询结果算法排序两部分。不同的查询场景就是不同的Query DSL,通过JEXL/Java Template来提供动态DSL能力。算法排序算子是通过一套规范的接口设计,让算法可以通过plugin的形式集成到业务逻辑中。

为什么不用ES默认的JSON动态模板?

JSON格式的DSL动态模板,只能支持简单的参数动态化,没有条件表达式(可选条件)及循环表达式(动态参数),而JEXL/Java Template却能完美的解决该问题。根据业务层传递的搜索条件,执行条件判断及动态参数赋值从而最终生成用于ES搜索用的DSL脚本。

为什么还需要过滤/排序算子?

特殊情况下,通过ES召回的还需要再进行一次过滤/精排,这个时候需要跟算法同学共建,让算法同学的算法能够运行在工程代码之上,所以抽象出了一套过滤排序算子规范。


效果分析

效果分析也分为实时和离线两条链路,实时链路主要用来做A/B Test和投放决策,离线链路主要负责产出投放效果报表所需的各种数据。好的效果分析离不开完善的埋点策略和数据采集工具支撑,否则就会出现数据映射偏差。


总结

上述所讲还是偏一些技术细节,为什么花大量的篇幅来阐述这些细节,因为任何一个小的缺陷在海量数据下都会放大。当然还有很多没有说到或者说明白,例如资源如何实现动态分配,效果分析如何影响投放决策等,后续有机会会一一补上。