Skip to content

Commit 5c2a3fb

Browse files
committedNov 25, 2020
initial commit
0 parents  commit 5c2a3fb

21 files changed

+1317
-0
lines changed
 

‎.gitignore

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
target/
2+
pom.xml.tag
3+
pom.xml.releaseBackup
4+
pom.xml.versionsBackup
5+
pom.xml.next
6+
release.properties
7+
dependency-reduced-pom.xml
8+
buildNumber.properties
9+
.mvn/timing.properties
10+
11+
# Avoid ignoring Maven wrapper jar file (.jar files are usually ignored)
12+
!/.mvn/wrapper/maven-wrapper.jar
13+
/.classpath
14+
/.project
15+
/.settings/
16+
/.factorypath
17+
/.idea/
18+
/build/
19+
/.gradle/
20+
/bin/
21+
/dump.rdb
22+
.DS_Store

‎LICENSE

+177
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
2+
Apache License
3+
Version 2.0, January 2004
4+
http://www.apache.org/licenses/
5+
6+
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
7+
8+
1. Definitions.
9+
10+
"License" shall mean the terms and conditions for use, reproduction,
11+
and distribution as defined by Sections 1 through 9 of this document.
12+
13+
"Licensor" shall mean the copyright owner or entity authorized by
14+
the copyright owner that is granting the License.
15+
16+
"Legal Entity" shall mean the union of the acting entity and all
17+
other entities that control, are controlled by, or are under common
18+
control with that entity. For the purposes of this definition,
19+
"control" means (i) the power, direct or indirect, to cause the
20+
direction or management of such entity, whether by contract or
21+
otherwise, or (ii) ownership of fifty percent (50%) or more of the
22+
outstanding shares, or (iii) beneficial ownership of such entity.
23+
24+
"You" (or "Your") shall mean an individual or Legal Entity
25+
exercising permissions granted by this License.
26+
27+
"Source" form shall mean the preferred form for making modifications,
28+
including but not limited to software source code, documentation
29+
source, and configuration files.
30+
31+
"Object" form shall mean any form resulting from mechanical
32+
transformation or translation of a Source form, including but
33+
not limited to compiled object code, generated documentation,
34+
and conversions to other media types.
35+
36+
"Work" shall mean the work of authorship, whether in Source or
37+
Object form, made available under the License, as indicated by a
38+
copyright notice that is included in or attached to the work
39+
(an example is provided in the Appendix below).
40+
41+
"Derivative Works" shall mean any work, whether in Source or Object
42+
form, that is based on (or derived from) the Work and for which the
43+
editorial revisions, annotations, elaborations, or other modifications
44+
represent, as a whole, an original work of authorship. For the purposes
45+
of this License, Derivative Works shall not include works that remain
46+
separable from, or merely link (or bind by name) to the interfaces of,
47+
the Work and Derivative Works thereof.
48+
49+
"Contribution" shall mean any work of authorship, including
50+
the original version of the Work and any modifications or additions
51+
to that Work or Derivative Works thereof, that is intentionally
52+
submitted to Licensor for inclusion in the Work by the copyright owner
53+
or by an individual or Legal Entity authorized to submit on behalf of
54+
the copyright owner. For the purposes of this definition, "submitted"
55+
means any form of electronic, verbal, or written communication sent
56+
to the Licensor or its representatives, including but not limited to
57+
communication on electronic mailing lists, source code control systems,
58+
and issue tracking systems that are managed by, or on behalf of, the
59+
Licensor for the purpose of discussing and improving the Work, but
60+
excluding communication that is conspicuously marked or otherwise
61+
designated in writing by the copyright owner as "Not a Contribution."
62+
63+
"Contributor" shall mean Licensor and any individual or Legal Entity
64+
on behalf of whom a Contribution has been received by Licensor and
65+
subsequently incorporated within the Work.
66+
67+
2. Grant of Copyright License. Subject to the terms and conditions of
68+
this License, each Contributor hereby grants to You a perpetual,
69+
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
70+
copyright license to reproduce, prepare Derivative Works of,
71+
publicly display, publicly perform, sublicense, and distribute the
72+
Work and such Derivative Works in Source or Object form.
73+
74+
3. Grant of Patent License. Subject to the terms and conditions of
75+
this License, each Contributor hereby grants to You a perpetual,
76+
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
77+
(except as stated in this section) patent license to make, have made,
78+
use, offer to sell, sell, import, and otherwise transfer the Work,
79+
where such license applies only to those patent claims licensable
80+
by such Contributor that are necessarily infringed by their
81+
Contribution(s) alone or by combination of their Contribution(s)
82+
with the Work to which such Contribution(s) was submitted. If You
83+
institute patent litigation against any entity (including a
84+
cross-claim or counterclaim in a lawsuit) alleging that the Work
85+
or a Contribution incorporated within the Work constitutes direct
86+
or contributory patent infringement, then any patent licenses
87+
granted to You under this License for that Work shall terminate
88+
as of the date such litigation is filed.
89+
90+
4. Redistribution. You may reproduce and distribute copies of the
91+
Work or Derivative Works thereof in any medium, with or without
92+
modifications, and in Source or Object form, provided that You
93+
meet the following conditions:
94+
95+
(a) You must give any other recipients of the Work or
96+
Derivative Works a copy of this License; and
97+
98+
(b) You must cause any modified files to carry prominent notices
99+
stating that You changed the files; and
100+
101+
(c) You must retain, in the Source form of any Derivative Works
102+
that You distribute, all copyright, patent, trademark, and
103+
attribution notices from the Source form of the Work,
104+
excluding those notices that do not pertain to any part of
105+
the Derivative Works; and
106+
107+
(d) If the Work includes a "NOTICE" text file as part of its
108+
distribution, then any Derivative Works that You distribute must
109+
include a readable copy of the attribution notices contained
110+
within such NOTICE file, excluding those notices that do not
111+
pertain to any part of the Derivative Works, in at least one
112+
of the following places: within a NOTICE text file distributed
113+
as part of the Derivative Works; within the Source form or
114+
documentation, if provided along with the Derivative Works; or,
115+
within a display generated by the Derivative Works, if and
116+
wherever such third-party notices normally appear. The contents
117+
of the NOTICE file are for informational purposes only and
118+
do not modify the License. You may add Your own attribution
119+
notices within Derivative Works that You distribute, alongside
120+
or as an addendum to the NOTICE text from the Work, provided
121+
that such additional attribution notices cannot be construed
122+
as modifying the License.
123+
124+
You may add Your own copyright statement to Your modifications and
125+
may provide additional or different license terms and conditions
126+
for use, reproduction, or distribution of Your modifications, or
127+
for any such Derivative Works as a whole, provided Your use,
128+
reproduction, and distribution of the Work otherwise complies with
129+
the conditions stated in this License.
130+
131+
5. Submission of Contributions. Unless You explicitly state otherwise,
132+
any Contribution intentionally submitted for inclusion in the Work
133+
by You to the Licensor shall be under the terms and conditions of
134+
this License, without any additional terms or conditions.
135+
Notwithstanding the above, nothing herein shall supersede or modify
136+
the terms of any separate license agreement you may have executed
137+
with Licensor regarding such Contributions.
138+
139+
6. Trademarks. This License does not grant permission to use the trade
140+
names, trademarks, service marks, or product names of the Licensor,
141+
except as required for reasonable and customary use in describing the
142+
origin of the Work and reproducing the content of the NOTICE file.
143+
144+
7. Disclaimer of Warranty. Unless required by applicable law or
145+
agreed to in writing, Licensor provides the Work (and each
146+
Contributor provides its Contributions) on an "AS IS" BASIS,
147+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
148+
implied, including, without limitation, any warranties or conditions
149+
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
150+
PARTICULAR PURPOSE. You are solely responsible for determining the
151+
appropriateness of using or redistributing the Work and assume any
152+
risks associated with Your exercise of permissions under this License.
153+
154+
8. Limitation of Liability. In no event and under no legal theory,
155+
whether in tort (including negligence), contract, or otherwise,
156+
unless required by applicable law (such as deliberate and grossly
157+
negligent acts) or agreed to in writing, shall any Contributor be
158+
liable to You for damages, including any direct, indirect, special,
159+
incidental, or consequential damages of any character arising as a
160+
result of this License or out of the use or inability to use the
161+
Work (including but not limited to damages for loss of goodwill,
162+
work stoppage, computer failure or malfunction, or any and all
163+
other commercial damages or losses), even if such Contributor
164+
has been advised of the possibility of such damages.
165+
166+
9. Accepting Warranty or Additional Liability. While redistributing
167+
the Work or Derivative Works thereof, You may choose to offer,
168+
and charge a fee for, acceptance of support, warranty, indemnity,
169+
or other liability obligations and/or rights consistent with this
170+
License. However, in accepting such obligations, You may act only
171+
on Your own behalf and on Your sole responsibility, not on behalf
172+
of any other Contributor, and only if You agree to indemnify,
173+
defend, and hold each Contributor harmless for any liability
174+
incurred by, or claims asserted against, such Contributor by reason
175+
of your accepting any such warranty or additional liability.
176+
177+
END OF TERMS AND CONDITIONS

