精品推荐:Redis 5 新特性之Stream

纯洁的微笑 2018-07-25 13:53:23 ⋅ 617 阅读

1. 前言

redis 5 中有一个重大新特性:stream。

stream 是一个日志形式的存储结构,可以往里追加数据,每条数据都会生成一个时间戳ID,stream 也有便捷的读取数据的模型。

stream 的特性使其适合做消息队列和时间序列存储。

下面通过实践来深入了解stream,由于内容较长,我也准备了PDF版本,文章底部有下载地址。

2. 安装环境

需要使用最新的5.0版本,这里使用 docker redis 容器:

 
  1. docker run --name redis5 -p 6379:6379 -d redis:5.0-rc3

redis 客户端:

 
  1. docker run -it --link redis5:redis --rm redis redis-cli -h redis -p 6379

启动后进入交互命令行:

 
  1. redis:6379>

3. 实践

3.1 向stream添加元素

stream 元素可以是一个或多个键值对,添加:

 
  1. redis:6379> XADD mystream * sensor-id 1234 temperature 19.8

  2. 1531989605376-0

解析:

  • mystream 是 stream的key

  • * 所在位置的参数的含义是元素ID,* 表示由系统自动生成一个元素ID

  • 添加的元素包含2个键值对,sensor-id 1234 和 temperature 19.8

  • 返回值是新增元素的ID,由时间戳和递增数字构成

获取Stream中元素的数量:

 
  1. redis:6379> XLEN mystream

  2. (integer) 1

3.2 范围查询

需要指定起止ID,相当于给一个时间范围:

 
  1. redis:6379> XRANGE mystream 1531989605376 1531989605377

  2. 1) 1) 1531989605376-0

  3.   2) 1) "sensor-id"

  4.      2) "1234"

  5.      3) "temperature"

  6.      4) "19.8"

可以使用 - 代表最小ID, + 代表最大ID:

 
  1. redis:6379> XRANGE mystream - +

  2. 1) 1) 1531989605376-0

  3.   2) 1) "sensor-id"

  4.      2) "1234"

  5.      3) "temperature"

  6.      4) "19.8"

当返回元素太多时,可以限定返回结果数量,就像数据库查询时的分页,通过 COUNT 参数指定:

 
  1. redis:6379> XRANGE mystream - + COUNT 2

  2. 1) 1) 1531989605376-0

  3.   2) 1) "sensor-id"

  4.      2) "1234"

  5.      3) "temperature"

  6.      4) "19.8"

还可以反向查询,使用 XREVRANGE 命令即可,用法与 XRANGE 相同。

3.3 监听 stream 的新元素

 
  1. redis:6379> XREAD COUNT 2 STREAMS mystream 0

  2. 1) 1) "mystream"

  3.   2) 1) 1) 1531989605376-0

  4.         2) 1) "sensor-id"

  5.            2) "1234"

  6.            3) "temperature"

  7.            4) "19.8"

STREAMS 后面的 mystream 指定的是目标 stream 的 key, 0 是指最小的ID,就是获取指定stream中的大于指定ID的元素, COUNT 指获取的数量

可以一起指定多个stream,例如 STREAMS mystream otherstream00

阻塞监听

在客户端1中执行:

 
  1. redis:6379> XREAD BLOCK 0 STREAMS mystream $

会进入等待状态。

在客户端2中添加元素:

 
  1. redis:6379> XADD mystream * test 1

客户端1中会显示刚刚添加的元素:

 
  1. 1) 1) "mystream"

  2.   2) 1) 1) 1531994510562-0

  3.         2) 1) "test"

  4.            2) "1"

BLOCK 表示阻塞, 0 是指定超时时间,0 表示永不超时, $ 表示stream中的最大ID。

3.4 消费者组

当stream量很大,或者消费者处理过程比较耗时的时候,只有一个消费者的话压力就比较大了,redis stream 提供了消费者组的概念,可以让多个消费者处理同一个stream,可以实现负债均衡。

例如有3个消费者 C1、C2、C3,stream 中有7个消息元素,那么消费的分配就是:

 
  1. 1 -> C1

  2. 2 -> C2

  3. 3 -> C3

  4. 4 -> C1

  5. 5 -> C2

  6. 6 -> C3

  7. 7 -> C1

