Skip to content

Commit

Permalink
Add support for plaintext clients in TLS cluster (redis#8587)
Browse files Browse the repository at this point in the history
The cluster bus is established over TLS or non-TLS depending on the configuration tls-cluster. The client ports distributed in the cluster and sent to clients are assumed to be TLS or non-TLS also depending on tls-cluster.

The cluster bus is now extended to also contain the non-TLS port of clients in a TLS cluster, when available. The non-TLS port of a cluster node, when available, is sent to clients connected without TLS in responses to CLUSTER SLOTS, CLUSTER NODES, CLUSTER SLAVES and MOVED and ASK redirects, instead of the TLS port.

The user was able to override the client port by defining cluster-announce-port. Now cluster-announce-tls-port is added, so the user can define an alternative announce port for both TLS and non-TLS clients.

Fixes redis#8134
  • Loading branch information
zuiderkwast authored Mar 30, 2021
1 parent 8cbd858 commit 5629dbe
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 44 deletions.
18 changes: 12 additions & 6 deletions redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1500,16 +1500,21 @@ lua-time-limit 5000
#
# In order to make Redis Cluster working in such environments, a static
# configuration where each node knows its public address is needed. The
# following two options are used for this scope, and are:
# following four options are used for this scope, and are:
#
# * cluster-announce-ip
# * cluster-announce-port
# * cluster-announce-tls-port
# * cluster-announce-bus-port
#
# Each instructs the node about its address, client port, and cluster message
# bus port. The information is then published in the header of the bus packets
# so that other nodes will be able to correctly map the address of the node
# publishing the information.
# Each instructs the node about its address, client ports (for connections
# without and with TLS) and cluster message bus port. The information is then
# published in the header of the bus packets so that other nodes will be able to
# correctly map the address of the node publishing the information.
#
# If cluster-tls is set to yes and cluster-announce-tls-port is omitted or set
# to zero, then cluster-announce-port refers to the TLS port. Note also that
# cluster-announce-tls-port has no effect if cluster-tls is set to no.
#
# If the above options are not used, the normal Redis Cluster auto-detection
# will be used instead.
Expand All @@ -1522,7 +1527,8 @@ lua-time-limit 5000
# Example:
#
# cluster-announce-ip 10.1.1.5
# cluster-announce-port 6379
# cluster-announce-tls-port 6379
# cluster-announce-port 0
# cluster-announce-bus-port 6380

################################## SLOW LOG ###################################
Expand Down
97 changes: 71 additions & 26 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void clusterSendFail(char *nodename);
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter);
sds clusterGenNodesDescription(int filter, int use_pport);
clusterNode *clusterLookupNode(const char *name);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
Expand Down Expand Up @@ -190,6 +190,9 @@ int clusterLoadConfig(char *filename) {
* base port. */
n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;

/* The plaintext port for client in a TLS cluster (n->pport) is not
* stored in nodes.conf. It is received later over the bus protocol. */

/* Parse flags */
p = s = argv[2];
while(p) {
Expand Down Expand Up @@ -336,7 +339,7 @@ int clusterSaveConfig(int do_fsync) {

/* Get the nodes description and concatenate our "vars" directive to
* save currentEpoch and lastVoteEpoch. */
ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE, 0);
ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
(unsigned long long) server.cluster->currentEpoch,
(unsigned long long) server.cluster->lastVoteEpoch);
Expand Down Expand Up @@ -437,6 +440,26 @@ int clusterLockConfig(char *filename) {
return C_OK;
}

/* Derives our ports to be announced in the cluster bus. */
void deriveAnnouncedPorts(int *announced_port, int *announced_pport,
int *announced_cport) {
int port = server.tls_cluster ? server.tls_port : server.port;
/* Default announced ports. */
*announced_port = port;
*announced_pport = server.tls_cluster ? server.port : 0;
*announced_cport = port + CLUSTER_PORT_INCR;
/* Config overriding announced ports. */
if (server.tls_cluster && server.cluster_announce_tls_port) {
*announced_port = server.cluster_announce_tls_port;
*announced_pport = server.cluster_announce_port;
} else if (server.cluster_announce_port) {
*announced_port = server.cluster_announce_port;
}
if (server.cluster_announce_bus_port) {
*announced_cport = server.cluster_announce_bus_port;
}
}

/* Some flags (currently just the NOFAILOVER flag) may need to be updated
* in the "myself" node based on the current configuration of the node,
* that may change at runtime via CONFIG SET. This function changes the
Expand Down Expand Up @@ -524,14 +547,9 @@ void clusterInit(void) {
memset(server.cluster->slots_keys_count,0,
sizeof(server.cluster->slots_keys_count));

/* Set myself->port / cport to my listening ports, we'll just need to
/* Set myself->port/cport/pport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */
myself->port = port;
myself->cport = port+CLUSTER_PORT_INCR;
if (server.cluster_announce_port)
myself->port = server.cluster_announce_port;
if (server.cluster_announce_bus_port)
myself->cport = server.cluster_announce_bus_port;
deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport);