‎README.adoc

+126
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
= Mesclun
2+
:project-repo: redis-developer/mesclun
3+
:uri-repo: https://github.com/{project-repo}
4+
ifdef::env-github[]
5+
:badges:
6+
:tag: master
7+
:!toc-title:
8+
:tip-caption: :bulb:
9+
:note-caption: :paperclip:
10+
:important-caption: :heavy_exclamation_mark:
11+
:caution-caption: :fire:
12+
:warning-caption: :warning:
13+
endif::[]
14+
15+
// Badges
16+
ifdef::badges[]
17+
image:https://img.shields.io/github/license/redis-developer/mesclun.svg["License", link="https://github.com/redis-developer/mesclun"]
18+
image:https://img.shields.io/github/release/redis-developer/mesclun.svg["Latest", link="https://github.com/redis-developer/mesclun/releases/latest"]
19+
image:https://github.com/redis-developer/mesclun/workflows/CI/badge.svg["Actions", link="https://github.com/redis-developer/mesclun/workflows/CI/badge.svg"]
20+
image:https://codecov.io/gh/redis-developer/mesclun/branch/master/graph/badge.svg["Codecov", link="https://codecov.io/gh/redis-developer/mesclun"]
21+
image:https://img.shields.io/lgtm/grade/java/g/redis-developer/mesclun.svg?logo=lgtm&logoWidth=18["Language grade: Java", link="https://lgtm.com/projects/g/redis-developer/mesclun/context:java"]
22+
image:https://snyk.io/test/github/redis-developer/mesclun/badge.svg?targetFile=build.gradle["Known Vulnerabilities", link="https://snyk.io/test/github/redis-developer/mesclun?targetFile=build.gradle"]
23+
24+
image:https://img.shields.io/badge/Forum-RedisTimeSeries-blue["Forum", https://forum.redislabs.com/c/modules/redistimeseries/]
25+
endif::[]
26+
27+
Java clients for https://redislabs.com/community/oss-projects/[Redis modules] based on https://lettuce.io[Lettuce]
28+
29+
Latest release: https://github.com/redis-developer/mesclun/releases/latest
30+
31+
== Getting Started
32+
33+
Add Mesclun to your application dependencies:
34+
35+
.Gradle
36+
[source,groovy]
37+
----
38+
dependencies {
39+
implementation 'com.redislabs:mesclun:x.y.x'
40+
}
41+
----
42+
43+
.Maven
44+
[source,xml]
45+
----
46+
<dependency>
47+
<groupId>com.redislabs</groupId>
48+
<artifactId>mesclun</artifactId>
49+
<version>x.y.z</version>
50+
</dependency>
51+
----
52+
53+
== RedisTimeSeries
54+
55+
=== Basic Usage
56+
57+
[source,java]
58+
----
59+
RedisTimeSeriesClient client = RedisTimeSeriesClient.create(RedisURI.create(host, port)); // <1>
60+
StatefulRedisTimeSeriesConnection<String, String> connection = client.connect(); // <2>
61+
RedisTimeSeriesCommands<String, String> commands = connection.sync(); // <3>
62+
commands.create("temperature:3:11", CreateOptions.builder().retentionTime(6000).build(), Label.of("sensor_id", "2"), Label.of("area_id", "32")); <4>
63+
commands.add("temperature:3:11", 1548149181, 30); <5>
64+
----
65+
<1> Create a RedisTimeSeries client
66+
<2> Connect to RedisTimeSeries server
67+
<3> Use _sync_, _async_, or _reactive_ commands
68+
<4> Create a new time-series
69+
<5> Append a new sample to the series
70+
71+
=== Pipelining
72+
73+
[source,java]
74+
----
75+
RedisTimeSeriesClient client = RedisTimeSeriesClient.create(RedisURI.create(host, port)); // <1>
76+
StatefulRedisTimeSeriesConnection<String, String> connection = client.connect(); // <2>
77+
RedisTimeSeriesAsyncCommands<String, String> commands = connection.async(); // <3>
78+
commands.setAutoFlushCommands(false); // <4>
79+
List<RedisFuture<?>> futures = new ArrayList<>();
80+
for (Sample sample : samples) { // <5>
81+
RedisFuture<?> future = commands.add("temperature:3:11", sample.getTimestamp(), sample.getValue()); // <6>
82+
futures.add(future);
83+
}
84+
commands.flushCommands(); // <7>
85+
for (RedisFuture<?> future : futures) {
86+
try {
87+
future.get(1, TimeUnit.SECONDS); // <8>
88+
} catch (InterruptedException e) {
89+
log.debug("Command interrupted", e);
90+
} catch (ExecutionException e) {
91+
log.error("Could not execute command", e);
92+
} catch (TimeoutException e) {
93+
log.error("Command timed out", e);
94+
}
95+
}
96+
----
97+
<1> Create a RedisTimeSeries client
98+
<2> Connect to RedisTimeSeries server
99+
<3> Use async commands
100+
<4> Disable command auto-flush
101+
<5> Call commands to be executed in a pipeline
102+
<6> Add command execution future to the list
103+
<7> Flush commands
104+
<8> Wait for response from each future
105+
106+
=== Connection pooling
107+
108+
[source,java]
109+
----
110+
RedisTimeSeriesClient client = RedisTimeSeriesClient.create(RedisURI.create(host, port)); // <1>
111+
GenericObjectPoolConfig<StatefulRedisTimeSeriesConnection<String, String>> config = new GenericObjectPoolConfig<>(); // <2>
112+
config.setMaxTotal(8);
113+
GenericObjectPool<StatefulRedisTimeSeriesConnection<String, String>> pool = ConnectionPoolSupport.createGenericObjectPool(client::connect, config); // <3>
114+
// The connection pool can now be passed to worker threads
115+
try (StatefulRedisTimeSeriesConnection<String, String> connection = pool.borrowObject()) { // <4>
116+
RedisTimeSeriesCommands<String, String> commands = connection.sync(); // <5>
117+
commands.add("temperature:3:11", 1548149181, 30); // <6>
118+
}
119+
----
120+
<1> Create a RedisTimeSeries client
121+
<2> Create a pool configuration
122+
<3> Create the connection pool
123+
<4> In worker threads, get connections in a try-with statement to automatically return them to the pool
124+
<5> Use _sync_ commands
125+
<6> Execute commands
126+