3.4.1 创建消费者组

 
  1. redis:6379> XGROUP CREATE mystream mygroup01 $

  2. OK

针对 mystream 这个 stream 创建了一个消费者组,名字为 mygroup01$ 表示读取目前最大ID之后的元素。

3.4.2 添加测试数据

添加几条新数据:

 
  1. redis:6379> XADD mystream * message apple

  2. 1531999977149-0

  3. redis:6379> XADD mystream * message orange

  4. 1531999980272-0

  5. redis:6379> XADD mystream * message strawberry

  6. 1531999983493-0

  7. redis:6379> XADD mystream * message apricot

  8. 1531999988458-0

  9. redis:6379> XADD mystream * message banana

  10. 1531999991782-0

3.4.3 通过消费者组读数据

 
  1. redis:6379> XREADGROUP GROUP mygroup01 Alice COUNT 1 STREAMS mystream >

  2. 1) 1) "mystream"

  3.   2) 1) 1) 1531999977149-0

  4.         2) 1) "message"

  5.            2) "apple"

Alice 是组成员的名字,> 的含义是:到目前为止没有被组内成员读取过的数据。

可以看到,组成员不需要提前创建,第一次使用时自动创建。

下面再创建1个成员来读取数据:

 
  1. redis:6379> XREADGROUP GROUP mygroup01 Bob COUNT 1 STREAMS mystream >

  2. 1) 1) "mystream"

  3.   2) 1) 1) 1531999980272-0

  4.         2) 1) "message"

  5.            2) "orange"

3.4.4 消费历史

 
  1. redis:6379> XREADGROUP GROUP mygroup01 Alice STREAMS mystream 0

  2. 1) 1) "mystream"

  3.   2) 1) 1) 1531999977149-0

  4.         2) 1) "message"

  5.            2) "apple"

这里最后指定的ID是 0,这样可以拿到悬而未决的历史数据,就是:自己曾经消费过,但没有发送消费确认的历史数据,这样可以让我们做故障恢复后的完善工作。

3.4.5 消费确认

 
  1. redis:6379> XACK mystream mygroup01 1531999977149-0

  2. (integer) 1

1531999977149-0Alice 消费的那条 apple 数据,再查看下 Alice 的消费历史:

 
  1. redis:6379> XREADGROUP GROUP mygroup01 Alice STREAMS mystream 0

  2. 1) 1) "mystream"

  3.   2) (empty list or set)

已经空了。

3.4.6 失败处理

通过上面可以了解到,当某个消费者出现问题然后恢复了之后,可以拿到自己还没有确认过的消息数据,这个一个安全保障机制,但如果这个出问题的消费者再也恢复不了了怎么办?他的那些还没确认过的消息数据是不是就没办法处理了?

redis stream 提供了这种情况的处理办法,通过2个步骤来解决:

  1. 查出所有已传递但未确认的消息数据

  2. 变更这些数据的所有者

这样就可以让新的消费者来处理这些数据了。

列出未处理的数据:

 
  1. redis:6379> XPENDING mystream mygroup01 - + 10

  2. 1) 1) 1531999980272-0

  3.   2) "Bob"

  4.   3) (integer) 45126376

  5.   4) (integer) 2

  6. 2) 1) 1531999983493-0

  7.   2) "Tom"

  8.   3) (integer) 867475

  9.   4) (integer) 1

可以看到有2条数据未处理,列出了每条数据的 ID、所有者、此条消息的闲置时间(毫秒)、此消息被传递的次数。

声明变更所有者:

 
  1. redis:6379> XCLAIM mystream mygroup01 Gates 3600 1531999980272-0 1531999983493-0

  2. 1) 1) 1531999980272-0

  3.   2) 1) "message"

  4.      2) "orange"

  5. 2) 1) 1531999983493-0

  6.   2) 1) "message"

  7.      2) "strawberry"

把指定2个ID的消息给了Gates3600 是指最小闲置时间,就是把指定消息中闲置时间大于3600的分配给Gates,注意Gates是全新的消费者,之前没有声明过,说明分配给新的消费者也是可以的。

查询一下Gates现在未处理的数据:

 
  1. redis:6379> XREADGROUP GROUP mygroup01 Gates STREAMS mystream 0

  2. 1) 1) "mystream"

  3.   2) 1) 1) 1531999980272-0

  4.         2) 1) "message"

  5.            2) "orange"

  6.      2) 1) 1531999983493-0

  7.         2) 1) "message"

  8.            2) "strawberry"