server.cluster->mf_end = 0;
resetManualFailover();
Expand Down Expand Up @@ -782,6 +800,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
memset(node->ip,0,sizeof(node->ip));
node->port = 0;
node->cport = 0;
node->pport = 0;
node->fail_reports = listCreate();
node->voted_time = 0;
node->orphaned_time = 0;
Expand Down Expand Up @@ -1488,6 +1507,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
if (node->link) freeClusterLink(node->link);
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
node->port = ntohs(g->port);
node->pport = ntohs(g->pport);
node->cport = ntohs(g->cport);
node->flags &= ~CLUSTER_NODE_NOADDR;
}
Expand All @@ -1509,6 +1529,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
node = createClusterNode(g->nodename, flags);
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
node->port = ntohs(g->port);
node->pport = ntohs(g->pport);
node->cport = ntohs(g->cport);
clusterAddNode(node);
}
Expand Down Expand Up @@ -1548,6 +1569,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
{
char ip[NET_IP_STR_LEN] = {0};
int port = ntohs(hdr->port);
int pport = ntohs(hdr->pport);
int cport = ntohs(hdr->cport);

/* We don't proceed if the link is the same as the sender link, as this
Expand All @@ -1559,12 +1581,13 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
if (link == node->link) return 0;

nodeIp2String(ip,link,hdr->myip);
if (node->port == port && node->cport == cport &&
if (node->port == port && node->cport == cport && node->pport == pport &&
strcmp(ip,node->ip) == 0) return 0;

/* IP / port is different, update it. */
memcpy(node->ip,ip,sizeof(ip));
node->port = port;
node->pport = pport;
node->cport = cport;
if (node->link) freeClusterLink(node->link);
node->flags &= ~CLUSTER_NODE_NOADDR;
Expand Down Expand Up @@ -1862,6 +1885,7 @@ int clusterProcessPacket(clusterLink *link) {
node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
nodeIp2String(node->ip,link,hdr->myip);
node->port = ntohs(hdr->port);
node->pport = ntohs(hdr->pport);
node->cport = ntohs(hdr->cport);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
Expand Down Expand Up @@ -1924,6 +1948,7 @@ int clusterProcessPacket(clusterLink *link) {
link->node->flags |= CLUSTER_NODE_NOADDR;
link->node->ip[0] = '\0';
link->node->port = 0;
link->node->pport = 0;
link->node->cport = 0;
freeClusterLink(link);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
Expand Down Expand Up @@ -2423,19 +2448,16 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
hdr->myip[NET_IP_STR_LEN-1] = '\0';
}

/* Handle cluster-announce-port as well. */
int port = server.tls_cluster ? server.tls_port : server.port;
int announced_port = server.cluster_announce_port ?
server.cluster_announce_port : port;
int announced_cport = server.cluster_announce_bus_port ?
server.cluster_announce_bus_port :
(port + CLUSTER_PORT_INCR);
/* Handle cluster-announce-[tls-|bus-]port. */
int announced_port, announced_pport, announced_cport;
deriveAnnouncedPorts(&announced_port, &announced_pport, &announced_cport);

memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
memset(hdr->slaveof,0,CLUSTER_NAMELEN);
if (myself->slaveof != NULL)
memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
hdr->port = htons(announced_port);
hdr->pport = htons(announced_pport);
hdr->cport = htons(announced_cport);
hdr->flags = htons(myself->flags);
hdr->state = server.cluster->state;
Expand Down Expand Up @@ -2492,6 +2514,7 @@ void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
gossip->port = htons(n->port);
gossip->cport = htons(n->cport);
gossip->flags = htons(n->flags);
gossip->pport = htons(n->pport);
gossip->notused1 = 0;
}

Expand Down Expand Up @@ -4130,15 +4153,16 @@ sds representClusterNodeFlags(sds ci, uint16_t flags) {
* See clusterGenNodesDescription() top comment for more information.
*
* The function returns the string representation as an SDS string. */
sds clusterGenNodeDescription(clusterNode *node) {
sds clusterGenNodeDescription(clusterNode *node, int use_pport) {
int j, start;
sds ci;
int port = use_pport && node->pport ? node->pport : node->port;

/* Node coordinates */
ci = sdscatlen(sdsempty(),node->name,CLUSTER_NAMELEN);
ci = sdscatfmt(ci," %s:%i@%i ",
node->ip,
node->port,
port,
node->cport);

/* Flags */
Expand Down Expand Up @@ -4249,10 +4273,13 @@ void clusterGenNodesSlotsInfo(int filter) {
* include all the known nodes in the representation, including nodes in
* the HANDSHAKE state.
*
* Setting use_pport to 1 in a TLS cluster makes the result contain the
* plaintext client port rather then the TLS client port of each node.
*
* The representation obtained using this function is used for the output
* of the CLUSTER NODES function, and as format for the cluster
* configuration file (nodes.conf) for a given node. */
sds clusterGenNodesDescription(int filter) {
sds clusterGenNodesDescription(int filter, int use_pport) {
sds ci = sdsempty(), ni;
dictIterator *di;
dictEntry *de;
Expand All @@ -4265,7 +4292,7 @@ sds clusterGenNodesDescription(int filter) {
clusterNode *node = dictGetVal(de);

if (node->flags & filter) continue;
ni = clusterGenNodeDescription(node);
ni = clusterGenNodeDescription(node, use_pport);
ci = sdscatsds(ci,ni);
sdsfree(ni);
ci = sdscatlen(ci,"\n",1);
Expand Down Expand Up @@ -4319,7 +4346,10 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
addReplyLongLong(c, end_slot);
addReplyArrayLen(c, 3);
addReplyBulkCString(c, node->ip);
addReplyLongLong(c, node->port);
/* Report non-TLS ports to non-TLS client in TLS cluster if available. */
int use_pport = (server.tls_cluster &&
c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
addReplyLongLong(c, use_pport && node->pport ? node->pport : node->port);
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);

/* Remaining nodes in reply are replicas for slot range */
Expand All @@ -4329,7 +4359,10 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
if (nodeFailed(node->slaves[i])) continue;
addReplyArrayLen(c, 3);
addReplyBulkCString(c, node->slaves[i]->ip);
addReplyLongLong(c, node->slaves[i]->port);
/* Report slave's non-TLS port to non-TLS client in TLS cluster */
addReplyLongLong(c, (use_pport && node->slaves[i]->pport ?
node->slaves[i]->pport :
node->slaves[i]->port));
addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
nested_elements++;
}
Expand Down Expand Up @@ -4458,7 +4491,11 @@ NULL
}
} else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
/* CLUSTER NODES */
sds nodes = clusterGenNodesDescription(0);
/* Report plaintext ports, only if cluster is TLS but client is known to
* be non-TLS). */
int use_pport = (server.tls_cluster &&
c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
sds nodes = clusterGenNodesDescription(0, use_pport);
addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
sdsfree(nodes);
} else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
Expand Down Expand Up @@ -4834,9 +4871,12 @@ NULL
return;
}

/* Use plaintext port if cluster is TLS but client is non-TLS. */
int use_pport = (server.tls_cluster &&
c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
addReplyArrayLen(c,n->numslaves);
for (j = 0; j < n->numslaves; j++) {
sds ni = clusterGenNodeDescription(n->slaves[j]);
sds ni = clusterGenNodeDescription(n->slaves[j], use_pport);
addReplyBulkCString(c,ni);
sdsfree(ni);
}
Expand Down Expand Up @@ -5892,10 +5932,15 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
} else if (error_code == CLUSTER_REDIR_MOVED ||
error_code == CLUSTER_REDIR_ASK)
{
/* Redirect to IP:port. Include plaintext port if cluster is TLS but
* client is non-TLS. */
int use_pport = (server.tls_cluster &&
c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
int port = use_pport && n->pport ? n->pport : n->port;
addReplyErrorSds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d",
(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot,n->ip,n->port));
hashslot, n->ip, port));
} else {
serverPanic("getNodeByQuery() unknown error.");
}
Expand Down
10 changes: 7 additions & 3 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ typedef struct clusterNode {
mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
int port; /* Latest known clients port of this node */
int port; /* Latest known clients port (TLS or plain). */
int pport; /* Latest known clients plaintext port. Only used
if the main clients port is for TLS. */
int cport; /* Latest known cluster port of this node. */
clusterLink *link; /* TCP/IP link with this node */
list *fail_reports; /* List of nodes signaling this as failing */
Expand Down Expand Up @@ -194,7 +196,8 @@ typedef struct {
uint16_t port; /* base port last time it was seen */
uint16_t cport; /* cluster port last time it was seen */
uint16_t flags; /* node->flags copy */
uint32_t notused1;
uint16_t pport; /* plaintext-port, when base port is TLS */
uint16_t notused1;
} clusterMsgDataGossip;

typedef struct {
Expand Down Expand Up @@ -267,7 +270,8 @@ typedef struct {
unsigned char myslots[CLUSTER_SLOTS/8];
char slaveof[CLUSTER_NAMELEN];
char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
char notused1[34]; /* 34 bytes reserved for future usage. */
char notused1[32]; /* 32 bytes reserved for future usage. */
uint16_t pport; /* Sender TCP plaintext port, if base port is TLS */
uint16_t cport; /* Sender TCP cluster bus port */
uint16_t flags; /* Sender node flags */
unsigned char state; /* Cluster state from the POV of the sender */
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2495,6 +2495,7 @@ standardConfig configs[] = {
createIntConfig("tcp-backlog", NULL, IMMUTABLE_CONFIG, 0, INT_MAX, server.tcp_backlog, 511, INTEGER_CONFIG, NULL, NULL), /* TCP listen backlog. */
createIntConfig("cluster-announce-bus-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_bus_port, 0, INTEGER_CONFIG, NULL, NULL), /* Default: Use +10000 offset. */
createIntConfig("cluster-announce-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_port, 0, INTEGER_CONFIG, NULL, NULL), /* Use server.port */
createIntConfig("cluster-announce-tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_tls_port, 0, INTEGER_CONFIG, NULL, NULL), /* Use server.tls_port */
createIntConfig("repl-timeout", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_timeout, 60, INTEGER_CONFIG, NULL, NULL),
createIntConfig("repl-ping-replica-period", "repl-ping-slave-period", MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_ping_slave_period, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL),
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1548,6 +1548,7 @@ struct redisServer {
if the master is in failure state. */
char *cluster_announce_ip; /* IP address to announce on cluster bus. */
int cluster_announce_port; /* base port to announce on cluster bus. */
int cluster_announce_tls_port; /* TLS port to announce on cluster bus. */
int cluster_announce_bus_port; /* bus port to announce on cluster bus. */
int cluster_module_flags; /* Set of flags that Redis modules are able
to set in order to suppress certain
Expand Down
17 changes: 15 additions & 2 deletions tests/cluster/tests/04-resharding.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,17 @@ proc process_is_running {pid} {

set numkeys 50000
set numops 200000
set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]]
set start_node_port [get_instance_attrib redis 0 port]
set cluster [redis_cluster 127.0.0.1:$start_node_port]
if {$::tls} {
# setup a non-TLS cluster client to the TLS cluster
set plaintext_port [get_instance_attrib redis 0 plaintext-port]
set cluster_plaintext [redis_cluster 127.0.0.1:$plaintext_port 0]
puts "Testing TLS cluster on start node 127.0.0.1:$start_node_port, plaintext port $plaintext_port"
} else {
set cluster_plaintext $cluster
puts "Testing using non-TLS cluster"
}
catch {unset content}
array set content {}
set tribpid {}
Expand Down Expand Up @@ -94,8 +104,11 @@ test "Cluster consistency during live resharding" {
# This way we are able to stress Lua -> Redis command invocation
# as well, that has tests to prevent Lua to write into wrong
# hash slots.
if {$listid % 2} {
# We also use both TLS and plaintext connections.
if {$listid % 3 == 0} {
$cluster rpush $key $ele
} elseif {$listid % 3 == 1} {
$cluster_plaintext rpush $key $ele
} else {
$cluster eval {redis.call("rpush",KEYS[1],ARGV[1])} 1 $key $ele
}
Expand Down
Loading

0 comments on commit 5629dbe

Please sign in to comment.