‎build.gradle

+161
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
plugins {
2+
id 'distribution'
3+
id 'java-library'
4+
id 'java-library-distribution'
5+
id 'maven-publish'
6+
id 'jacoco'
7+
id 'com.jfrog.bintray' version '1.8.5'
8+
id 'net.researchgate.release' version '2.8.1'
9+
id 'com.github.ben-manes.versions' version '0.36.0'
10+
id 'com.github.breadmoirai.github-release' version '2.2.12'
11+
}
12+
13+
group = 'com.redislabs'
14+
description = 'Java client for Redis Labs modules'
15+
sourceCompatibility = '1.8'
16+
targetCompatibility = '1.8'
17+
18+
java {
19+
withJavadocJar()
20+
withSourcesJar()
21+
}
22+
23+
jacocoTestReport {
24+
reports {
25+
xml.enabled true
26+
html.enabled false
27+
}
28+
}
29+
30+
repositories {
31+
jcenter()
32+
mavenCentral()
33+
mavenLocal()
34+
}
35+
36+
dependencies {
37+
api 'io.lettuce:lettuce-core:6.0.1.RELEASE'
38+
compileOnly 'org.projectlombok:lombok:1.18.16'
39+
annotationProcessor 'org.projectlombok:lombok:1.18.16'
40+
testImplementation 'org.slf4j:slf4j-simple:1.7.30'
41+
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.11.3'
42+
testImplementation 'io.projectreactor:reactor-test:3.4.0'
43+
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
44+
testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
45+
testImplementation 'org.junit.jupiter:junit-jupiter-params:5.7.0'
46+
testImplementation 'org.testcontainers:testcontainers:1.15.0'
47+
testImplementation 'org.testcontainers:junit-jupiter:1.15.0'
48+
testImplementation 'org.apache.commons:commons-pool2:2.9.0'
49+
testCompileOnly 'org.projectlombok:lombok:1.18.16'
50+
testAnnotationProcessor 'org.projectlombok:lombok:1.18.16'
51+
}
52+
53+
test {
54+
useJUnitPlatform()
55+
}
56+
57+
distributions {
58+
main {
59+
contents {
60+
from (project.docsDir) {
61+
into 'docs'
62+
}
63+
from 'README.adoc'
64+
from 'LICENSE'
65+
}
66+
}
67+
}
68+
69+
distTar {
70+
compression = Compression.GZIP
71+
}
72+
73+
publishing {
74+
publications {
75+
mavenJava(MavenPublication) {
76+
from components.java
77+
versionMapping {
78+
usage('java-api') {
79+
fromResolutionOf('runtimeClasspath')
80+
}
81+
usage('java-runtime') {
82+
fromResolutionResult()
83+
}
84+
}
85+
pom {
86+
name = 'Mesclun'
87+
description = 'Java client for Redis Labs module'
88+
url = 'https://github.com/redis-developer/mesclun'
89+
licenses {
90+
license {
91+
name = 'The Apache License, Version 2.0'
92+
url = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
93+
}
94+
}
95+
developers {
96+
developer {
97+
id = 'jruaux'
98+
name = 'Julien Ruaux'
99+
}
100+
}
101+
scm {
102+
connection = 'scm:git:git://github.com/redis-developer/mesclun.git'
103+
developerConnection = 'scm:git:git@github.com:redis-developer/mesclun.git'
104+
url = 'https://github.com/redis-developer/mesclun'
105+
}
106+
}
107+
}
108+
}
109+
}
110+
111+
bintray {
112+
user = project.hasProperty('bintrayUser') ? project.property('bintrayUser') : ''
113+
key = project.hasProperty('bintrayKey') ? project.property('bintrayKey') : ''
114+
publications = ['mavenJava']
115+
publish = true
116+
pkg {
117+
repo = 'maven'
118+
name = project.name
119+
licenses = ['Apache-2.0']
120+
vcsUrl = 'https://github.com/redis-developer/mesclun.git'
121+
version {
122+
gpg {
123+
sign = true
124+
}
125+
mavenCentralSync {
126+
sync = true
127+
user = project.hasProperty('ossrhUsername') ? project.property('ossrhUsername') : ''
128+
password = project.hasProperty('ossrhPassword') ? project.property('ossrhPassword') : ''
129+
}
130+
}
131+
}
132+
}
133+
134+
githubRelease {
135+
token = project.hasProperty('githubToken') ? project.property('githubToken') : ''
136+
owner "redis-developer"
137+
repo "mesclun"
138+
releaseAssets distZip, distTar
139+
draft true
140+
body changelog()
141+
}
142+
143+
def isNonStable = { String version ->
144+
def nonStableKeyword = ['PREVIEW'].any { it -> version.toUpperCase().contains(it) }
145+
def stableKeyword = ['RELEASE', 'FINAL', 'GA', 'JRE8'].any { it -> version.toUpperCase().contains(it) }
146+
def regex = /^[0-9,.v-]+([.-]r)?$/
147+
return nonStableKeyword || (!stableKeyword && !(version ==~ regex))
148+
}
149+
150+
tasks.named("dependencyUpdates").configure {
151+
152+
rejectVersionIf {
153+
isNonStable(it.candidate.version) && !isNonStable(it.currentVersion)
154+
}
155+
156+
}
157+
158+
tasks.distZip.shouldRunAfter tasks.javadocJar
159+
check.dependsOn jacocoTestReport
160+
afterReleaseBuild.dependsOn ":githubRelease"
161+
afterReleaseBuild.dependsOn bintrayUpload

