Skip to content

Commit 2874412

Browse files
committed
JedisCluster, JedisPool, etc
1 parent 3a0f61a commit 2874412

File tree

70 files changed

+7250
-6151
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+7250
-6151
lines changed

src/main/java/redis/clients/jedis/ClusterPipeline.java

+5
Original file line numberDiff line numberDiff line change
@@ -2859,4 +2859,9 @@ public Response<SearchResult> ftSearch(byte[] indexName, Query query) {
28592859
throw new UnsupportedOperationException("Not supported yet.");
28602860
//return appendCommand(provider.getNode(key), commandObjects.ftCreate(indexName, query));
28612861
}
2862+
2863+
@Override
2864+
public Response<Long> waitReplicas(int replicas, long timeout) {
2865+
throw new UnsupportedOperationException("Not supported yet.");
2866+
}
28622867
}

src/main/java/redis/clients/jedis/Connection.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class Connection implements Closeable {
2828
private Socket socket;
2929
private RedisOutputStream outputStream;
3030
private RedisInputStream inputStream;
31+
private int soTimeout = 0;
3132
private int infiniteSoTimeout = 0;
3233
private boolean broken = false;
3334

@@ -40,7 +41,7 @@ public Connection(final String host, final int port) {
4041
}
4142

4243
public Connection(final HostAndPort hostAndPort) {
43-
this(hostAndPort, DefaultJedisClientConfig.builder().build());
44+
this(new DefaultJedisSocketFactory(hostAndPort));
4445
}
4546

4647
public Connection(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
@@ -55,6 +56,7 @@ public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clie
5556

5657
public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig, Pool<Connection> pool) {
5758
this.socketFactory = socketFactory;
59+
this.soTimeout = clientConfig.getSocketTimeoutMillis();
5860
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
5961
initializeFromClientConfig(clientConfig);
6062
this.memberOf = pool;
@@ -70,8 +72,13 @@ public String toString() {
7072
return "Connection{" + socketFactory + "}";
7173
}
7274

75+
public int getSoTimeout() {
76+
return soTimeout;
77+
}
78+
7379
public void setSoTimeout(int soTimeout) {
74-
socketFactory.setSocketTimeout(soTimeout);
80+
// socketFactory.setSocketTimeout(soTimeout);
81+
this.soTimeout = soTimeout;
7582
if (this.socket != null) {
7683
try {
7784
this.socket.setSoTimeout(soTimeout);
@@ -96,7 +103,8 @@ public void setTimeoutInfinite() {
96103

97104
public void rollbackTimeout() {
98105
try {
99-
socket.setSoTimeout(socketFactory.getSocketTimeout());
106+
// socket.setSoTimeout(socketFactory.getSocketTimeout());
107+
socket.setSoTimeout(this.soTimeout);
100108
} catch (SocketException ex) {
101109
broken = true;
102110
throw new JedisConnectionException(ex);
@@ -202,6 +210,7 @@ public void connect() throws JedisConnectionException {
202210
if (!isConnected()) {
203211
try {
204212
socket = socketFactory.createSocket();
213+
soTimeout = socket.getSoTimeout(); //?
205214

206215
outputStream = new RedisOutputStream(socket.getOutputStream());
207216
inputStream = new RedisInputStream(socket.getInputStream());
@@ -317,6 +326,10 @@ public Object getOne() {
317326
return readProtocolWithCheckingBroken();
318327
}
319328

329+
public void setBroken() {
330+
broken = true;
331+
}
332+
320333
public boolean isBroken() {
321334
return broken;
322335
}

src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -122,16 +122,16 @@ protected HostAndPort getSocketHostAndPort() {
122122
}
123123
return hap;
124124
}
125-
126-
@Override
127-
public void setSocketTimeout(int socketTimeout) {
128-
this.socketTimeout = socketTimeout;
129-
}
130-
131-
@Override
132-
public int getSocketTimeout() {
133-
return socketTimeout;
134-
}
125+
//
126+
// @Override
127+
// public void setSocketTimeout(int socketTimeout) {
128+
// this.socketTimeout = socketTimeout;
129+
// }
130+
//
131+
// @Override
132+
// public int getSocketTimeout() {
133+
// return socketTimeout;
134+
// }
135135

136136
@Override
137137
public String toString() {

src/main/java/redis/clients/jedis/Jedis.java

+84-47
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import java.io.Closeable;
99
import java.net.URI;
10-
import java.util.ArrayList;
1110
import java.util.Collections;
1211
import java.util.Iterator;
1312
import java.util.LinkedHashSet;
@@ -40,6 +39,8 @@ public class Jedis implements ServerCommands, DatabaseCommands, JedisCommands, J
4039
private final RedisCommandObjects commandObjects = new RedisCommandObjects();
4140
private int db = 0;
4241
private Transaction transaction = null;
42+
private boolean isInMulti = false;
43+
private boolean isInWatch = false;
4344
private Pipeline pipeline = null;
4445
protected static final byte[][] DUMMY_ARRAY = new byte[0][];
4546

@@ -69,7 +70,7 @@ public Jedis(final String uriString) {
6970
}
7071

7172
public Jedis(final HostAndPort hp) {
72-
this(hp, DefaultJedisClientConfig.builder().build());
73+
connection = new Connection(hp);
7374
}
7475

7576
public Jedis(final String host, final int port) {
@@ -302,7 +303,26 @@ public Jedis(final Connection connection) {
302303

303304
@Override
304305
public String toString() {
305-
return "BinaryJedis{" + connection + '}';
306+
return "Jedis{" + connection + '}';
307+
}
308+
309+
// Legacy
310+
public Connection getClient() {
311+
return getConnection();
312+
}
313+
314+
public Connection getConnection() {
315+
return connection;
316+
}
317+
318+
// Legacy
319+
public void connect() {
320+
connection.connect();
321+
}
322+
323+
// Legacy
324+
public void disconnect() {
325+
connection.disconnect();
306326
}
307327

308328
public boolean isConnected() {
@@ -324,6 +344,11 @@ public void resetState() {
324344
}
325345
//
326346
// connection.resetState();
347+
if (isInWatch) {
348+
connection.sendCommand(UNWATCH);
349+
connection.getStatusCodeReply();
350+
isInWatch = false;
351+
}
327352
}
328353

329354
transaction = null;
@@ -349,12 +374,33 @@ public void close() {
349374
}
350375
}
351376

352-
public Connection getConnection() {
353-
return connection;
377+
// Legacy
378+
public Transaction multi() {
379+
// client.multi();
380+
// client.getOne(); // expected OK
381+
// transaction = new Transaction(client);
382+
transaction = new Transaction(this);
383+
return transaction;
354384
}
355385

356-
public Connection getClient() {
357-
return getConnection();
386+
// Legacy
387+
public Pipeline pipelined() {
388+
// pipeline = new Pipeline();
389+
// pipeline.setClient(connection);
390+
pipeline = new Pipeline(this);
391+
return pipeline;
392+
}
393+
394+
// Legacy
395+
protected void checkIsInMultiOrPipeline() {
396+
// if (connection.isInMulti()) {
397+
if (transaction != null) {
398+
throw new IllegalStateException(
399+
"Cannot use Jedis when in Multi. Please use Transaction or reset jedis state.");
400+
} else if (pipeline != null && pipeline.hasPipelinedResponse()) {
401+
throw new IllegalStateException(
402+
"Cannot use Jedis when in Pipeline. Please use Pipeline or reset jedis state.");
403+
}
358404
}
359405

360406
public int getDB() {
@@ -483,6 +529,7 @@ public String quit() {
483529
connection.sendCommand(QUIT);
484530
String quitReturn = connection.getStatusCodeReply();
485531
connection.disconnect();
532+
connection.setBroken();
486533
return quitReturn;
487534
}
488535

@@ -2189,35 +2236,13 @@ public Set<Tuple> zpopmin(final byte[] key, final int count) {
21892236
return connection.executeCommand(commandObjects.zpopmin(key, count));
21902237
}
21912238

2192-
public Pipeline pipelined() {
2193-
// pipeline = new Pipeline();
2194-
// pipeline.setClient(connection);
2195-
pipeline = new Pipeline(connection);
2196-
return pipeline;
2197-
}
2198-
2199-
public Transaction multi() {
2200-
connection.sendCommand(MULTI);
2201-
connection.getOne(); // expected OK
2202-
transaction = new Transaction(connection);
2203-
return transaction;
2204-
}
2205-
2206-
protected void checkIsInMultiOrPipeline() {
2207-
// if (connection.isInMulti()) {
2208-
if (transaction != null) {
2209-
throw new IllegalStateException(
2210-
"Cannot use Jedis when in Multi. Please use Transaction or reset jedis state.");
2211-
} else if (pipeline != null && pipeline.hasPipelinedResponse()) {
2212-
throw new IllegalStateException(
2213-
"Cannot use Jedis when in Pipeline. Please use Pipeline or reset jedis state.");
2214-
}
2215-
}
2216-
22172239
public String watch(final byte[]... keys) {
22182240
checkIsInMultiOrPipeline();
22192241
connection.sendCommand(WATCH, keys);
2220-
return connection.getStatusCodeReply();
2242+
// return connection.getStatusCodeReply();
2243+
String status = connection.getStatusCodeReply();
2244+
isInWatch = true;
2245+
return status;
22212246
}
22222247

22232248
public String unwatch() {
@@ -3260,6 +3285,7 @@ public String shutdown() throws JedisException {
32603285
} catch (JedisConnectionException jce) {
32613286
// expected
32623287
status = null;
3288+
connection.setBroken();
32633289
}
32643290
return status;
32653291
}
@@ -3271,6 +3297,7 @@ public void shutdown(final SaveMode saveMode) throws JedisException {
32713297
throw new JedisException(connection.getStatusCodeReply());
32723298
} catch (JedisConnectionException jce) {
32733299
// expected
3300+
connection.setBroken();
32743301
}
32753302
}
32763303

@@ -3751,19 +3778,19 @@ public List<Object> slowlogGetBinary(final long entries) {
37513778

37523779
@Override
37533780
public Long objectRefcount(final byte[] key) {
3754-
connection.sendCommand(OBJECT, REFCOUNT);
3781+
connection.sendCommand(OBJECT, REFCOUNT.getRaw(), key);
37553782
return connection.getIntegerReply();
37563783
}
37573784

37583785
@Override
37593786
public byte[] objectEncoding(final byte[] key) {
3760-
connection.sendCommand(OBJECT, ENCODING);
3787+
connection.sendCommand(OBJECT, ENCODING.getRaw(), key);
37613788
return connection.getBinaryBulkReply();
37623789
}
37633790

37643791
@Override
37653792
public Long objectIdletime(final byte[] key) {
3766-
connection.sendCommand(OBJECT, IDLETIME);
3793+
connection.sendCommand(OBJECT, IDLETIME.getRaw(), key);
37673794
return connection.getIntegerReply();
37683795
}
37693796

@@ -3775,7 +3802,7 @@ public List<byte[]> objectHelpBinary() {
37753802

37763803
@Override
37773804
public Long objectFreq(final byte[] key) {
3778-
connection.sendCommand(OBJECT, FREQ);
3805+
connection.sendCommand(OBJECT, FREQ.getRaw(), key);
37793806
return connection.getIntegerReply();
37803807
}
37813808

@@ -3886,13 +3913,20 @@ public Long memoryUsage(final byte[] key) {
38863913
@Override
38873914
public Long memoryUsage(final byte[] key, final int samples) {
38883915
checkIsInMultiOrPipeline();
3889-
connection.sendCommand(MEMORY, USAGE.getRaw(), key, toByteArray(samples));
3916+
connection.sendCommand(MEMORY, USAGE.getRaw(), key, SAMPLES.getRaw(), toByteArray(samples));
38903917
return connection.getIntegerReply();
38913918
}
38923919

38933920
@Override
38943921
public String failover() {
3895-
return failover(null);
3922+
checkIsInMultiOrPipeline();
3923+
connection.sendCommand(FAILOVER);
3924+
connection.setTimeoutInfinite();
3925+
try {
3926+
return connection.getStatusCodeReply();
3927+
} finally {
3928+
connection.rollbackTimeout();
3929+
}
38963930
}
38973931

38983932
@Override
@@ -4142,7 +4176,7 @@ public String migrate(final String host, final int port, final int destinationDB
41424176
final int timeout, final MigrateParams params, final byte[]... keys) {
41434177
checkIsInMultiOrPipeline();
41444178
CommandArguments args = new CommandArguments(MIGRATE).add(host).add(port).add(new byte[0]).add(destinationDB)
4145-
.add(timeout).addParams(params).keys((Object[]) keys);
4179+
.add(timeout).addParams(params).add(Keyword.KEYS).keys((Object[]) keys);
41464180
connection.sendCommand(args);
41474181
return connection.getStatusCodeReply();
41484182
}
@@ -6250,7 +6284,10 @@ public Set<Tuple> zpopmin(final String key, final int count) {
62506284
public String watch(final String... keys) {
62516285
checkIsInMultiOrPipeline();
62526286
connection.sendCommand(WATCH, keys);
6253-
return connection.getStatusCodeReply();
6287+
// return connection.getStatusCodeReply();
6288+
String status = connection.getStatusCodeReply();
6289+
isInWatch = true;
6290+
return status;
62546291
}
62556292

62566293
/**
@@ -7490,19 +7527,19 @@ public List<Slowlog> slowlogGet(final long entries) {
74907527

74917528
@Override
74927529
public Long objectRefcount(final String key) {
7493-
connection.sendCommand(OBJECT, REFCOUNT);
7530+
connection.sendCommand(OBJECT, REFCOUNT.name(), key);
74947531
return connection.getIntegerReply();
74957532
}
74967533

74977534
@Override
74987535
public String objectEncoding(final String key) {
7499-
connection.sendCommand(OBJECT, ENCODING);
7536+
connection.sendCommand(OBJECT, ENCODING.name(), key);
75007537
return connection.getBulkReply();
75017538
}
75027539

75037540
@Override
75047541
public Long objectIdletime(final String key) {
7505-
connection.sendCommand(OBJECT, IDLETIME);
7542+
connection.sendCommand(OBJECT, IDLETIME.name(), key);
75067543
return connection.getIntegerReply();
75077544
}
75087545

@@ -7514,7 +7551,7 @@ public List<String> objectHelp() {
75147551

75157552
@Override
75167553
public Long objectFreq(final String key) {
7517-
connection.sendCommand(OBJECT, FREQ);
7554+
connection.sendCommand(OBJECT, FREQ.name(), key);
75187555
return connection.getIntegerReply();
75197556
}
75207557

@@ -7900,7 +7937,7 @@ public String clientList() {
79007937
@Override
79017938
public String clientList(ClientType type) {
79027939
checkIsInMultiOrPipeline();
7903-
connection.sendCommand(CLIENT, LIST.getRaw(), type.getRaw());
7940+
connection.sendCommand(CLIENT, LIST.getRaw(), Keyword.TYPE.getRaw(), type.getRaw());
79047941
return connection.getBulkReply();
79057942
}
79067943

@@ -7939,7 +7976,7 @@ public String migrate(final String host, final int port, final int destinationDB
79397976
final int timeout, final MigrateParams params, final String... keys) {
79407977
checkIsInMultiOrPipeline();
79417978
CommandArguments args = new CommandArguments(MIGRATE).add(host).add(port).add(new byte[0]).add(destinationDB)
7942-
.add(timeout).addParams(params).keys((Object[]) keys);
7979+
.add(timeout).addParams(params).add(Keyword.KEYS).keys((Object[]) keys);
79437980
connection.sendCommand(args);
79447981
return connection.getStatusCodeReply();
79457982
}

0 commit comments

Comments
 (0)