可以看到新分配的2条数据。

3.5 查看 stream 相关信息

基本信息:

 
  1. redis:6379> XINFO STREAM mystream

  2. 1) length

  3. 2) (integer) 15

  4. 3) radix-tree-keys

  5. 4) (integer) 1

  6. 5) radix-tree-nodes

  7. 6) (integer) 2

  8. 7) groups

  9. 8) (integer) 2

  10. 9) first-entry

  11. 10) 1) 1531989605376-0

  12.    2) 1) "sensor-id"

  13.       2) "1234"

  14.       3) "temperature"

  15.       4) "19.8"

  16. 11) last-entry

  17. 12) 1) 1531999991782-0

  18.    2) 1) "message"

  19.       2) "banana"

消费组信息:

 
  1. redis:6379> XINFO GROUPS mystream

  2. 1) 1) name

  3.   2) "mygroup"

  4.   3) consumers

  5.   4) (integer) 3

  6.   5) pending

  7.   6) (integer) 5

  8. 2) 1) name

  9.   2) "mygroup01"

  10.   3) consumers

  11.   4) (integer) 4

  12.   5) pending

  13.   6) (integer) 2

某个组中消费者的信息:

 
  1. redis:6379> XINFO CONSUMERS mystream mygroup

  2. 1) 1) name

  3.   2) "Alice"

  4.   3) pending

  5.   4) (integer) 3

  6.   5) idle

  7.   6) (integer) 2483388

  8. 2) 1) name

  9.   2) "Bob"

  10.   3) pending

  11.   4) (integer) 2

  12.   5) idle

  13.   6) (integer) 48453755

  14. 3) 1) name

  15.   2) "Gates"

  16.   3) pending

  17.   4) (integer) 0

  18.   5) idle

  19.   6) (integer) 2385114

3.7 删除消息数据

先查一下现有数据:

 
  1. redis:6379> XRANGE mystream - + COUNT 2

  2. 1) 1) 1531989605376-0

  3.   2) 1) "sensor-id"

  4.      2) "1234"

  5.      3) "temperature"

  6.      4) "19.8"

  7. 2) 1) 1531994510562-0

  8.   2) 1) "test"

  9.      2) "1"

删除第一条数据:

 
  1. redis:6379> XDEL mystream 1531989605376-0

  2. (integer) 1

再次查看,之前的第一条数据已经没有了:

 
  1. redis:6379> XRANGE mystream - + COUNT 2

  2. 1) 1) 1531994510562-0

  3.   2) 1) "test"

  4.      2) "1"

  5. 2) 1) 1531994516257-0

  6.   2) 1) "test"

  7.      2) "2"

注意:XDEL 并不是真正的从内存中删除,只是做了标识,不会回收内存

3.8 设置stream最大长度

添加数据,同时指定了最大长度为2:

 
  1. redis:6379> XADD mystream MAXLEN 2 * value 1

  2. 1532049865028-0

  3. redis:6379> XADD mystream MAXLEN 2 * value 2

  4. 1532049872075-0

  5. redis:6379> XADD mystream MAXLEN 2 * value 3

  6. 1532049877554-0

上面添加了3条数据,下面看一下stream 的长度和现在的内容:

 
  1. redis:6379> XLEN mystream

  2. (integer) 2

  3. redis:6379> XRANGE mystream - +

  4. 1) 1) 1532049872075-0

  5.   2) 1) "value"

  6.      2) "2"

  7. 2) 1) 1532049877554-0

  8.   2) 1) "value"

  9.      2) "3"

可以看到只有2条数据。

4. 小结

以上就是 redis stream 的基础操作,实践一遍之后就会对 stream 有个全面的了解。

PDF 下载地址:

https://pan.baidu.com/s/1j91evQWfJHFxMXftTxSi6A


---------------END----------------

后续的内容同样精彩

长按关注“IT实战联盟”哦




