Skip to content

Commit

Permalink
Add support for XCLAIM in StreamOperations
Browse files Browse the repository at this point in the history
  • Loading branch information
zielarz25 authored and jxblum committed Mar 3, 2023
1 parent a39b8b6 commit 9929053
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
Expand All @@ -32,6 +33,7 @@
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
Expand Down Expand Up @@ -141,6 +143,21 @@ public Mono<RecordId> add(Record<K, ?> record) {
return createMono(connection -> connection.xAdd(serializeRecord(input)));
}

@Override
public Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime,
RecordId... recordIds) {

return createFlux(connection -> connection.xClaim(rawKey(key), group, newOwner, minIdleTime, recordIds)
.map(this::deserializeRecord));
}

@Override
public Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions) {

return createFlux(
connection -> connection.xClaim(rawKey(key), group, newOwner, xClaimOptions).map(this::deserializeRecord));
}

@Override
public Mono<Long> delete(K key, RecordId... recordIds) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.springframework.data.redis.core;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -26,6 +27,7 @@
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
Expand Down Expand Up @@ -132,6 +134,35 @@ public RecordId add(Record<K, ?> record) {
return execute(connection -> connection.xAdd(binaryRecord));
}

@Override
public List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime,
RecordId... recordIds) {
byte[] rawKey = rawKey(key);

return execute(new RecordDeserializingRedisCallback() {

@Nullable
@Override
List<ByteRecord> inRedis(RedisConnection connection) {
return connection.streamCommands().xClaim(rawKey, group, newOwner, minIdleTime, recordIds);
}
});
}

@Override
public List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions) {
byte[] rawKey = rawKey(key);

return execute(new RecordDeserializingRedisCallback() {

@Nullable
@Override
List<ByteRecord> inRedis(RedisConnection connection) {
return connection.streamCommands().xClaim(rawKey, group, newOwner, xClaimOptions);
}
});
}

@Override
public Long delete(K key, RecordId... recordIds) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;

import org.reactivestreams.Publisher;

import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer;
Expand Down Expand Up @@ -126,6 +128,35 @@ default Mono<RecordId> add(MapRecord<K, ? extends HK, ? extends HV> record) {
*/
Mono<RecordId> add(Record<K, ?> record);

/**
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
*
* @param key the stream key.
* @param group name of the consumer group.
* @param newOwner name of the consumer claiming the message.
* @param minIdleTime idle time required for a message to be claimed.
* @param recordIds record IDs to be claimed
*
* @return the {@link Flux} of claimed MapRecords.
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
*/
Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime, RecordId... recordIds);

/**
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
*
* @param key the stream key.
* @param group name of the consumer group.
* @param newOwner name of the consumer claiming the message.
* @param xClaimOptions additional parameters for the CLAIM call.
*
* @return the {@link Flux} of claimed MapRecords.
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
*/
Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions);

/**
* Removes the specified records from the stream. Returns the number of records deleted, that may be different from
* the number of IDs passed in case certain IDs do not exist.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers;
Expand Down Expand Up @@ -119,6 +121,35 @@ default RecordId add(MapRecord<K, ? extends HK, ? extends HV> record) {
@Nullable
RecordId add(Record<K, ?> record);

/**
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
*
* @param key the stream key.
* @param group name of the consumer group.
* @param newOwner name of the consumer claiming the message.
* @param minIdleTime idle time required for a message to be claimed.
* @param recordIds record IDs to be claimed
*
* @return list of claimed MapRecords.
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
*/
List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime, RecordId... recordIds);

/**
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
*
* @param key the stream key.
* @param group name of the consumer group.
* @param newOwner name of the consumer claiming the message.
* @param xClaimOptions additional parameters for the CLAIM call.
*
* @return list of claimed MapRecords.
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
*/
List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions);

/**
* Removes the specified records from the stream. Returns the number of records deleted, that may be different from
* the number of IDs passed in case certain IDs do not exist.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import org.junit.jupiter.api.BeforeEach;

Expand Down Expand Up @@ -358,4 +360,29 @@ void pendingShouldReadMessageDetails() {
}).verifyComplete();

}

@ParameterizedRedisTest // https://github.com/spring-projects/spring-data-redis/issues/2465
void claimShouldReadMessageDetails() {

K key = keyFactory.instance();
HK hashKey = hashKeyFactory.instance();
HV value = valueFactory.instance();

Map<HK, HV> content = Collections.singletonMap(hashKey, value);
RecordId messageId = streamOperations.add(key, content).block();

streamOperations.createGroup(key, ReadOffset.from("0-0"), "my-group").then().as(StepVerifier::create)
.verifyComplete();

streamOperations.read(Consumer.from("my-group", "my-consumer"), StreamOffset.create(key, ReadOffset.lastConsumed()))
.then().as(StepVerifier::create).verifyComplete();

streamOperations.claim(key, "my-group", "name", Duration.ZERO, messageId).as(StepVerifier::create)
.assertNext(claimed -> {
assertThat(claimed.getStream()).isEqualTo(key);
assertThat(claimed.getValue()).isEqualTo(content);
assertThat(claimed.getId()).isEqualTo(messageId);
}).verifyComplete();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assumptions.*;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -34,16 +35,7 @@
import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.extension.LettuceConnectionFactoryExtension;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.test.condition.EnabledOnCommand;
import org.springframework.data.redis.test.condition.EnabledOnRedisDriver;
import org.springframework.data.redis.test.condition.EnabledOnRedisVersion;
Expand Down Expand Up @@ -72,7 +64,7 @@ public class DefaultStreamOperationsIntegrationTests<K, HK, HV> {
private final StreamOperations<K, HK, HV> streamOps;

public DefaultStreamOperationsIntegrationTests(RedisTemplate<K, ?> redisTemplate, ObjectFactory<K> keyFactory,
ObjectFactory<?> objectFactory) {
ObjectFactory<?> objectFactory) {

this.redisTemplate = redisTemplate;
this.connectionFactory = redisTemplate.getRequiredConnectionFactory();
Expand Down Expand Up @@ -420,4 +412,29 @@ void pendingShouldReadMessageDetails() {
assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
}

@ParameterizedRedisTest // https://github.com/spring-projects/spring-data-redis/issues/2465
void claimShouldReadMessageDetails() {

K key = keyFactory.instance();
HK hashKey = hashKeyFactory.instance();
HV value = hashValueFactory.instance();

RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, value));
streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group");
streamOps.read(Consumer.from("my-group", "name"), StreamOffset.create(key, ReadOffset.lastConsumed()));

List<MapRecord<K, HK, HV>> messages = streamOps.claim(key, "my-group", "new-owner", Duration.ZERO, messageId);

assertThat(messages).hasSize(1);

MapRecord<K, HK, HV> message = messages.get(0);

assertThat(message.getId()).isEqualTo(messageId);
assertThat(message.getStream()).isEqualTo(key);

if (!(key instanceof byte[] || value instanceof byte[])) {
assertThat(message.getValue()).containsEntry(hashKey, value);
}
}
}

0 comments on commit 9929053

Please sign in to comment.