diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java index fb9e1b8c14..b0f5937c0c 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.Map; @@ -32,6 +33,7 @@ import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.ReactiveStreamCommands; +import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; import org.springframework.data.redis.connection.convert.Converters; import org.springframework.data.redis.connection.stream.ByteBufferRecord; import org.springframework.data.redis.connection.stream.Consumer; @@ -141,6 +143,21 @@ public Mono add(Record record) { return createMono(connection -> connection.xAdd(serializeRecord(input))); } + @Override + public Flux> claim(K key, String group, String newOwner, Duration minIdleTime, + RecordId... recordIds) { + + return createFlux(connection -> connection.xClaim(rawKey(key), group, newOwner, minIdleTime, recordIds) + .map(this::deserializeRecord)); + } + + @Override + public Flux> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions) { + + return createFlux( + connection -> connection.xClaim(rawKey(key), group, newOwner, xClaimOptions).map(this::deserializeRecord)); + } + @Override public Mono delete(K key, RecordId... recordIds) { diff --git a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java index c9f6bd5371..a209276d36 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.core; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -26,6 +27,7 @@ import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; import org.springframework.data.redis.connection.stream.ByteRecord; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; @@ -132,6 +134,35 @@ public RecordId add(Record record) { return execute(connection -> connection.xAdd(binaryRecord)); } + @Override + public List> claim(K key, String group, String newOwner, Duration minIdleTime, + RecordId... recordIds) { + byte[] rawKey = rawKey(key); + + return execute(new RecordDeserializingRedisCallback() { + + @Nullable + @Override + List inRedis(RedisConnection connection) { + return connection.streamCommands().xClaim(rawKey, group, newOwner, minIdleTime, recordIds); + } + }); + } + + @Override + public List> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions) { + byte[] rawKey = rawKey(key); + + return execute(new RecordDeserializingRedisCallback() { + + @Nullable + @Override + List inRedis(RedisConnection connection) { + return connection.streamCommands().xClaim(rawKey, group, newOwner, xClaimOptions); + } + }); + } + @Override public Long delete(K key, RecordId... recordIds) { diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java index 09eae44a67..2c920c64a8 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java @@ -18,6 +18,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.time.Duration; import java.util.Arrays; import java.util.Map; @@ -25,6 +26,7 @@ import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; +import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.connection.stream.Record; import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer; @@ -126,6 +128,35 @@ default Mono add(MapRecord record) { */ Mono add(Record record); + /** + * Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument. + * The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM + * + * @param key the stream key. + * @param group name of the consumer group. + * @param newOwner name of the consumer claiming the message. + * @param minIdleTime idle time required for a message to be claimed. + * @param recordIds record IDs to be claimed + * + * @return the {@link Flux} of claimed MapRecords. + * @see Redis Documentation: XCLAIM + */ + Flux> claim(K key, String group, String newOwner, Duration minIdleTime, RecordId... recordIds); + + /** + * Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument. + * The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM + * + * @param key the stream key. + * @param group name of the consumer group. + * @param newOwner name of the consumer claiming the message. + * @param xClaimOptions additional parameters for the CLAIM call. + * + * @return the {@link Flux} of claimed MapRecords. + * @see Redis Documentation: XCLAIM + */ + Flux> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions); + /** * Removes the specified records from the stream. Returns the number of records deleted, that may be different from * the number of IDs passed in case certain IDs do not exist. diff --git a/src/main/java/org/springframework/data/redis/core/StreamOperations.java b/src/main/java/org/springframework/data/redis/core/StreamOperations.java index 6a29d937a3..823afeb1cb 100644 --- a/src/main/java/org/springframework/data/redis/core/StreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/StreamOperations.java @@ -17,12 +17,14 @@ import reactor.core.publisher.Mono; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Map; import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; +import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.connection.stream.Record; import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers; @@ -119,6 +121,35 @@ default RecordId add(MapRecord record) { @Nullable RecordId add(Record record); + /** + * Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument. + * The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM + * + * @param key the stream key. + * @param group name of the consumer group. + * @param newOwner name of the consumer claiming the message. + * @param minIdleTime idle time required for a message to be claimed. + * @param recordIds record IDs to be claimed + * + * @return list of claimed MapRecords. + * @see Redis Documentation: XCLAIM + */ + List> claim(K key, String group, String newOwner, Duration minIdleTime, RecordId... recordIds); + + /** + * Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument. + * The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM + * + * @param key the stream key. + * @param group name of the consumer group. + * @param newOwner name of the consumer claiming the message. + * @param xClaimOptions additional parameters for the CLAIM call. + * + * @return list of claimed MapRecords. + * @see Redis Documentation: XCLAIM + */ + List> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions); + /** * Removes the specified records from the stream. Returns the number of records deleted, that may be different from * the number of IDs passed in case certain IDs do not exist. diff --git a/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java index 7e801025b9..5c758dd2a8 100644 --- a/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java @@ -20,8 +20,10 @@ import reactor.test.StepVerifier; +import java.time.Duration; import java.util.Collection; import java.util.Collections; +import java.util.Map; import org.junit.jupiter.api.BeforeEach; @@ -358,4 +360,29 @@ void pendingShouldReadMessageDetails() { }).verifyComplete(); } + + @ParameterizedRedisTest // https://github.com/spring-projects/spring-data-redis/issues/2465 + void claimShouldReadMessageDetails() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + Map content = Collections.singletonMap(hashKey, value); + RecordId messageId = streamOperations.add(key, content).block(); + + streamOperations.createGroup(key, ReadOffset.from("0-0"), "my-group").then().as(StepVerifier::create) + .verifyComplete(); + + streamOperations.read(Consumer.from("my-group", "my-consumer"), StreamOffset.create(key, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + streamOperations.claim(key, "my-group", "name", Duration.ZERO, messageId).as(StepVerifier::create) + .assertNext(claimed -> { + assertThat(claimed.getStream()).isEqualTo(key); + assertThat(claimed.getValue()).isEqualTo(content); + assertThat(claimed.getId()).isEqualTo(messageId); + }).verifyComplete(); + + } } diff --git a/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java index 80aa27b9cf..e51e0c0561 100644 --- a/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assumptions.*; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -34,16 +35,7 @@ import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.lettuce.extension.LettuceConnectionFactoryExtension; -import org.springframework.data.redis.connection.stream.Consumer; -import org.springframework.data.redis.connection.stream.MapRecord; -import org.springframework.data.redis.connection.stream.ObjectRecord; -import org.springframework.data.redis.connection.stream.PendingMessages; -import org.springframework.data.redis.connection.stream.PendingMessagesSummary; -import org.springframework.data.redis.connection.stream.ReadOffset; -import org.springframework.data.redis.connection.stream.RecordId; -import org.springframework.data.redis.connection.stream.StreamOffset; -import org.springframework.data.redis.connection.stream.StreamReadOptions; -import org.springframework.data.redis.connection.stream.StreamRecords; +import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.test.condition.EnabledOnCommand; import org.springframework.data.redis.test.condition.EnabledOnRedisDriver; import org.springframework.data.redis.test.condition.EnabledOnRedisVersion; @@ -72,7 +64,7 @@ public class DefaultStreamOperationsIntegrationTests { private final StreamOperations streamOps; public DefaultStreamOperationsIntegrationTests(RedisTemplate redisTemplate, ObjectFactory keyFactory, - ObjectFactory objectFactory) { + ObjectFactory objectFactory) { this.redisTemplate = redisTemplate; this.connectionFactory = redisTemplate.getRequiredConnectionFactory(); @@ -420,4 +412,29 @@ void pendingShouldReadMessageDetails() { assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); } + + @ParameterizedRedisTest // https://github.com/spring-projects/spring-data-redis/issues/2465 + void claimShouldReadMessageDetails() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, value)); + streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group"); + streamOps.read(Consumer.from("my-group", "name"), StreamOffset.create(key, ReadOffset.lastConsumed())); + + List> messages = streamOps.claim(key, "my-group", "new-owner", Duration.ZERO, messageId); + + assertThat(messages).hasSize(1); + + MapRecord message = messages.get(0); + + assertThat(message.getId()).isEqualTo(messageId); + assertThat(message.getStream()).isEqualTo(key); + + if (!(key instanceof byte[] || value instanceof byte[])) { + assertThat(message.getValue()).containsEntry(hashKey, value); + } + } }