内容简介:编辑 pom.xml 文件,添加依赖:Redis Sink 支持多种 Redis 环境:
更新至 Flink 1.8 版本
Apache Bahir 项目提供了基于 Flink DataStream API 的 Redis Connector,其内部使用了 Java Redis 客户端 jedis 实现了 Redis Sink。
依赖
编辑 pom.xml 文件,添加依赖:
<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.1-SNAPSHOT</version> </dependency>
配置
Redis Sink 支持多种 Redis 环境:
- 单节点
- 集群
- 哨兵
以单节点连接池配置为例:
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost(ip)
.setPassword(password)
.setMaxTotal(maxTotal)
.setMaxIdle(maxIdle)
.setMinIdle(minIdle)
build()
Redis Mapper
Redis Mapper 用于将输入数据映射为 Redis 操作,需要实现 RedisMapper<T>
接口。
RedisMapper 接口定义:
public interface RedisMapper<T> extends Function, Serializable {
RedisCommandDescription getCommandDescription(); // ①
String getKeyFromData(T data); // ②
String getValueFromData(T data); // ③
}
① Redis 命令描述。当前支持的 Redis 命令和数据类型对应,如下表所示:
| Redis 命令 | Redis 数据类型 |
|---|---|
| HSET | HASH |
| RPUSH,LPUSH | LIST |
| SADD | SET |
| PUBLISH | PUBSUB |
| SET | STRING |
| PFADD | HYPER LOG LOG |
| ZADD | SORTED SET |
| ZREM | SORTED SET |
所有的 Redis 命令定义在 RedisCommand
枚举类型中,所有的 Redis 数据类型定义在 RedisDataType
枚举类型中。
注意:数据类型为 RedisDataType.HASH
和 RedisDataType.SORTED_SET
,需要提供额外的键信息。
SET 命令,对应数据类型 STRING:
new RedisCommandDescription(RedisCommand.SET)
HSET 命令,对应数据类型 HASH,需要额外键:
new RedisCommandDescription(RedisCommand.HSET, key)
② 返回键。如果数据类型为 RedisDataType.HASH
和 RedisDataType.SORTED_SET
,这里返回的键值是数据类型的键值。
③ 返回值。
Redis Sink
有了配置信息和 Redis Mapper,就可以创建 Redis Sink 写入数据到 Redis 了:
stream.addSink(new RedisSink<>(
config,
mapper
));
参考
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
机器学习系统设计
[德] Willi Richert、Luis Pedro Coelho / 刘峰 / 人民邮电出版社 / 2014-7-1 / CNY 49.00
如今,机器学习正在互联网上下掀起热潮,而Python则是非常适合开发机器学习系统的一门优秀语言。作为动态语言,它支持快速探索和实验,并且针对Python的机器学习算法库的数量也与日俱增。本书最大的特色,就是结合实例分析教会读者如何通过机器学习解决实际问题。 本书将向读者展示如何从原始数据中发现模式,首先从Python与机器学习的关系讲起,再介绍一些库,然后就开始基于数据集进行比较正式的项目开......一起来看看 《机器学习系统设计》 这本书的介绍吧!