全部评论: 0

    我有话说:

    「收藏版」JDK1.8工作中最常用的14个Stream详细小示例

    一:简介 java.util.Stream 表示能应用在一组元素上一次执行的操作序列。Stream 操作分为中间操作或者最终操作两种,最终操作返回一特定类型的计算结果,而中间操作返回Stream本身

    Redis系列九 推荐系统-布隆过滤器

    存在时,那就肯定不存在. 特性: 与哈希表不同,...

    精品推荐Redis主从复制

    持久化保证了即使 redis 服务重启也会丢失数据,因为 redis 服务重启后会将硬盘上持久化的数据恢复到内存中,但是当 redis 服务器的硬盘损坏了可能会导致数据丢失,如果通过 redis

    Redisson 3.13.6 发布,官方推荐Redis 客户端

    Redisson 3.13.6 已发布,这是一个 Java 编写的 Redis 客户端,具备驻内存数据网格(In-Memory Data Grid)功能,并获得了 Redis 的官方推荐

    Redisson 3.13.5 发布,官方推荐Redis 客户端

    Redisson 3.13.5 已发布,这是一个 Java 编写的 Redis 客户端,具备驻内存数据网格(In-Memory Data Grid)功能,并获得了 Redis 的官方推荐。 主要更新

    Redis系列四 锁

      本文目标 1. 熟悉乐观锁ABA概念 2. 理解掌握redis事务以及watch回滚; 3. 实战redis锁 乐观锁 乐观锁是一种不会阻塞其他线程并发的机制,它不会使用数据库的

    为什么单线程的Redis能够达到百万级的QPS?

    作者:在江湖中coding链接:https://juejin.im/post/5e6097846fb9a07c9f3fe744 性能测试报告 查看了下阿里云 Redis 的性能测试报告如下,能够

    Redis系列七 Debug Lua

      调试redis+lua 学了lua的基本语法,了解了redis+lua的配套用法,但是却不知道怎么断点调试。学就学全面点, 官网中有dubug相关说明。地址:Redis Lua

    Redis系列六 Lua

      本文目标 学习lua基本语法 能够采用redis+lua lua 基本语法 Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用

    Redis系列一 基本用法&应用场景

        说明 redis的最基本使用方法以及使用场景。 字符串 // stringasync function stringFun() { const [key

    Jedis 3.5.0 发布,Redis 的 Java 客户端开发包

    Jedis 3.5.0 发布了。Jedis 是 Redis 官方推荐的面向 Java 的操作 Redis 客户端,与版本的 Redis 完全兼容。 本次更新内容包括: 增强功能 通过 COUNT

    Redis系列二:位图实战,实现打卡签到

    前言 如果要统计一篇文章的阅读量,可以直接使用 Redis 的 incr 指令来完成。 如果要求阅读量必须按用户去重,那就可以使用 set 来记录阅读了这篇文章的所有用户 id,获取

    推荐一款功能强大,开源免费的H5可视化编辑器

    H5-Dooring 是一款功能强大,开源免费的H5可视化页面配置解决方案,致力于提供一套简单方便、专业可靠、无限可能的H5落地页最佳实践。技术栈以react为主, 后台采用nodejs开发. 预览

    精品推荐:从数据存储角度聊聊Redis为何这么快

    Redis是一个由ANSI C语言编写,性能优秀、支持网络、可持久化的K-K内存数据库,并提供多种语言的API。它常用的类型主要是 String、List、Hash、Set、ZSet 这5

    精品推荐:JDFlutter | 京东技术中台一代跨平台开发框架

    DFlutter 是商城共享技术部-多端融合技术部推出一代跨平台开发框架,可快速集成至现有 Android/iOS 工程,开发者可借助 JDFlutter 平台快速完成 Flutter 业务开发。

    dubbogo v1.5.6 发布,性能提升并带来多项特性

    dubbogo 社区近期发布了 dubbogo v1.5.6。该版本和 dubbo 2.7.8 对齐,提供了命令行工具,并提供了多种加载配置的方式。 相关改进实在太多,本文只列出相关重大

    精品推荐」优秀跨平台Redis可视化客户端工具—RedisViewer

    自荐RedisViewer一个有情怀的跨平台Redis可视化客户端工具

    Redis系列八 抢红包

      本文概述 掌握红包的两种常见生成算法 掌握lua+redis 实现原子性抢红包 项目中还有mysql相关内容 了解jmeter的基本用法 遗留问题 redis同步DB时机问题

    Java 实战篇-JDK9特性体验

    JDK9 已经出来好几个月了,我们一起来了解一下JDK9的一些特性