‎gradle.properties

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
version=1.0.0-SNAPSHOT

‎settings.gradle

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/*
2+
* This file was generated by the Gradle 'init' task.
3+
*/
4+
5+
rootProject.name = 'mesclun'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
package com.redislabs.mesclun;
2+
3+
import java.time.Duration;
4+
5+
import com.redislabs.mesclun.api.StatefulRedisTimeSeriesConnection;
6+
import com.redislabs.mesclun.timeseries.impl.StatefulConnectionImpl;
7+
8+
import io.lettuce.core.ClientOptions;
9+
import io.lettuce.core.RedisChannelWriter;
10+
import io.lettuce.core.RedisClient;
11+
import io.lettuce.core.RedisFuture;
12+
import io.lettuce.core.RedisURI;
13+
import io.lettuce.core.codec.RedisCodec;
14+
import io.lettuce.core.internal.LettuceAssert;
15+
import io.lettuce.core.masterreplica.MasterReplica;
16+
import io.lettuce.core.protocol.PushHandler;
17+
import io.lettuce.core.resource.ClientResources;
18+
19+
/**
20+
* A scalable and thread-safe
21+
* <a href="http://redistimeseries.io/">RedisTimeSeries</a> client supporting
22+
* synchronous, asynchronous and reactive execution models. Multiple threads may
23+
* share one connection if they avoid blocking and transactional operations such
24+
* as BLPOP and MULTI/EXEC.
25+
* <p>
26+
* {@link RedisTimeSeriesClient} can be used with:
27+
* <ul>
28+
* <li>Redis Standalone</li>
29+
* </ul>
30+
*
31+
* <p>
32+
* {@link RedisTimeSeriesClient} is an expensive resource. It holds a set of
33+
* netty's {@link io.netty.channel.EventLoopGroup}'s that use multiple threads.
34+
* Reuse this instance as much as possible or share a {@link ClientResources}
35+
* instance amongst multiple client instances.
36+
*
37+
* @author Julien Ruaux
38+
* @see RedisURI
39+
* @see StatefulRedisTimeSeriesConnection
40+
* @see RedisFuture
41+
* @see reactor.core.publisher.Mono
42+
* @see reactor.core.publisher.Flux
43+
* @see RedisCodec
44+
* @see ClientOptions
45+
* @see ClientResources
46+
* @see MasterReplica
47+
*/
48+
public class RedisTimeSeriesClient extends RedisClient {
49+
50+
protected RedisTimeSeriesClient(ClientResources clientResources, RedisURI redisURI) {
51+
super(clientResources, redisURI);
52+
}
53+
54+
/**
55+
* Creates a uri-less RedisTimeSeriesClient. You can connect to different Redis
56+
* servers but you must supply a {@link RedisURI} on connecting. Methods without
57+
* having a {@link RedisURI} will fail with a
58+
* {@link java.lang.IllegalStateException}. Non-private constructor to make
59+
* {@link RedisTimeSeriesClient} proxyable.
60+
*/
61+
protected RedisTimeSeriesClient() {
62+
super();
63+
}
64+
65+
/**
66+
* Creates a uri-less RedisTimeSeriesClient with default {@link ClientResources}. You
67+
* can connect to different Redis servers but you must supply a {@link RedisURI}
68+
* on connecting. Methods without having a {@link RedisURI} will fail with a
69+
* {@link java.lang.IllegalStateException}.
70+
*
71+
* @return a new instance of {@link RedisTimeSeriesClient}
72+
*/
73+
public static RedisTimeSeriesClient create() {
74+
return new RedisTimeSeriesClient();
75+
}
76+
77+
/**
78+
* Create a new client that connects to the supplied {@link RedisURI uri} with
79+
* default {@link ClientResources}. You can connect to different Redis servers
80+
* but you must supply a {@link RedisURI} on connecting.
81+
*
82+
* @param redisURI the Redis URI, must not be {@code null}
83+
* @return a new instance of {@link RedisTimeSeriesClient}
84+
*/
85+
public static RedisTimeSeriesClient create(RedisURI redisURI) {
86+
assertNotNull(redisURI);
87+
return new RedisTimeSeriesClient(null, redisURI);
88+
}
89+
90+
/**
91+
* Create a new client that connects to the supplied uri with default
92+
* {@link ClientResources}. You can connect to different Redis servers but you
93+
* must supply a {@link RedisURI} on connecting.
94+
*
95+
* @param uri the Redis URI, must not be {@code null}
96+
* @return a new instance of {@link RedisTimeSeriesClient}
97+
*/
98+
public static RedisTimeSeriesClient create(String uri) {
99+
LettuceAssert.notEmpty(uri, "URI must not be empty");
100+
return new RedisTimeSeriesClient(null, RedisURI.create(uri));
101+
}
102+
103+
/**
104+
* Creates a uri-less RedisTimeSeriesClient with shared {@link ClientResources}. You
105+
* need to shut down the {@link ClientResources} upon shutting down your
106+
* application. You can connect to different Redis servers but you must supply a
107+
* {@link RedisURI} on connecting. Methods without having a {@link RedisURI}
108+
* will fail with a {@link java.lang.IllegalStateException}.
109+
*
110+
* @param clientResources the client resources, must not be {@code null}
111+
* @return a new instance of {@link RedisTimeSeriesClient}
112+
*/
113+
public static RedisTimeSeriesClient create(ClientResources clientResources) {
114+
assertNotNull(clientResources);
115+
return new RedisTimeSeriesClient(clientResources, new RedisURI());
116+
}
117+
118+
/**
119+
* Create a new client that connects to the supplied uri with shared
120+
* {@link ClientResources}.You need to shut down the {@link ClientResources}
121+
* upon shutting down your application. You can connect to different Redis
122+
* servers but you must supply a {@link RedisURI} on connecting.
123+
*
124+
* @param clientResources the client resources, must not be {@code null}
125+
* @param uri the Redis URI, must not be {@code null}
126+
*
127+
* @return a new instance of {@link RedisTimeSeriesClient}
128+
*/
129+
public static RedisTimeSeriesClient create(ClientResources clientResources, String uri) {
130+
assertNotNull(clientResources);
131+
LettuceAssert.notEmpty(uri, "URI must not be empty");
132+
return create(clientResources, RedisURI.create(uri));
133+
}
134+
135+
private static void assertNotNull(ClientResources clientResources) {
136+
LettuceAssert.notNull(clientResources, "ClientResources must not be null");
137+
}
138+
139+
/**
140+
* Create a new client that connects to the supplied {@link RedisURI uri} with
141+
* shared {@link ClientResources}. You need to shut down the
142+
* {@link ClientResources} upon shutting down your application.You can connect
143+
* to different Redis servers but you must supply a {@link RedisURI} on
144+
* connecting.
145+
*
146+
* @param clientResources the client resources, must not be {@code null}
147+
* @param redisURI the Redis URI, must not be {@code null}
148+
* @return a new instance of {@link RedisTimeSeriesClient}
149+
*/
150+
public static RedisTimeSeriesClient create(ClientResources clientResources, RedisURI redisURI) {
151+
assertNotNull(clientResources);
152+
assertNotNull(redisURI);
153+
return new RedisTimeSeriesClient(clientResources, redisURI);
154+
}
155+
156+
/**
157+
* Open a new connection to a RedisTimeSeries server that treats keys and values as
158+
* UTF-8 strings.
159+
*
160+
* @return A new stateful Redis connection
161+
*/
162+
@Override
163+
public StatefulRedisTimeSeriesConnection<String, String> connect() {
164+
return connect(newStringStringCodec());
165+
}
166+
167+
/**
168+
* Open a new connection to a RedisTimeSeries server. Use the supplied
169+
* {@link RedisCodec codec} to encode/decode keys and values.
170+
*
171+
* @param codec Use this codec to encode/decode keys and values, must not be
172+
* {@code null}
173+
* @param <K> Key type
174+
* @param <V> Value type
175+
* @return A new stateful Redis connection
176+
*/
177+
@Override
178+
public <K, V> StatefulRedisTimeSeriesConnection<K, V> connect(RedisCodec<K, V> codec) {
179+
return (StatefulRedisTimeSeriesConnection<K, V>) super.connect(codec);
180+
}
181+
182+
/**
183+
* Open a new connection to a RedisTimeSeries server using the supplied
184+
* {@link RedisURI} that treats keys and values as UTF-8 strings.
185+
*
186+
* @param redisURI the Redis server to connect to, must not be {@code null}
187+
* @return A new connection
188+
*/
189+
@Override
190+
public StatefulRedisTimeSeriesConnection<String, String> connect(RedisURI redisURI) {
191+
return (StatefulRedisTimeSeriesConnection<String, String>) super.connect(redisURI);
192+
}
193+
194+
/**
195+
* Open a new connection to a RedisTimeSeries server using the supplied
196+
* {@link RedisURI} and the supplied {@link RedisCodec codec} to encode/decode
197+
* keys.
198+
*
199+
* @param codec Use this codec to encode/decode keys and values, must not be
200+
* {@code null}
201+
* @param redisURI the Redis server to connect to, must not be {@code null}
202+
* @param <K> Key type
203+
* @param <V> Value type
204+
* @return A new connection
205+
*/
206+
@Override
207+
public <K, V> StatefulRedisTimeSeriesConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) {
208+
return (StatefulRedisTimeSeriesConnection<K, V>) super.connect(codec, redisURI);
209+
}
210+
211+
private static void assertNotNull(RedisURI redisURI) {
212+
LettuceAssert.notNull(redisURI, "RedisURI must not be null");
213+
}
214+
215+
/**
216+
* Create a new instance of {@link StatefulConnectionImpl} or a
217+
* subclass.
218+
* <p>
219+
* Subclasses of {@link RedisTimeSeriesClient} may override that method.
220+
*
221+
* @param channelWriter the channel writer
222+
* @param codec codec
223+
* @param timeout default timeout
224+
* @param <K> Key-Type
225+
* @param <V> Value Type
226+
* @return new instance of StatefulRedisTimeSeriesConnectionImpl
227+
*/
228+
@Override
229+
protected <K, V> StatefulConnectionImpl<K, V> newStatefulRedisConnection(RedisChannelWriter channelWriter,
230+
PushHandler pushHandler, RedisCodec<K, V> codec, Duration timeout) {
231+
return new StatefulConnectionImpl<>(channelWriter, pushHandler, codec, timeout);
232+
}
233+
234+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.redislabs.mesclun;
2+
3+
import com.redislabs.mesclun.timeseries.CreateOptions;
4+
import com.redislabs.mesclun.timeseries.Label;
5+
import com.redislabs.mesclun.timeseries.protocol.CommandKeyword;
6+
import com.redislabs.mesclun.timeseries.protocol.CommandType;
7+
8+
import io.lettuce.core.codec.RedisCodec;
9+
import io.lettuce.core.internal.LettuceAssert;
10+
import io.lettuce.core.output.CommandOutput;
11+
import io.lettuce.core.output.IntegerOutput;
12+
import io.lettuce.core.output.StatusOutput;
13+
import io.lettuce.core.protocol.BaseRedisCommandBuilder;
14+
import io.lettuce.core.protocol.Command;
15+
import io.lettuce.core.protocol.CommandArgs;
16+
17+
/**
18+
* Dedicated pub/sub command builder to build pub/sub commands.
19+
*/
20+
public class RedisTimeSeriesCommandBuilder<K, V> extends BaseRedisCommandBuilder<K, V> {
21+
22+
static final String MUST_NOT_BE_NULL = "must not be null";
23+
static final String MUST_NOT_BE_EMPTY = "must not be empty";
24+
25+
public RedisTimeSeriesCommandBuilder(RedisCodec<K, V> codec) {
26+
super(codec);
27+
}
28+
29+
private void assertNotNull(Object arg, String name) {
30+
LettuceAssert.notNull(arg, name + " " + MUST_NOT_BE_NULL);
31+
}
32+
33+
protected <A, B, T> Command<A, B, T> createCommand(CommandType type, CommandOutput<A, B, T> output,
34+
CommandArgs<A, B> args) {
35+
return new Command<>(type, output, args);
36+
}
37+
38+
public Command<K, V, String> create(K key, CreateOptions options, Label<K, V>[] labels) {
39+
CommandArgs<K, V> args = args(key);
40+
addOptions(args, options, labels);
41+
return createCommand(CommandType.CREATE, new StatusOutput<>(codec), args);
42+
}
43+
44+
public Command<K, V, Long> add(K key, long timestamp, double value, CreateOptions options, Label<K, V>[] labels) {
45+
CommandArgs<K, V> args = args(key);
46+
args.add(timestamp);
47+
args.add(value);
48+
addOptions(args, options, labels);
49+
return createCommand(CommandType.ADD, new IntegerOutput<>(codec), args);
50+
}
51+
52+
private void addOptions(CommandArgs<K, V> args, CreateOptions options, Label<K, V>[] labels) {
53+
if (options != null) {
54+
options.build(args);
55+
}
56+
if (labels != null && labels.length > 0) {
57+
args.add(CommandKeyword.LABELS);
58+
for (Label<K, V> label : labels) {
59+
args.addKey(label.getLabel());
60+
args.addValue(label.getValue());
61+
}
62+
}
63+
}
64+
65+
private CommandArgs<K, V> args(K key) {
66+
assertNotNull(key, "key");
67+
return new CommandArgs<>(codec).addKey(key);
68+
}
69+
70+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.redislabs.mesclun.api;
2+
3+
import com.redislabs.mesclun.api.async.RedisTimeSeriesAsyncCommands;
4+
import com.redislabs.mesclun.api.reactive.RedisTimeSeriesReactiveCommands;
5+
import com.redislabs.mesclun.api.sync.RedisTimeSeriesCommands;
6+
7+
import io.lettuce.core.api.StatefulRedisConnection;
8+
9+
public interface StatefulRedisTimeSeriesConnection<K, V> extends StatefulRedisConnection<K, V> {
10+
11+
RedisTimeSeriesCommands<K, V> sync();
12+
13+
RedisTimeSeriesAsyncCommands<K, V> async();
14+
15+
RedisTimeSeriesReactiveCommands<K, V> reactive();
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.redislabs.mesclun.api.async;
2+
3+
import com.redislabs.mesclun.api.StatefulRedisTimeSeriesConnection;
4+
import com.redislabs.mesclun.timeseries.CreateOptions;
5+
import com.redislabs.mesclun.timeseries.Label;
6+
7+
import io.lettuce.core.RedisFuture;
8+
import io.lettuce.core.api.async.RedisAsyncCommands;
9+
10+
public interface RedisTimeSeriesAsyncCommands<K, V> extends RedisAsyncCommands<K, V> {
11+
12+
StatefulRedisTimeSeriesConnection<K, V> getStatefulConnection();
13+
14+
@SuppressWarnings("unchecked")
15+
RedisFuture<String> create(K key, Label<K, V>... labels);
16+
17+
@SuppressWarnings("unchecked")
18+
RedisFuture<String> create(K key, CreateOptions options, Label<K, V>... labels);
19+
20+
@SuppressWarnings("unchecked")
21+
RedisFuture<Long> add(K key, long timestamp, double value, Label<K, V>... labels);
22+
23+
@SuppressWarnings("unchecked")
24+
RedisFuture<Long> add(K key, long timestamp, double value, CreateOptions options, Label<K, V>... labels);
25+
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.redislabs.mesclun.api.reactive;
2+
3+
import com.redislabs.mesclun.api.StatefulRedisTimeSeriesConnection;
4+
import com.redislabs.mesclun.timeseries.CreateOptions;
5+
import com.redislabs.mesclun.timeseries.Label;
6+
7+
import io.lettuce.core.api.reactive.RedisReactiveCommands;
8+
import reactor.core.publisher.Mono;
9+
10+
public interface RedisTimeSeriesReactiveCommands<K, V> extends RedisReactiveCommands<K, V> {
11+
12+
StatefulRedisTimeSeriesConnection<K, V> getStatefulConnection();
13+
14+
@SuppressWarnings("unchecked")
15+
Mono<String> create(K key, Label<K, V>... labels);
16+
17+
@SuppressWarnings("unchecked")
18+
Mono<String> create(K key, CreateOptions options, Label<K, V>... labels);
19+
20+
@SuppressWarnings("unchecked")
21+
Mono<Long> add(K key, long timestamp, double value, Label<K, V>... labels);
22+
23+
@SuppressWarnings("unchecked")
24+
Mono<Long> add(K key, long timestamp, double value, CreateOptions options, Label<K, V>... labels);
25+
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.redislabs.mesclun.api.sync;
2+
3+
import com.redislabs.mesclun.api.StatefulRedisTimeSeriesConnection;
4+
import com.redislabs.mesclun.timeseries.CreateOptions;
5+
import com.redislabs.mesclun.timeseries.Label;
6+
7+
import io.lettuce.core.api.sync.RedisCommands;
8+
9+
public interface RedisTimeSeriesCommands<K, V> extends RedisCommands<K, V> {
10+
11+
StatefulRedisTimeSeriesConnection<K, V> getStatefulConnection();
12+
13+
@SuppressWarnings("unchecked")
14+
String create(K key, Label<K, V>... labels);
15+
16+
@SuppressWarnings("unchecked")
17+
String create(K key, CreateOptions options, Label<K, V>... labels);
18+
19+
@SuppressWarnings("unchecked")
20+
Long add(K key, long timestamp, double value, Label<K, V>... labels);
21+
22+
@SuppressWarnings("unchecked")
23+
Long add(K key, long timestamp, double value, CreateOptions options, Label<K, V>... labels);
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.redislabs.mesclun.timeseries;
2+
3+
import static com.redislabs.mesclun.timeseries.protocol.CommandKeyword.CHUNK_SIZE;
4+
import static com.redislabs.mesclun.timeseries.protocol.CommandKeyword.ON_DUPLICATE;
5+
import static com.redislabs.mesclun.timeseries.protocol.CommandKeyword.RETENTION;
6+
import static com.redislabs.mesclun.timeseries.protocol.CommandKeyword.UNCOMPRESSED;
7+
8+
import io.lettuce.core.CompositeArgument;
9+
import io.lettuce.core.protocol.CommandArgs;
10+
import lombok.Builder;
11+
import lombok.Data;
12+
13+
@Data
14+
@Builder
15+
public class CreateOptions implements CompositeArgument {
16+
17+
private Long retentionTime;
18+
private boolean uncompressed;
19+
private Long chunkSize;
20+
private DuplicatePolicy policy;
21+
22+
@Override
23+
public <K, V> void build(CommandArgs<K, V> args) {
24+
if (retentionTime != null) {
25+
args.add(RETENTION);
26+
args.add(retentionTime);
27+
}
28+
if (uncompressed) {
29+
args.add(UNCOMPRESSED);
30+
}
31+
if (chunkSize != null) {
32+
args.add(CHUNK_SIZE);
33+
args.add(chunkSize);
34+
}
35+
if (policy != null) {
36+
args.add(ON_DUPLICATE);
37+
args.add(policy.name());
38+
}
39+
}
40+
41+
public static class CreateOptionsBuilder {
42+
43+
public CreateOptionsBuilder retentionTime(long retentionTime) {
44+
this.retentionTime = retentionTime;
45+
return this;
46+
}
47+
48+
public CreateOptionsBuilder chunkSize(long chunkSize) {
49+
this.chunkSize = chunkSize;
50+
return this;
51+
}
52+
}
53+
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.redislabs.mesclun.timeseries;
2+
3+
public enum DuplicatePolicy {
4+
5+
BLOCK, FIRST, LAST, MIN, MAX, SUM;
6+
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.redislabs.mesclun.timeseries;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Data;
5+
import lombok.NonNull;
6+
7+
@Data
8+
@AllArgsConstructor
9+
public class Label<K, V> {
10+
11+
@NonNull
12+
private K label;
13+
@NonNull
14+
private V value;
15+
16+
public static <K, V> Label<K, V> of(K key, V value) {
17+
return new Label<>(key, value);
18+
}
19+
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.redislabs.mesclun.timeseries.impl;
2+
3+
import com.redislabs.mesclun.RedisTimeSeriesCommandBuilder;
4+
import com.redislabs.mesclun.api.StatefulRedisTimeSeriesConnection;
5+
import com.redislabs.mesclun.api.async.RedisTimeSeriesAsyncCommands;
6+
import com.redislabs.mesclun.timeseries.CreateOptions;
7+
import com.redislabs.mesclun.timeseries.Label;
8+
9+
import io.lettuce.core.RedisAsyncCommandsImpl;
10+
import io.lettuce.core.RedisFuture;
11+
import io.lettuce.core.codec.RedisCodec;
12+
13+
public class AsyncCommandsImpl<K, V> extends RedisAsyncCommandsImpl<K, V>
14+
implements RedisTimeSeriesAsyncCommands<K, V> {
15+
16+
private final StatefulRedisTimeSeriesConnection<K, V> connection;
17+
private final RedisTimeSeriesCommandBuilder<K, V> commandBuilder;
18+
19+
public AsyncCommandsImpl(StatefulRedisTimeSeriesConnection<K, V> connection,
20+
RedisCodec<K, V> codec) {
21+
super(connection, codec);
22+
this.connection = connection;
23+
this.commandBuilder = new RedisTimeSeriesCommandBuilder<>(codec);
24+
}
25+
26+
@Override
27+
public StatefulRedisTimeSeriesConnection<K, V> getStatefulConnection() {
28+
return connection;
29+
}
30+
31+
@SuppressWarnings("unchecked")
32+
@Override
33+
public RedisFuture<String> create(K key, Label<K, V>... labels) {
34+
return dispatch(commandBuilder.create(key, null, labels));
35+
}
36+
37+
@SuppressWarnings("unchecked")
38+
@Override
39+
public RedisFuture<String> create(K key, CreateOptions options, Label<K, V>... labels) {
40+
return dispatch(commandBuilder.create(key, options, labels));
41+
}
42+
43+
@SuppressWarnings("unchecked")
44+
@Override
45+
public RedisFuture<Long> add(K key, long timestamp, double value, Label<K, V>... labels) {
46+
return dispatch(commandBuilder.add(key, timestamp, value, null, labels));
47+
}
48+
49+
@SuppressWarnings("unchecked")
50+
@Override
51+
public RedisFuture<Long> add(K key, long timestamp, double value, CreateOptions options, Label<K, V>... labels) {
52+
return dispatch(commandBuilder.add(key, timestamp, value, options, labels));
53+
}
54+
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.redislabs.mesclun.timeseries.impl;
2+
3+
import com.redislabs.mesclun.RedisTimeSeriesCommandBuilder;
4+
import com.redislabs.mesclun.api.StatefulRedisTimeSeriesConnection;
5+
import com.redislabs.mesclun.api.reactive.RedisTimeSeriesReactiveCommands;
6+
import com.redislabs.mesclun.timeseries.CreateOptions;
7+
import com.redislabs.mesclun.timeseries.Label;
8+
9+
import io.lettuce.core.RedisReactiveCommandsImpl;
10+
import io.lettuce.core.codec.RedisCodec;
11+
import reactor.core.publisher.Mono;
12+
13+
public class ReactiveCommandsImpl<K, V> extends RedisReactiveCommandsImpl<K, V>
14+
implements RedisTimeSeriesReactiveCommands<K, V> {
15+
16+
private final StatefulRedisTimeSeriesConnection<K, V> connection;
17+
private final RedisTimeSeriesCommandBuilder<K, V> commandBuilder;
18+
19+
public ReactiveCommandsImpl(StatefulRedisTimeSeriesConnection<K, V> connection, RedisCodec<K, V> codec) {
20+
super(connection, codec);
21+
this.connection = connection;
22+
this.commandBuilder = new RedisTimeSeriesCommandBuilder<>(codec);
23+
}
24+
25+
@Override
26+
public StatefulRedisTimeSeriesConnection<K, V> getStatefulConnection() {
27+
return connection;
28+
}
29+
30+
@SuppressWarnings("unchecked")
31+
@Override
32+
public Mono<String> create(K key, CreateOptions options, Label<K, V>... labels) {
33+
return createMono(() -> commandBuilder.create(key, options, labels));
34+
}
35+
36+
@SuppressWarnings("unchecked")
37+
@Override
38+
public Mono<String> create(K key, Label<K, V>... labels) {
39+
return createMono(() -> commandBuilder.create(key, null, labels));
40+
}
41+
42+
@SuppressWarnings("unchecked")
43+
@Override
44+
public Mono<Long> add(K key, long timestamp, double value, CreateOptions options, Label<K, V>... labels) {
45+
return createMono(() -> commandBuilder.add(key, timestamp, value, options, labels));
46+
}
47+
48+
@SuppressWarnings("unchecked")
49+
@Override
50+
public Mono<Long> add(K key, long timestamp, double value, Label<K, V>... labels) {
51+
return createMono(() -> commandBuilder.add(key, timestamp, value, null, labels));
52+
}
53+
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2011-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.redislabs.mesclun.timeseries.impl;
17+
18+
import java.time.Duration;
19+
20+
import com.redislabs.mesclun.api.StatefulRedisTimeSeriesConnection;
21+
import com.redislabs.mesclun.api.async.RedisTimeSeriesAsyncCommands;
22+
import com.redislabs.mesclun.api.reactive.RedisTimeSeriesReactiveCommands;
23+
import com.redislabs.mesclun.api.sync.RedisTimeSeriesCommands;
24+
25+
import io.lettuce.core.RedisChannelWriter;
26+
import io.lettuce.core.RedisReactiveCommandsImpl;
27+
import io.lettuce.core.StatefulRedisConnectionImpl;
28+
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
29+
import io.lettuce.core.codec.RedisCodec;
30+
import io.lettuce.core.protocol.ConnectionWatchdog;
31+
import io.lettuce.core.protocol.PushHandler;
32+
33+
/**
34+
* A thread-safe connection to a RedisTimeSeries server. Multiple threads may share
35+
* one {@link StatefulConnectionImpl}
36+
*
37+
* A {@link ConnectionWatchdog} monitors each connection and reconnects
38+
* automatically until {@link #close} is called. All pending commands will be
39+
* (re)sent after successful reconnection.
40+
*
41+
* @param <K> Key type.
42+
* @param <V> Value type.
43+
* @author Mark Paluch
44+
* @author Julien Ruaux
45+
*/
46+
public class StatefulConnectionImpl<K, V> extends StatefulRedisConnectionImpl<K, V>
47+
implements StatefulRedisTimeSeriesConnection<K, V> {
48+
49+
/**
50+
* Initialize a new connection.
51+
*
52+
* @param writer the channel writer.
53+
* @param pushHandler the handler for push notifications.
54+
* @param codec Codec used to encode/decode keys and values.
55+
* @param timeout Maximum time to wait for a response.
56+
*/
57+
public StatefulConnectionImpl(RedisChannelWriter writer, PushHandler pushHandler, RedisCodec<K, V> codec,
58+
Duration timeout) {
59+
super(writer, pushHandler, codec, timeout);
60+
}
61+
62+
/**
63+
* Create a new instance of {@link AsyncCommandsImpl}. Can be
64+
* overriden to extend.
65+
*/
66+
@Override
67+
protected AsyncCommandsImpl<K, V> newRedisAsyncCommandsImpl() {
68+
return new AsyncCommandsImpl<>(this, codec);
69+
}
70+
71+
/**
72+
* Create a new instance of {@link ReactiveCommandsImpl}. Can be
73+
* overriden to extend.
74+
*
75+
*/
76+
@Override
77+
protected RedisReactiveCommandsImpl<K, V> newRedisReactiveCommandsImpl() {
78+
return new ReactiveCommandsImpl<>(this, codec);
79+
}
80+
81+
/**
82+
* Create a new instance of {@link RedisTimeSeriesCommands}. Can be overriden to
83+
* extend.
84+
*
85+
* @return a new instance
86+
*/
87+
@Override
88+
protected RedisTimeSeriesCommands<K, V> newRedisSyncCommandsImpl() {
89+
return syncHandler(async(), RedisTimeSeriesCommands.class, RedisClusterCommands.class);
90+
}
91+
92+
@Override
93+
public RedisTimeSeriesAsyncCommands<K, V> async() {
94+
return (RedisTimeSeriesAsyncCommands<K, V>) super.async();
95+
}
96+
97+
@Override
98+
public RedisTimeSeriesCommands<K, V> sync() {
99+
return (RedisTimeSeriesCommands<K, V>) super.sync();
100+
}
101+
102+
@Override
103+
public RedisTimeSeriesReactiveCommands<K, V> reactive() {
104+
return (RedisTimeSeriesReactiveCommands<K, V>) super.reactive();
105+
}
106+
107+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.redislabs.mesclun.timeseries.protocol;
2+
3+
import java.nio.charset.StandardCharsets;
4+
5+
import io.lettuce.core.protocol.ProtocolKeyword;
6+
7+
public enum CommandKeyword implements ProtocolKeyword {
8+
9+
RETENTION, UNCOMPRESSED, CHUNK_SIZE, ON_DUPLICATE, LABELS;
10+
11+
public final byte[] bytes;
12+
13+
CommandKeyword() {
14+
bytes = name().getBytes(StandardCharsets.US_ASCII);
15+
}
16+
17+
@Override
18+
public byte[] getBytes() {
19+
return bytes;
20+
}
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.redislabs.mesclun.timeseries.protocol;
2+
3+
import java.nio.charset.StandardCharsets;
4+
5+
import io.lettuce.core.protocol.ProtocolKeyword;
6+
7+
/**
8+
* RedisTimeSeries commands.
9+
*
10+
* @author Julien Ruaux
11+
*/
12+
public enum CommandType implements ProtocolKeyword {
13+
14+
ADD, CREATE;
15+
16+
private final static String PREFIX = "TS.";
17+
18+
public final byte[] bytes;
19+
20+
CommandType() {
21+
bytes = (PREFIX + name()).getBytes(StandardCharsets.US_ASCII);
22+
}
23+
24+
@Override
25+
public byte[] getBytes() {
26+
return bytes;
27+
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package com.redislabs.mesclun;
2+
3+
import org.junit.jupiter.api.AfterEach;
4+
import org.junit.jupiter.api.Assertions;
5+
import org.junit.jupiter.api.BeforeEach;
6+
import org.junit.jupiter.api.Test;
7+
import org.testcontainers.containers.GenericContainer;
8+
import org.testcontainers.junit.jupiter.Container;
9+
import org.testcontainers.junit.jupiter.Testcontainers;
10+
import org.testcontainers.utility.DockerImageName;
11+
12+
import com.redislabs.mesclun.api.StatefulRedisTimeSeriesConnection;
13+
import com.redislabs.mesclun.api.async.RedisTimeSeriesAsyncCommands;
14+
import com.redislabs.mesclun.api.reactive.RedisTimeSeriesReactiveCommands;
15+
import com.redislabs.mesclun.api.sync.RedisTimeSeriesCommands;
16+
import com.redislabs.mesclun.timeseries.CreateOptions;
17+
import com.redislabs.mesclun.timeseries.Label;
18+
19+
import io.lettuce.core.RedisURI;
20+
21+
@Testcontainers
22+
public class TestTimeSeries {
23+
24+
private RedisTimeSeriesClient client;
25+
protected StatefulRedisTimeSeriesConnection<String, String> connection;
26+
protected RedisTimeSeriesCommands<String, String> sync;
27+
protected RedisTimeSeriesAsyncCommands<String, String> async;
28+
protected RedisTimeSeriesReactiveCommands<String, String> reactive;
29+
30+
protected String host;
31+
protected int port;
32+
33+
@Container
34+
@SuppressWarnings("rawtypes")
35+
public static final GenericContainer REDISTIMESERIES = new GenericContainer(
36+
DockerImageName.parse("redislabs/redistimeseries")).withExposedPorts(6379);
37+
38+
@BeforeEach
39+
public void setup() {
40+
host = REDISTIMESERIES.getHost();
41+
port = REDISTIMESERIES.getFirstMappedPort();
42+
client = RedisTimeSeriesClient.create(RedisURI.create(host, port));
43+
connection = client.connect();
44+
sync = connection.sync();
45+
async = connection.async();
46+
reactive = connection.reactive();
47+
sync.flushall();
48+
}
49+
50+
@AfterEach
51+
public void teardown() {
52+
if (connection != null) {
53+
connection.close();
54+
}
55+
if (client != null) {
56+
client.shutdown();
57+
}
58+
}
59+
60+
@SuppressWarnings("unchecked")
61+
@Test
62+
public void testCreate() {
63+
// temperature:3:11 RETENTION 6000 LABELS sensor_id 2 area_id 32
64+
String status = sync.create("temperature:3:11", CreateOptions.builder().retentionTime(6000).build(),
65+
Label.of("sensor_id", "2"), Label.of("area_id", "32"));
66+
Assertions.assertEquals("OK", status);
67+
}
68+
69+
@SuppressWarnings("unchecked")
70+
@Test
71+
public void testAdd() {
72+
// TS.CREATE temperature:3:11 RETENTION 6000 LABELS sensor_id 2 area_id 32
73+
sync.create("temperature:3:11", CreateOptions.builder().retentionTime(6000).build(), Label.of("sensor_id", "2"),
74+
Label.of("area_id", "32"));
75+
// TS.ADD temperature:3:11 1548149181 30
76+
Long add1 = sync.add("temperature:3:11", 1548149181, 30);
77+
Assertions.assertEquals(1548149181, add1);
78+
// TS.ADD temperature:3:11 1548149191 42
79+
Long add2 = sync.add("temperature:3:11", 1548149191, 42);
80+
Assertions.assertEquals(1548149191, add2);
81+
82+
}
83+
}

0 commit comments

Comments
 (0)
Please sign in to comment.