Appearance
Redis 流 - Stream
Redis 流简介
Redis 流是一种数据结构,其作用类似于仅追加日志,但也实施了多项操作来克服典型仅追加日志的一些限制。其中包括 O(1) 时间的随机访问和复杂的消费策略,例如消费者组。 您可以使用流实时记录和同时联合事件。 Redis 流使用案例的示例包括:
- 事件溯源(例如,跟踪用户操作、点击等)
- 传感器监控(例如,来自现场设备的读数)
- 通知(例如,将每个用户的通知记录存储在单独的数据流中)
Redis 为每个流条目生成一个唯一的 ID。 您可以使用这些 ID 稍后检索其关联的条目,或者读取和处理流中的所有后续条目。请注意,由于这些 ID 与时间相关,因此此处显示的 ID 可能会有所不同,并且与您在自己的 Redis 实例中看到的 ID 不同。
Redis 流支持多种修剪策略(以防止流无限增长)和多种使用策略(请参阅 XREAD、XREADGROUP 和 XRANGE)。
基本命令
- XADD 将新条目添加到流中。
- XREAD 读取一个或多个条目,从给定位置开始并在时间上向前移动。
- XRANGE 返回两个提供的条目 ID 之间的条目范围。
- XLEN 返回流的长度。
请参阅 stream 命令的完整列表。
例子
- 当我们的赛车手通过检查点时,我们会为每个赛车手添加一个流条目,其中包括赛车手的姓名、速度、位置和位置 ID:
bash
> XADD race:france * rider Castilla speed 30.2 position 1 location_id 1
"1692632086370-0"
> XADD race:france * rider Norem speed 28.8 position 3 location_id 1
"1692632094485-0"
> XADD race:france * rider Prickett speed 29.7 position 2 location_id 1
"1692632102976-0"
python
res1 = r.xadd(
"race:france",
{"rider": "Castilla", "speed": 30.2, "position": 1, "location_id": 1},
)
print(res1) # >>> 1692629576966-0
res2 = r.xadd(
"race:france",
{"rider": "Norem", "speed": 28.8, "position": 3, "location_id": 1},
)
print(res2) # >>> 1692629594113-0
res3 = r.xadd(
"race:france",
{"rider": "Prickett", "speed": 29.7, "position": 2, "location_id": 1},
)
print(res3) # >>> 1692629613374-0
js
const res1 = await client.xAdd(
'race:france', '*', {
'rider': 'Castilla',
'speed': '30.2',
'position': '1',
'location_id': '1'
}
);
console.log(res1); // >>> 1700073067968-0 N.B. actual values will differ from these examples
const res2 = await client.xAdd(
'race:france', '*', {
'rider': 'Norem',
'speed': '28.8',
'position': '3',
'location_id': '1'
},
);
console.log(res2); // >>> 1692629594113-0
const res3 = await client.xAdd(
'race:france', '*', {
'rider': 'Prickett',
'speed': '29.7',
'position': '2',
'location_id': '1'
},
);
console.log(res3); // >>> 1692629613374-0
java
StreamEntryID res1 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Castilla");put("speed","30.2");put("position","1");put("location_id","1");}} , XAddParams.xAddParams());
System.out.println(res1); // >>> 1701760582225-0
StreamEntryID res2 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Norem");put("speed","28.8");put("position","3");put("location_id","1");}} , XAddParams.xAddParams());
System.out.println(res2); // >>> 1701760582225-1
StreamEntryID res3 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Prickett");put("speed","29.7");put("position","2");put("location_id","1");}} , XAddParams.xAddParams());
System.out.println(res3); // >>> 1701760582226-0
go
res1, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "race:france",
Values: map[string]interface{}{
"rider": "Castilla",
"speed": 30.2,
"position": 1,
"location_id": 1,
},
}).Result()
if err != nil {
panic(err)
}
// fmt.Println(res1) // >>> 1692632086370-0
res2, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "race:france",
Values: map[string]interface{}{
"rider": "Norem",
"speed": 28.8,
"position": 3,
"location_id": 1,
},
}).Result()
if err != nil {
panic(err)
}
// fmt.PrintLn(res2) // >>> 1692632094485-0
res3, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "race:france",
Values: map[string]interface{}{
"rider": "Prickett",
"speed": 29.7,
"position": 2,
"location_id": 1,
},
}).Result()
if err != nil {
panic(err)
}
// fmt.Println(res3) // >>> 1692632102976-0
csharp
RedisValue res1 = db.StreamAdd(
"race:france",
new NameValueEntry[] {
new NameValueEntry("rider", "Castilla"),
new NameValueEntry("speed", 30.2),
new NameValueEntry("position", 1),
new NameValueEntry("location_id", 1)
}
);
Console.WriteLine(res1); // >>> 1712668482289-0
RedisValue res2 = db.StreamAdd(
"race:france",
new NameValueEntry[] {
new NameValueEntry("rider", "Norem"),
new NameValueEntry("speed", 28.8),
new NameValueEntry("position", 3),
new NameValueEntry("location_id", 1)
}
);
Console.WriteLine(res2); // >>> 1712668766534-1
RedisValue res3 = db.StreamAdd(
"race:france",
new NameValueEntry[]{
new NameValueEntry("rider", "Prickett"),
new NameValueEntry("speed", 29.7),
new NameValueEntry("position", 2),
new NameValueEntry("location_id", 1)
}
);
Console.WriteLine(res3); // >>> 1712669055705-0
读取两个从 ID 开始的流条目 :1692632086370-0
bash
> XRANGE race:france 1692632086370-0 + COUNT 2
1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
2) 1) "1692632094485-0"
2) 1) "rider"
2) "Norem"
3) "speed"
4) "28.8"
5) "position"
6) "3"
7) "location_id"
8) "1"
python
res4 = r.xrange("race:france", "1691765278160-0", "+", 2)
print(
res4
) # >>> [
# ('1692629576966-0',
# {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}
# ),
# ('1692629594113-0',
# {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'}
# )
# ]
js
const res4 = await client.xRange('race:france', '1691765278160-0', '+', {COUNT: 2});
console.log(res4); // >>> [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'})]
java
List<StreamEntry> res4 = jedis.xrange("race:france","1701760582225-0","+",2);
System.out.println(res4); // >>> [1701760841292-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701760841292-1 {rider=Norem, speed=28.8, location_id=1, position=3}]
go
res4, err := rdb.XRangeN(ctx, "race:france", "1691765278160-0", "+", 2).Result()
if err != nil {
panic(err)
}
fmt.Println(res4)
// >>> [{1692632086370-0 map[location_id:1 position:1 rider:Castilla...
csharp
StreamEntry[] res4 = db.StreamRange("race:france", "1712668482289-0", "+", 2);
foreach (StreamEntry entry in res4)
{
Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
}
// >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1]
// >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1]
- 从流的末尾开始,最多读取 100 个新的流条目,如果未写入任何条目,则阻塞长达 300 毫秒:
bash
> XREAD COUNT 100 BLOCK 300 STREAMS race:france $
(nil)
python
res5 = r.xread(streams={"race:france": 0}, count=100, block=300)
print(
res5
)
# >>> [
# ['race:france',
# [('1692629576966-0',
# {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}
# ),
# ('1692629594113-0',
# {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'}
# ),
# ('1692629613374-0',
# {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'}
# )]
# ]
# ]
js
const res5 = await client.xRead({
key: 'race:france',
id: '0-0'
}, {
count: 100,
block: 300
});
console.log(res5); // >>> [['race:france', [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'}), ('1692629613374-0', {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'})]]]
java
List<Map.Entry<String, List<StreamEntry>>> res5= jedis.xread(XReadParams.xReadParams().block(300).count(100),new HashMap<String,StreamEntryID>(){{put("race:france",new StreamEntryID());}});
System.out.println(
res5
); // >>> [race:france=[1701761996660-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701761996661-0 {rider=Norem, speed=28.8, location_id=1, position=3}, 1701761996661-1 {rider=Prickett, speed=29.7, location_id=1, position=2}]]
go
res5, err := rdb.XRead(ctx, &redis.XReadArgs{
Streams: []string{"race:france", "0"},
Count: 100,
Block: 300,
}).Result()
if err != nil {
panic(err)
}
fmt.Println(res5)
// >>> // [{race:france [{1692632086370-0 map[location_id:1 position:1...
csharp
StreamEntry[] res5 = db.StreamRead("race:france", 0, 100);
foreach (StreamEntry entry in res4)
{
Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
}
// >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1]
// >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1]
// >>> 1712669055705-0: [rider: Prickett, speed: 29.699999999999999, position: 2, location_id: 1]
性能
向流中添加条目是 O(1)。 访问任何单个条目都是 O(n),其中 n 是 ID 的长度。 由于流 ID 通常很短且长度固定,因此这实际上可以减少为恒定时间查找。 有关原因的详细信息,请注意 streams 是作为基数树实现的。
简而言之,Redis 流提供高效的插入和读取。 有关详细信息,请参阅每个命令的时间复杂度。
Streams 基础知识
Streams 是一种仅追加的数据结构。基本写入命令(称为 XADD)将新条目追加到指定的流中。
每个流条目由一个或多个字段-值对组成,有点像字典或 Redis 哈希:
bash
> XADD race:france * rider Castilla speed 29.9 position 1 location_id 2
"1692632147973-0"
python
res6 = r.xadd(
"race:france",
{"rider": "Castilla", "speed": 29.9, "position": 1, "location_id": 2},
)
print(res6) # >>> 1692629676124-0
js
const res6 = await client.xAdd(
'race:france', '*', {
'rider': 'Castilla',
'speed': '29.9',
'position': '1',
'location_id': '2'
}
);
console.log(res6); // >>> 1692629676124-0
java
StreamEntryID res6 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Castilla");put("speed","29.9");put("position","2");put("location_id","1");}} , XAddParams.xAddParams());
System.out.println(res6); // >>> 1701762285679-0
go
res6, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "race:france",
Values: map[string]interface{}{
"rider": "Castilla",
"speed": 29.9,
"position": 1,
"location_id": 2,
},
}).Result()
if err != nil {
panic(err)
}
//fmt.Println(res6) // >>> 1692632147973-0
csharp
RedisValue res6 = db.StreamAdd(
"race:france",
new NameValueEntry[]{
new NameValueEntry("rider", "Castilla"),
new NameValueEntry("speed", 29.9),
new NameValueEntry("position", 1),
new NameValueEntry("location_id", 2)
}
);
Console.WriteLine(res6); // >>> 1712675674750-0
上述对 XADD 命令的调用使用自动生成的条目 ID 向流中的keyrace:france
处添加一个条目rider: Castilla, speed: 29.9, position: 1, location_id: 2
,该条目 ID 是命令返回的条目,特别是 1692632147973-0
。它获取 key namerace:france
作为其第一个参数,第二个参数是标识流中每个条目的条目 ID。但是,在本例中,我们传递*
是因为我们希望服务器为我们生成新 ID。每个新 ID 都将单调递增,因此更简单地说,与过去的所有条目相比,添加的每个新条目都将具有更高的 ID。服务器自动生成 ID 几乎总是您想要的,并且明确指定 ID 的原因非常罕见。我们稍后会详细讨论这个问题。每个 Stream 条目都有一个 ID 这一事实与日志文件的另一个相似之处,其中行号或文件内的字节偏移量可用于标识给定条目。回到我们的 XADD 示例,在键名称和 ID 之后,下一个参数是组成流条目的字段-值对。
只需使用 XLEN 命令即可获取 Stream 中的项目数:
bash
> XLEN race:france
(integer) 4
python
res7 = r.xlen("race:france")
print(res7) # >>> 4
js
const res7 = await client.xLen('race:france');
console.log(res7); // >>> 4
java
long res7 = jedis.xlen("race:france");
System.out.println(res7); // >>> 4
go
res7, err := rdb.XLen(ctx, "race:france").Result()
if err != nil {
panic(err)
}
fmt.Println(res7) // >>> 4
csharp
long res7 = db.StreamLength("race:france");
Console.WriteLine(res7); // >>> 4
条目 ID
XADD 命令返回的条目 ID 以及统一标识给定流中的每个条目,由两部分组成:
txt
<millisecondsTime>-<sequenceNumber>
毫秒时间部分实际上是生成流 ID 的本地 Redis 节点中的本地时间,但是如果当前毫秒时间恰好小于前一个输入时间,则使用前一个输入时间,因此如果时钟向后跳动,单调递增的 ID 属性仍然成立。序列号用于在同一毫秒内创建的条目。由于序列号是 64 位宽的,因此实际上,在同一毫秒内可以生成的条目数没有限制。
这种 ID 的格式乍一看可能看起来很奇怪,温和的读者可能会想知道为什么时间是 ID 的一部分。原因是 Redis 流支持按 ID 进行范围查询。由于 ID 与条目的生成时间相关,因此基本上可以免费查询时间范围。我们很快就会在介绍 XRANGE 命令时看到这一点。
如果出于某种原因,用户需要与时间无关但实际上与另一个外部系统 ID 关联的增量 ID,如前所述,XADD 命令可以采用显式 ID,而不是触发自动生成的通配符 ID*
,如以下示例所示:
bash
> XADD race:usa 0-1 racer Castilla
0-1
> XADD race:usa 0-2 racer Norem
0-2
python
res8 = r.xadd("race:usa", {"racer": "Castilla"}, id="0-1")
print(res8) # >>> 0-1
res9 = r.xadd("race:usa", {"racer": "Norem"}, id="0-2")
print(res9) # >>> 0-2
js
const res8 = await client.xAdd('race:usa', '0-1', {
'racer': 'Castilla'
});
console.log(res8); // >>> 0-1
const res9 = await client.xAdd('race:usa', '0-2', {
'racer': 'Norem'
});
console.log(res9); // >>> 0-2
java
StreamEntryID res8 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Castilla");}},XAddParams.xAddParams().id("0-1"));
System.out.println(res8); // >>> 0-1
StreamEntryID res9 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Norem");}},XAddParams.xAddParams().id("0-2"));
System.out.println(res9); // >>> 0-2
go
res8, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "race:usa",
Values: map[string]interface{}{
"racer": "Castilla",
},
ID: "0-1",
}).Result()
if err != nil {
panic(err)
}
fmt.Println(res8) // >>> 0-1
res9, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "race:usa",
Values: map[string]interface{}{
"racer": "Norem",
},
ID: "0-2",
}).Result()
if err != nil {
panic(err)
}
fmt.Println(res9) // >>> 0-2
csharp
RedisValue res8 = db.StreamAdd(
"race:usa",
new NameValueEntry[] {
new NameValueEntry("racer", "Castilla")
},
"0-1"
);
Console.WriteLine(res8); // >>> 0-1
RedisValue res9 = db.StreamAdd(
"race:usa",
new NameValueEntry[]{
new NameValueEntry("racer", "Norem")
},
"0-2"
);
Console.WriteLine(res9); // >>> 0-2
请注意,在这种情况下,最小 ID 为 0-1,并且该命令不会接受等于或小于前一个 ID 的 ID:
bash
> XADD race:usa 0-1 racer Prickett
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
python
try:
res10 = r.xadd("race:usa", {"racer": "Prickett"}, id="0-1")
print(res10) # >>> 0-1
except redis.exceptions.ResponseError as e:
print(e) # >>> WRONGID
js
try {
const res10 = await client.xAdd('race:usa', '0-1', {
'racer': 'Prickett'
});
console.log(res10); // >>> 0-1
} catch (error) {
console.error(error); // >>> WRONGID
}
java
try {
StreamEntryID res10 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Prickett");}},XAddParams.xAddParams().id("0-1"));
System.out.println(res10); // >>> 0-1
}
catch (JedisDataException e){
System.out.println(e); // >>> ERR The ID specified in XADD is equal or smaller than the target stream top item
}
go
res10, err := rdb.XAdd(ctx, &redis.XAddArgs{
Values: map[string]interface{}{
"racer": "Prickett",
},
ID: "0-1",
}).Result()
if err != nil {
// fmt.Println(err)
// >>> ERR The ID specified in XADD is equal or smaller than the target stream top item
}
csharp
try
{
RedisValue res10 = db.StreamAdd(
"race:usa",
new NameValueEntry[]{
new NameValueEntry("racer", "Prickett")
},
"0-1"
);
}
catch (RedisServerException ex)
{
Console.WriteLine(ex); // >>> ERR The ID specified in XADD is equal or smaller than the target stream top item
}
如果您运行的是 Redis 7 或更高版本,则还可以提供仅包含毫秒部分的显式 ID。在这种情况下,将自动生成 ID 的序列部分。为此,请使用以下语法:
bash
> XADD race:usa 0-* racer Prickett
0-3
python
# Not yet implemented
js
const res11a = await client.xAdd('race:usa', '0-*', { racer: 'Norem' });
console.log(res11a); // >>> 0-3
java
StreamEntryID res11 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Norem");}},XAddParams.xAddParams().id("0-*"));
System.out.println(res11);
go
res11, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "race:usa",
Values: map[string]interface{}{
"racer": "Prickett",
},
ID: "0-*",
}).Result()
if err != nil {
panic(err)
}
fmt.Println(res11) // >>> 0-3
csharp
RedisValue res11 = "";
Version version = muxer.GetServer("localhost:6379").Version;
if (version.Major >= 7)
{
res11 = db.StreamAdd(
"race:usa",
new NameValueEntry[]{
new NameValueEntry("rider", "Norem")
},
"0-*"
);
Console.WriteLine(res11); // >>> "0-3"
}
从 Streams 获取数据
现在我们终于能够通过 XADD 在我们的流中附加条目了。但是,虽然将数据附加到流中非常明显,但查询流以提取数据的方式并不那么明显。如果我们继续对日志文件进行类比,一种明显的方法是模仿我们通常使用 Unix 命令 ,也就是说,我们可以开始侦听以获取附加到流中的新消息。请注意,与 Redis 的阻止列表操作不同,在 Redis 的阻止列表操作中,给定元素将到达在 BLPOP 等流行样式操作中阻塞的单个客户端,对于流,我们希望多个使用者看到附加到流中的新消息(与许多进程可以看到添加到日志中的内容的方式相同)。使用传统术语,我们希望流能够将消息扇出到多个客户端。tail -ftail -f
但是,这只是一种可能的访问模式。我们还可以以完全不同的方式查看流:不是作为消息传递系统,而是作为时间序列存储。在这种情况下,可能附加新消息也很有用,但另一种自然的查询模式是按时间范围获取消息,或者使用游标迭代消息以增量检查所有历史记录。这绝对是另一种有用的访问模式。
最后,如果我们从使用者的角度来看流,我们可能希望以另一种方式访问该流,即作为消息流,该消息流可以分区到正在处理此类消息的多个使用者,以便使用者组只能看到到达单个流中的消息的子集。通过这种方式,可以在不同的使用者之间扩展消息处理,而无需单个使用者处理所有消息:每个使用者将只获得不同的消息来处理。这基本上就是 Kafka (TM) 对消费者组所做的。通过使用者组读取消息是从 Redis 流读取消息的另一种有趣模式。
Redis Streams 通过不同的命令支持上述所有三种查询模式。接下来的部分将介绍它们,从最简单和最直接使用的 Range 查询开始。
按范围查询:XRANGE 和 XREVRANGE
要按范围查询流,我们只需要指定两个 ID,start 和 end。返回的范围将包括 ID 为 start 或 end 的元素,因此范围是非独占的。这两个特殊 ID -
和 +
分别表示可能的最小 ID 和最大 ID。
bash
> XRANGE race:france - +
1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
2) 1) "1692632094485-0"
2) 1) "rider"
2) "Norem"
3) "speed"
4) "28.8"
5) "position"
6) "3"
7) "location_id"
8) "1"
3) 1) "1692632102976-0"
2) 1) "rider"
2) "Prickett"
3) "speed"
4) "29.7"
5) "position"
6) "2"
7) "location_id"
8) "1"
4) 1) "1692632147973-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "29.9"
5) "position"
6) "1"
7) "location_id"
8) "2"
python
res11 = r.xrange("race:france", "-", "+")
print(
res11
)
# >>> [
# ('1692629576966-0',
# {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}
# ),
# ('1692629594113-0',
# {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'}
# ),
# ('1692629613374-0',
# {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'}
# ),
# ('1692629676124-0',
# {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'}
# )
# ]
js
const res11 = await client.xRange('race:france', '-', '+');
console.log(res11);
java
List<StreamEntry> res12 = jedis.xrange("race:france","-","+");
System.out.println(
res12
);
go
res12, err := rdb.XRange(ctx, "race:france", "-", "+").Result()
if err != nil {
panic(err)
}
fmt.Println(res12)
// >>> [{1692632086370-0 map[location_id:1 position:1 rider:Castilla...
csharp
StreamEntry[] res12 = db.StreamRange("race:france", "-", "+");
foreach (StreamEntry entry in res12)
{
Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
}
// >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1]
// >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1]
// >>> 1712669055705-0: [rider: Prickett, speed: 29.699999999999999, position: 2, location_id: 1]
// >>> 1712675674750-0: [rider: Castilla, speed: 29.899999999999999, position: 1, location_id: 2]
返回的每个条目都是一个包含两项的数组:ID 和字段-值对列表。我们已经说过条目 ID 与时间有关,因为-
字符左侧的部分是创建条目时创建流条目的本地节点的 Unix 时间(以毫秒为单位)(但请注意,流是使用完全指定的 XADD 命令复制的,因此副本将具有与主节点相同的 ID)。这意味着我可以使用 XRANGE 查询时间范围。但是,为了做到这一点,我可能想省略 ID 的序列部分:如果省略,则在范围的开头将假定为 0,而在结束部分,它将假定为可用的最大序列号。这样,仅使用 2 毫秒的 Unix 时间进行查询,我们就可以以包容性的方式获得在该时间范围内生成的所有条目。例如,如果我想查询一个 2 毫秒的周期,我可以使用:
bash
> XRANGE race:france 1692632086369 1692632086371
1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
python
res12 = r.xrange("race:france", 1692629576965, 1692629576967)
print(
res12
)
# >>> [
# ('1692629576966-0',
# {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}
# )
# ]
js
const res12 = await client.xRange('race:france', '1692629576965', '1692629576967');
console.log(res12); // >>> [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'})]
java
List<StreamEntry> res13 = jedis.xrange("race:france",String.valueOf(System.currentTimeMillis()-1000),String.valueOf(System.currentTimeMillis()+1000));
System.out.println(
res13
);
go
res13, err := rdb.XRange(ctx, "race:france",
"1692632086369", "1692632086371",
).Result()
if err != nil {
panic(err)
}
fmt.Println(res13)
// >>> [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]}]
csharp
StreamEntry[] res13 = db.StreamRange("race:france", 1712668482289, 1712668482291);
foreach (StreamEntry entry in res13)
{
Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
}
// >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1]
我在这个范围内只有一个条目。但是,在实际数据集中,我可以查询小时范围,或者可能在短短两毫秒内包含许多项目,并且返回的结果可能非常大。因此,XRANGE 在末尾支持可选的 COUNT 选项。通过指定计数,我可以只获取前 N 项。如果我想要更多,我可以获取返回的最后一个 ID,将序列部分增加 1,然后再次查询。让我们在下面的示例中看到这一点。假设流race:france
中填充了 4 个项目。为了开始我的迭代,每个命令获取 2 个项目,我从完整范围开始,但计数为 2。
bash
> XRANGE race:france - + COUNT 2
1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
2) 1) "1692632094485-0"
2) 1) "rider"
2) "Norem"
3) "speed"
4) "28.8"
5) "position"
6) "3"
7) "location_id"
8) "1"
python
res13 = r.xrange("race:france", "-", "+", 2)
print(
res13
)
# >>> [
# ('1692629576966-0',
# {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}
# ),
# ('1692629594113-0',
# {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'}
# )
# ]
js
const res14 = await client.xRange('race:france', '(1692629594113-0', '+', {COUNT: 2});
console.log(res14);
java
List<StreamEntry> res15 = jedis.xrange("race:france",String.valueOf(System.currentTimeMillis()-1000)+"-0","+",2);
System.out.println(res15);
go
res14, err := rdb.XRangeN(ctx, "race:france", "-", "+", 2).Result()
if err != nil {
panic(err)
}
fmt.Println(res14)
csharp
StreamEntry[] res14 = db.StreamRange("race:france", "-", "+", 2);
foreach (StreamEntry entry in res14)
{
Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
}
要继续迭代接下来的两个项目,我必须选择返回的最后一个 ID,即 1692632094485-0
,并向其添加前缀(
。生成的独占范围间隔(在本例中(1692632094485-0
)现在可以用作下一个 XRANGE 调用的新 start 参数:
bash
> XRANGE race:france (1692632094485-0 + COUNT 2
1) 1) "1692632102976-0"
2) 1) "rider"
2) "Prickett"
3) "speed"
4) "29.7"
5) "position"
6) "2"
7) "location_id"
8) "1"
2) 1) "1692632147973-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "29.9"
5) "position"
6) "1"
7) "location_id"
8) "2"
python
res14 = r.xrange("race:france", "(1692629594113-0", "+", 2)
print(
res14
)
# >>> [
# ('1692629613374-0',
# {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'}
# ),
# ('1692629676124-0',
# {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'}
# )
# ]
js
const res14 = await client.xRange('race:france', '(1692629594113-0', '+', {COUNT: 2});
console.log(res14);
java
List<StreamEntry> res16 = jedis.xrange("race:france",String.valueOf(System.currentTimeMillis()+1000)+"-0","+",2);
System.out.println(res16); // >>> []
go
res15, err := rdb.XRangeN(ctx, "race:france",
"(1692632094485-0", "+", 2,
).Result()
if err != nil {
panic(err)
}
fmt.Println(res15)
csharp
StreamEntry[] res15 = db.StreamRange("race:france", "(1712668766534-1", "+", 2);
foreach (StreamEntry entry in res15)
{
Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
}
// >>> 1712669055705-0: [rider: Prickett, speed: 29.699999999999999, position: 2, location_id: 1]
// >>> 1712675674750-0: [rider: Castilla, speed: 29.899999999999999, position: 1, location_id: 2]
现在我们已经从只有 4 个条目的流中检索了 4 个项目,如果我们尝试检索更多项目,我们将得到一个空数组
bash
> XRANGE race:france (1692632147973-0 + COUNT 2
(empty array)
python
res15 = r.xrange("race:france", "(1692629676124-0", "+", 2)
print(res15) # >>> []
js
const res15 = await client.xRange('race:france', '(1692629676124-0', '+', {COUNT: 2});
console.log(res15); // >>> []
java
List<StreamEntry> res16 = jedis.xrange("race:france",String.valueOf(System.currentTimeMillis()+1000)+"-0","+",2);
System.out.println(res16); // >>> []
go
res16, err := rdb.XRangeN(ctx, "race:france",
"(1692632147973-0", "+", 2,
).Result()
if err != nil {
panic(err)
}
fmt.Println(res16)
// >>> []
csharp
StreamEntry[] res16 = db.StreamRange("race:france", "(1712675674750-0", "+", 2);
foreach (StreamEntry entry in res16)
{
Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
}
// >>> <empty array>
由于 XRANGE 复杂度是 O(log(N)) 来查找,然后 O(M) 来返回 M 个元素,因此如果计数较小,该命令具有对数时间复杂度,这意味着迭代的每一步都很快。因此,XRANGE 也是事实上的流迭代器,不需要 XSCAN 命令。
命令 XREVRANGE 等同于 XRANGE,但以相反的顺序返回元素,因此 XREVRANGE 的实际用途是检查 Stream 中的最后一项是什么:
bash
> XREVRANGE race:france + - COUNT 1
1) 1) "1692632147973-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "29.9"
5) "position"
6) "1"
7) "location_id"
8) "2"
python
res16 = r.xrevrange("race:france", "+", "-", 1)
print(
res16
)
# >>> [
# ('1692629676124-0',
# {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'}
# )
# ]
js
const res16 = await client.xRevRange('race:france', '+', '-', {COUNT: 1});
console.log(
res16
); // >>> [('1692629676124-0', {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'})]
java
List<StreamEntry> res17 = jedis.xrevrange("race:france","+","-",1);
System.out.println(res17); // >>> [1701765218592-0 {rider=Castilla, speed=29.9, location_id=1, position=2}]
go
res17, err := rdb.XRevRangeN(ctx, "race:france", "+", "-", 1).Result()
if err != nil {
panic(err)
}
fmt.Println(res17)
// >>> [{1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}]
csharp
StreamEntry[] res17 = db.StreamRange("race:france", "+", "-", 1, Order.Descending);
foreach (StreamEntry entry in res17)
{
Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
}
// >>> 1712675674750-0: [rider: Castilla, speed: 29.899999999999999, position: 1, location_id: 2]
请注意,XREVRANGE 命令以相反的顺序获取 start 和 stop 参数。
使用 XREAD 侦听新项
当我们不想按流中的某个范围访问项目时,通常我们想要的是订阅到达流的新项目。这个概念可能与 Redis Pub/Sub 相关,在其中您订阅了一个频道,或者与 Redis 阻止列表相关,在其中您等待键获取要获取的新元素,但您使用流的方式存在根本差异:
- 一个流可以有多个客户端(使用者)等待数据。默认情况下,每个新项目都将交付给正在等待给定流中数据的每个使用者。此行为与阻止列表不同,在阻止列表中,每个使用者将获得不同的元素。但是,扇出到多个使用者的能力类似于 Pub/Sub。
- 在 Pub/Sub 中,消息是 fire-for-forged,无论如何都不会存储,而当使用阻止列表时,当客户端收到消息时,它会从列表中弹出(有效地删除),而流的工作方式则完全不同。所有消息都无限期地附加到流中(除非用户明确要求删除条目):不同的使用者将通过记住收到的最后一条消息的 ID 来了解新消息。
- Streams 使用者组提供了 Pub/Sub 或阻止列表无法实现的控制级别,同一流具有不同的组、已处理项目的明确确认、检查待处理项目的能力、未处理消息的声明以及每个客户端的连贯历史记录可见性,该客户端只能查看其私人过去的消息历史记录。
提供侦听到达流中的新消息的能力的命令称为 XREAD。它比 XRANGE 稍微复杂一些,因此我们将开始显示简单的表单,稍后将提供整个命令布局。
bash
> XREAD COUNT 2 STREAMS race:france 0
1) 1) "race:france"
2) 1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
2) 1) "1692632094485-0"
2) 1) "rider"
2) "Norem"
3) "speed"
4) "28.8"
5) "position"
6) "3"
7) "location_id"
8) "1"
python
res17 = r.xread(streams={"race:france": 0}, count=2)
print(
res17
)
# >>> [
# ['race:france', [
# ('1692629576966-0',
# {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}
# ),
# ('1692629594113-0',
# {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'}
# )
# ]
# ]
# ]
js
const res17 = await client.xRead({
key: 'race:france',
id: '0-0'
}, {
count: 2
});
console.log(res17);
java
List<Map.Entry<String, List<StreamEntry>>> res18= jedis.xread(XReadParams.xReadParams().count(2),new HashMap<String,StreamEntryID>(){{put("race:france",new StreamEntryID());}});
System.out.println(
res18
);
go
res18, err := rdb.XRead(ctx, &redis.XReadArgs{
Streams: []string{"race:france", "0"},
Count: 2,
}).Result()
if err != nil {
panic(err)
}
fmt.Println(res18)
csharp
StreamEntry[] res18 = db.StreamRead("race:france", 0, 2);
foreach (StreamEntry entry in res18)
{
Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
}
// >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1]
// >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1]
以上是 XREAD 的非阻塞形式。请注意,COUNT 选项不是强制性的,事实上,该命令的唯一强制性选项是 STREAMS 选项,它指定一个键列表以及调用消费者已经为每个流看到的相应最大 ID,以便该命令仅向客户端提供 ID 大于我们指定 ID 的消息。
在上面的命令中,我们编写了这样一个:我们希望 Stream 中的所有消息的 ID 都大于 。如上例所示,该命令返回 key name,因为实际上可以使用多个 key 调用此命令以同时从不同的流中读取。例如,我可以写:。请注意,在 STREAMS 选项之后,我们需要提供密钥名称,然后是 ID。因此,STREAMS 选项必须始终是最后一个选项。 任何其他选项都必须位于 STREAMS 选项之前。STREAMS race:france 0race:france0-0STREAMS race:france race:italy 0 0
除了 XREAD 可以一次访问多个流,并且我们能够指定我们拥有的最后一个 ID 来获取更新的消息之外,在这种简单的形式中,该命令与 XRANGE 相比并没有做什么不同的事情。然而,有趣的是,我们可以通过指定 BLOCK 参数轻松地将 XREAD 转换为阻塞命令:
bash
> XREAD BLOCK 0 STREAMS race:france $
请注意,在上面的示例中,除了删除 COUNT 之外,我还指定了超时为 0 毫秒的新 BLOCK 选项(这意味着永远不会超时)。此外,我没有为流mystream
传递普通 ID,而是传递了特殊 ID$
。这个特殊 ID 意味着 XREAD 应使用已存储在流mystream
中的最大 ID 作为最后一个 ID,以便从开始侦听开始,我们只接收新消息。这在某种程度上类似于 Unix 命令tail -f
请注意,当使用 BLOCK 选项时,我们不必使用 特殊 ID $
.我们可以使用任何有效的 ID。如果命令能够立即为我们的请求提供服务而不阻塞,它将这样做,否则它将阻塞。通常,如果我们想从新条目开始消费流,我们从 ID $
开始,然后我们继续使用收到的最后一条消息的 ID 进行下一次调用,依此类推。
XREAD 的阻塞形式也能够监听多个 Streams,只需指定多个 key 名称即可。如果请求可以同步提供,因为至少有一个流的元素大于我们指定的相应 ID,则它将返回结果。否则,该命令将阻塞并返回获取新数据的第一个流的项目(根据指定的 ID)。
与阻塞列表操作类似,从等待数据的 Client 端的角度来看,阻塞流读取是公平的,因为语义是 FIFO 样式。当有新项目可用时,第一个阻止给定流的客户端将是第一个被取消阻止的客户端。
XREAD 除了 COUNT 和 BLOCK 之外没有其他选项,因此它是一个非常基本的命令,具有特定目的,用于将使用者附加到一个或多个流。使用使用者组 API 可以使用更强大的使用流的功能,但是通过使用者组读取是由名为 XREADGROUP 的不同命令实现的,本指南的下一节将介绍。
消费群体
当手头的任务是使用来自不同客户端的相同流时,XREAD 已经提供了一种扇出到 N 个客户端的方法,也可能使用副本来提供更多的读取可扩展性。但是,在某些问题中,我们想要做的不是向多个客户端提供相同的消息流,而是向多个客户端提供来自同一流的不同消息子集。一个明显的有用情况是处理速度较慢的消息:拥有 N 个不同的 worker 来接收流的不同部分的能力使我们能够扩展消息处理,通过将不同的消息路由到准备做更多工作的不同 worker。
实际上,如果我们想象有三个使用者 C1、C2、C3 和一个包含消息 1、2、3、4、5、6、7 的流,那么我们想要的是根据下图提供消息:
txt
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
为了实现这一点,Redis 使用了一个称为使用者组的概念。从实施的角度来看,Redis 使用者组与 Kafka (TM) 使用者组没有任何关系,这一点非常重要。然而,它们在功能上是相似的,所以我决定保留 Kafka 的 (TM) 术语,因为它最初使这个想法流行起来。
Consumer Group 就像一个伪 Consumer,它从 Stream 中获取数据,实际上为多个 Consumer 提供服务,提供一定的保证:
- 每条消息都提供给不同的使用者,因此不可能将同一消息传递给多个使用者。
- 在使用者组中,使用者由名称标识,该名称是实现使用者的客户端必须选择的区分大小写的字符串。这意味着,即使在断开连接后,流使用者组也会保留所有状态,因为客户端将再次声明为同一使用者。但是,这也意味着由客户端提供唯一标识符。
- 每个使用者组都有第一个 ID 的概念,即从未使用过,因此,当使用者请求新消息时,它可以只提供以前未传送的消息。
- 但是,使用消息需要使用特定命令进行显式确认。Redis 将确认解释为:此消息已正确处理,因此可以将其从使用者组中逐出。
- 使用者组跟踪当前待处理的所有消息,即已传送给使用者组的某个使用者但尚未确认为已处理的消息。借助此功能,在访问流的消息历史记录时,每个使用者将只能看到已传送给它的消息。
在某种程度上,消费者组可以被想象成关于流的一定数量的状态:
txt
+----------------------------------------+
| consumer_group_name: mygroup |
| consumer_group_stream: somekey |
| last_delivered_id: 1292309234234-92 |
| |
| consumers: |
| "consumer-1" with pending messages |
| 1292309234234-4 |
| 1292309234232-8 |
| "consumer-42" with pending messages |
| ... (and so forth) |
+----------------------------------------+
如果您从这个角度来看这一点,那么理解使用者组可以做什么、它如何能够只向使用者提供其待处理消息的历史记录以及请求新消息的使用者如何只获得大于last_delivered_id
的消息 ID 就非常简单了。同时,如果您将使用者组视为 Redis 流的辅助数据结构,很明显,单个流可以具有多个使用者组,这些使用者组具有不同的使用者集。实际上,同一个流甚至有可能让客户端在没有 XREAD 使用者组的情况下读取,而客户端通过 XREADGROUP 读取不同的使用者组。
现在,您可以放大以查看基本的使用者组命令。它们是:
- XGROUP 用于创建、销毁和管理使用者组。
- XREADGROUP 用于通过使用者组从流中读取数据。
- XACK 是允许使用者将待处理消息标记为已正确处理的命令。
创建消费组
假设我已经有一个 stream 类型的 keyrace:france
,为了创建一个使用者组,我只需要执行以下操作: