在线机器学习与离线相比,在模型更新的时效性,模型的迭代周期,业务实验效果等方面有更好的表现。所以将机器学习从离线迁移到在线已经成为提升业务指标的一个有效的手段。
为何选择 Flink 来做在线的样本生成?
如何实现?
读取日志后,对数据做格式化,并且过滤掉不需要的字段和数据。
指定样本 join 的 key。例如:用户 id 和 内容 id 作 key。
输出的数据格式一般为 tuple2(K,V),K:参与 join 的 key。V:样本用到的字段。
使用 Flink 的 union 操作,将多个输入流叠加到一起,形成一个 DataStream。
为每个输入流指定一个可以区分的别名或者增加一个可以区分的字段。
对 join 的 key 做 keyby 操作。接上例,表示按照用户 id 和内容 id 对多个数据流做 join。
如果 key 存在数据倾斜的情况,建议对 key 加随机数后先聚合,去掉随机数后再次聚合。
定义一个Value State。
keyby后的process方法中,我们会重写processElement方法,在processElement方法中判断,如果value state为空,则new 一个新的state,并将数据写到value state中,并且为这条数据注册一个timer(timer会由Flink按key+timestamp自动去重),另外此处我们使用的是ProcessingTime(表示onTimer()在系统时间戳达到Timer设定的时间戳时触发)。如果不为空则按照拼接的策略,更新已经存在的结果。比如:时间窗口内 用户id1,内容id1的第一条日志数据没有点击行为,则这个字段为0,第二条点击数据进入后,将这个字段更新为1。当然除了更新操作,还有计数、累加、均值等各种操作。如何在process里区分数据是来自曝光还是点击呢,使用上面步骤③定义的别名。
重写onTimer方法,在onTimer方法中主要是定义定时器触发时执行的逻辑:从value state里获取到存入的数据,并将数据输出。然后执行state.clear。
样本从窗口输出的条件有2个:第一,timer到期。第二,业务需要的样本都拼接上了。
public class StateSampleFunction extends KeyedProcessFunction<String, Tuple2, ReturnSample> {
/**
* 这个状态是通过过程函数来维护,使用ValueState
*/
private ValueState state;
private Long timer = null;
public StateSampleFunction (String time){
timer = Long.valueOf(time);
}
@Override
public void open(Configuration parameters) throws Exception {
// 获取state
state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", TypeInformation.of(new TypeHint< ReturnSample >() {})));
}
@Override
public void processElement(Tuple2value, Context context, Collector< ReturnSample > collector) throws Exception {
if (value.f0 == null){
return;
}
Object sampleValue = value.f1;
Long time = context.timerService().currentProcessingTime();
ReturnSample returnSample = state.value();
if (returnSample == null) {
returnSample = new ReturnSample();
returnSample.setKey(value.f0);
returnSample.setTime(time);
context.timerService().registerProcessingTimeTimer(time +timer);
}
// 更新点击数据到state里
if (sampleValue instanceof ClickLog){
ClickLog clickLog = (ClickLog)values;
returnSample =(ReturnSample) clickLog.setSample(returnSample);
}
state.update(returnSample);
}
/**
* @param timestamp
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector< ReturnSample > out) throws Exception {
ReturnSample value = state.value();
state.clear();
out.collect(value);
}
}
拼接后的数据需要按照在线训练作业的要求对数据做格式化,比如 json、CSV 等格式。
过滤:决定什么样的数据是合格的样本。例如:有真正阅读的内容才算是可用的样本。
StateBackend 的选取
样本的监控
作业失败监控
Failover 监控
Checkpoint 失败的监控
RocksDB 使用情况的监控
作业消费 Kafka 的 Comsumer Lag 的监控
作业反压的监控
拼接率监控
正样本监控
输出样本格式的监控
输出标签对应的值是否在正常范围
输入标签对应的值是否为 null
输出标签对应的值是否为空
样本的校验
在线和离线的相互校验
将在线样本从输出的 Kafka 中接入到 HDFS 上离线存储。并按照在线 join 的时间窗口来分区。
用同等条件下生成的离线样本和在线样本做对比
白名单用户的全流程校验
将白名单用户的日志和样本结果存入 ES 等实时数仓中,来做校验。
故障的处理
平台化
对应输入数据的数据清洗逻辑
样本输出前的数据清洗逻辑
输入 Kafka 的配置信息及对应数据清洗的 UDF 类
样本拼接的时间窗口
窗口内对字段的聚合操作
样本输出的 Kafka 配置信息及输出前数据清洗和格式化的 UDF 类
拼接率监控
正样本监控
输出样本格式的监控
输出标签对应的值是否在正常范围
输入标签对应的值是否为 null
输出标签对应的值是否为空
作者介绍:
曹富强,微博机器学习研发中心-高级系统工程师。现负责微博机器学习平台数据计算/数据存储模块,主要涉及实时计算 Flink、Storm、Spark Streaming,数据存储Kafka、Redis,离线计算 Hive、Spark 等。目前专注于 Flink/Kafka/Redis 在微博机器学习场景的应用,为机器学习提供框架,技术,应用层面的支持。
(点击可了解更多议题投递详情)