KNL-1336 Do a redirect to a different server.
Rather than just removing the cookie, we now have the option to pick
another server at random in the cluster.
We also now use ENUMs in the DB.
buckett committed Apr 1, 2015
1 parent 5396dee commit 5d9a99a
Showing 13 changed files with 174 additions and 34 deletions.
@@ -1188,6 +1188,12 @@
# Default: 30
#session.cluster.minSecsAfterRebuild=30

# When a node is in shutdown, should we redirect to another working node?
# When this is true we pick another node in the cluster and redirect to it by setting the cookie to point to
# that node. When this is false we just delete the cookie and let the load balancer decide.
# Default: true
# cluster.redirect.random.node=false


# ########################################################################
# SERVLET CONTAINER
@@ -0,0 +1,28 @@
package org.sakaiproject.cluster.api;

import java.util.Date;

/**
* This holds information about a node in the cluster.
*/
public interface ClusterNode {

/**
* Gets the status of a node.
* @return The current node status, or {@link org.sakaiproject.cluster.api.ClusterService.Status#UNKNOWN}
* if the status isn't known.
*/
ClusterService.Status getStatus();

/**
* Gets the server ID of a node.
* @return The server ID.
*/
String getServerId();

/**
* Gets when the status of the node was last updated.
* @return The date when the status of the node was last updated.
*/
Date getUpdated();
}
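
For illustration only (not part of the commit): a minimal sketch of how a caller might consume the revised getServerStatus() API (changed below to return these ClusterNode objects keyed by server instance ID) to list the running nodes. The class and method names (ClusterStatusExample, printRunningNodes) are hypothetical; the ComponentManager lookup follows the pattern used elsewhere in this diff.

import java.util.Map;

import org.sakaiproject.cluster.api.ClusterNode;
import org.sakaiproject.cluster.api.ClusterService;
import org.sakaiproject.component.cover.ComponentManager;

public class ClusterStatusExample {
    public static void printRunningNodes() {
        ClusterService clusterService = (ClusterService) ComponentManager.get(ClusterService.class);
        // Keys are server instance IDs; values carry the plain server ID, status and last update time.
        Map<String, ClusterNode> nodes = clusterService.getServerStatus();
        for (Map.Entry<String, ClusterNode> entry : nodes.entrySet()) {
            ClusterNode node = entry.getValue();
            if (ClusterService.Status.RUNNING.equals(node.getStatus())) {
                System.out.println(entry.getKey() + " -> " + node.getServerId()
                        + " (updated " + node.getUpdated() + ")");
            }
        }
    }
}
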
@@ -87,7 +87,7 @@ enum Status
* Get the statuses of the servers in the cluster.
* @return A Map of the servers with the value being the server status.
*/
Map<String, Status> getServerStatus();
Map<String, ClusterNode> getServerStatus();

/**
* Marks a server as being closed. This prevents new sessions from being started.
53 changes: 51 additions & 2 deletions kernel/api/src/main/java/org/sakaiproject/util/RequestFilter.java
@@ -28,6 +28,9 @@
import org.apache.commons.fileupload.servlet.ServletFileUpload;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.sakaiproject.cluster.api.ClusterNode;
import org.sakaiproject.cluster.api.ClusterService;
import org.sakaiproject.cluster.api.ClusterService.Status;
import org.sakaiproject.component.api.ServerConfigurationService;
import org.sakaiproject.component.cover.ComponentManager;
import org.sakaiproject.event.api.UsageSession;
@@ -176,6 +179,10 @@ public class RequestFilter implements Filter

/** The tools allowed as lti provider **/
protected static final String SAKAI_BLTI_PROVIDER_TOOLS = "basiclti.provider.allowedtools";

/** The name of the Sakai property saying whether we should redirect to another node when in shutdown. */
protected static final String SAKAI_CLUSTER_REDIRECT_RANDOM = "cluster.redirect.random.node";

/** Our log (commons). */
private static Log M_log = LogFactory.getLog(RequestFilter.class);
/** If true, we deliver the Sakai end user enterprise id as the remote user in each request. */
@@ -205,6 +212,8 @@ public class RequestFilter implements Filter

protected boolean m_displayModJkWarning = true;

protected boolean m_redirectRandomNode = true;

/** Default is to abort further upload processing if the max is exceeded. */
protected boolean m_uploadContinue = false;

@@ -555,7 +564,7 @@ protected void closingRedirect(HttpServletRequest req, HttpServletResponse res)
// We could check that we aren't in a redirect loop here, but if the load balancer doesn't know that
// a node is no longer responding to new sessions it may still be sending it new clients, and so after
// a couple of redirects it should hop off this node.
String value = "";
String value = getRedirectNode();
// set the cookie
Cookie c = new Cookie(cookieName, value);
c.setPath("/");
@@ -579,6 +588,43 @@ protected void closingRedirect(HttpServletRequest req, HttpServletResponse res)
res.sendRedirect(url.toString());
}

/**
* This attempts to find another node to redirect to; if it can't find one it just empties the cookie
* so the load balancer chooses.
* @return The cookie value for a different node.
*/
protected String getRedirectNode() {
if (m_redirectRandomNode) {
ClusterService clusterService = (ClusterService) ComponentManager.get(ClusterService.class);
Map<String, ClusterNode> nodes = clusterService.getServerStatus();
// There may be more than one entry for each server ID, so keep only the most recently updated one.
Map<String, ClusterNode> latestNodes = new HashMap<>();
for (ClusterNode node: nodes.values()) {
ClusterNode latest = latestNodes.get(node.getServerId());
if (latest == null || latest.getUpdated().before(node.getUpdated())) {
latestNodes.put(node.getServerId(), node);
}
}
// This node shouldn't ever be included but it's better safe than sorry.
latestNodes.remove(System.getProperty(SAKAI_SERVERID));
// Remove all the non-running servers.
List<String> activeServers = new ArrayList<>(latestNodes.size());
for (ClusterNode node : latestNodes.values()) {
if (Status.RUNNING.equals(node.getStatus())) {
activeServers.add(node.getServerId());
}
}
// Pick a random remaining server if we have one.
if (!(activeServers.isEmpty())) {
Random random = new Random();
int i = random.nextInt(activeServers.size());
String serverId = activeServers.get(i);
return DOT + serverId;
}
}
return "";
}
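
For illustration only (not part of the commit): a self-contained sketch of the selection behaviour above, assuming DOT is the literal "." route prefix and using made-up server IDs (node1, node2, node3) and plain status strings. The current node and anything not RUNNING are filtered out, one remaining ID is chosen at random, and the returned cookie value is either a "."-prefixed route suffix or empty.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class RedirectNodeSketch {
    static String pickRoute(Map<String, String> statusByServerId, String ourServerId) {
        List<String> active = new ArrayList<>();
        for (Map.Entry<String, String> entry : statusByServerId.entrySet()) {
            // Skip ourselves and anything that is not RUNNING.
            if (!entry.getKey().equals(ourServerId) && "RUNNING".equals(entry.getValue())) {
                active.add(entry.getKey());
            }
        }
        if (active.isEmpty()) {
            return ""; // empty the cookie and let the load balancer decide
        }
        return "." + active.get(new Random().nextInt(active.size()));
    }

    public static void main(String[] args) {
        Map<String, String> cluster = new HashMap<>();
        cluster.put("node1", "STARTING");
        cluster.put("node2", "RUNNING");
        cluster.put("node3", "RUNNING");
        // Prints ".node2" or ".node3"; node1 is not RUNNING so it is never chosen.
        System.out.println(pickRoute(cluster, "node1"));
    }
}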

/**
* If any of these files exist, delete them.
*
@@ -606,6 +652,7 @@ public void init(FilterConfig filterConfig) throws ServletException
// sakai.properties settings to system properties - see SakaiPropertyPromoter()
ServerConfigurationService configService = org.sakaiproject.component.cover.ServerConfigurationService.getInstance();


// knl-640
appUrl = configService.getString("serverUrl", null);
chsDomain = configService.getString("content.chs.serverName", null);
@@ -772,10 +819,12 @@ else if ("tool".equalsIgnoreCase(s))
// retrieve option to enable or disable cookie HttpOnly
m_cookieHttpOnly = configService.getBoolean(SAKAI_COOKIE_HTTP_ONLY, true);

m_UACompatible = configService.getString(SAKAI_UA_COMPATIBLE,null);
m_UACompatible = configService.getString(SAKAI_UA_COMPATIBLE, null);

isLTIProviderAllowed = (configService.getString(SAKAI_BLTI_PROVIDER_TOOLS,null)!=null);

m_redirectRandomNode = configService.getBoolean(SAKAI_CLUSTER_REDIRECT_RANDOM, true);

}

/**
@@ -0,0 +1,38 @@
package org.sakaiproject.cluster.impl;

import org.sakaiproject.cluster.api.ClusterNode;
import org.sakaiproject.cluster.api.ClusterService;

import java.util.Date;

/**
* Simple immutable implementation of ClusterNode.
*/
public class ClusterNodeImpl implements ClusterNode {

private final String serverId;
private final ClusterService.Status status;
private final Date updated;


public ClusterNodeImpl(String serverId, ClusterService.Status status, Date updated) {
this.serverId = serverId;
this.status = status;
this.updated = updated;
}

@Override
public ClusterService.Status getStatus() {
return status;
}

@Override
public String getServerId() {
return serverId;
}

@Override
public Date getUpdated() {
return updated;
}
}
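
Again for illustration only (not from the commit): a minimal sketch of keeping the most recently updated ClusterNodeImpl per server ID, the same de-duplication RequestFilter.getRedirectNode() performs before choosing a node. The class name (LatestNodeSketch), server ID and timestamps are made up.

import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.sakaiproject.cluster.api.ClusterNode;
import org.sakaiproject.cluster.api.ClusterService;
import org.sakaiproject.cluster.impl.ClusterNodeImpl;

public class LatestNodeSketch {
    public static void main(String[] args) {
        Map<String, ClusterNode> latestNodes = new HashMap<>();
        for (ClusterNode node : Arrays.<ClusterNode>asList(
                new ClusterNodeImpl("node1", ClusterService.Status.RUNNING, new Date(1000L)),
                new ClusterNodeImpl("node1", ClusterService.Status.RUNNING, new Date(2000L)))) {
            ClusterNode latest = latestNodes.get(node.getServerId());
            // Keep the entry with the most recent update time for each server ID.
            if (latest == null || latest.getUpdated().before(node.getUpdated())) {
                latestNodes.put(node.getServerId(), node);
            }
        }
        System.out.println(latestNodes.get("node1").getUpdated()); // prints the Date(2000L) entry
    }
}
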
@@ -27,6 +27,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.sakaiproject.cluster.api.ClusterNode;
import org.sakaiproject.cluster.api.ClusterService;
import org.sakaiproject.component.api.ServerConfigurationService;
import org.sakaiproject.component.cover.ComponentManager;
@@ -293,21 +294,23 @@ public List<String> getServers()
}

@Override
public Map<String, Status> getServerStatus()
public Map<String, ClusterNode> getServerStatus()
{
String statement = clusterServiceSql.getListServerStatusSql();
final Map<String, Status> servers = new HashMap<String, Status>();
final Map<String, ClusterNode> servers = new HashMap<>();
m_sqlService.dbRead(statement, null, new SqlReader()
{
@Override
public Object readSqlResultRecord(ResultSet result) throws SqlReaderFinishedException
{
try
{
String serverInstanceId = result.getString("SERVER_ID_INSTANCE");
String serverId = result.getString("SERVER_ID");
Date updateTime = result.getTimestamp("UPDATE_TIME");
Status status = parseStatus(result.getString("STATUS"));
// Happy to put null into status? or should we convert to UNKNOWN?
servers.put(serverId, status);
ClusterNode node = new ClusterNodeImpl(serverId, status, updateTime);
servers.put(serverInstanceId, node);
}
catch (SQLException e)
{
@@ -317,14 +320,15 @@ public Object readSqlResultRecord(ResultSet result) throws SqlReaderFinishedException
}
});
// Always override DB status with memory version.
Status dbStatus = servers.put(m_serverConfigurationService.getServerIdInstance(), status);
ClusterNode dbStatus = servers.put(m_serverConfigurationService.getServerIdInstance(),
new ClusterNodeImpl(m_serverConfigurationService.getServerId(), status, new Date()));
if (dbStatus == null)
{
M_log.warn("Failed to find ourselves in the cluster: "+ m_serverConfigurationService.getServerIdInstance());
}
else if (!status.equals(dbStatus))
else if (!status.equals(dbStatus.getStatus()))
{
M_log.warn("In memory status ("+ status+ ") different to DB ("+ dbStatus+ ")");
M_log.warn("In memory status ("+ status+ ") different to DB ("+ dbStatus.getStatus()+ ")");
}
return servers;
}
@@ -388,9 +392,10 @@ public void start()

// register in the cluster table
String statement = clusterServiceSql.getInsertServerSql();
Object fields[] = new Object[2];
Object fields[] = new Object[3];
fields[0] = m_serverConfigurationService.getServerIdInstance();
fields[1] = Status.STARTING.toString();
fields[2] = m_serverConfigurationService.getServerId();

boolean ok = m_sqlService.dbWrite(statement, fields);
if (!ok)
@@ -591,9 +596,10 @@ private void updateOurStatus(String serverIdInstance)
M_log.warn("run(): server has been closed in cluster table, reopened: " + serverIdInstance);

statement = clusterServiceSql.getInsertServerSql();
fields = new Object[2];
fields = new Object[3];
fields[0] = serverIdInstance;
fields[1] = status;
fields[2] = m_serverConfigurationService.getServerId();
boolean ok = m_sqlService.dbWrite(statement, fields);
if (!ok)
{
@@ -606,9 +612,10 @@ private void updateOurStatus(String serverIdInstance)
{
// register that this app server is alive and well
statement = clusterServiceSql.getUpdateServerSql();
fields = new Object[2];
fields = new Object[3];
fields[0] = status;
fields[1] = serverIdInstance;
fields[1] = m_serverConfigurationService.getServerId();
fields[2] = serverIdInstance;
boolean ok = m_sqlService.dbWrite(statement, fields);
if (!ok)
{
@@ -641,7 +648,7 @@ public Object readSqlResultRecord(ResultSet result) throws SqlReaderFinishedException

private Status parseStatus(String statusString)
{
Status status = null;
Status status = Status.UNKNOWN;
if (statusString != null)
{
try
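
The parseStatus() body is cut off above; as a hedged sketch only (not the committed text), assuming it wraps Status.valueOf(), this is the likely shape, showing why defaulting to Status.UNKNOWN keeps getServerStatus() null-safe when the STATUS column holds a null or unexpected value:

private Status parseStatus(String statusString)
{
    Status status = Status.UNKNOWN;
    if (statusString != null)
    {
        try
        {
            status = Status.valueOf(statusString);
        }
        catch (IllegalArgumentException e)
        {
            // Unrecognised value in the STATUS column; leave it as UNKNOWN.
        }
    }
    return status;
}
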
@@ -51,15 +51,15 @@ public String getOrphanedLockSessionsSql()
*/
public String getDeleteServerSql()
{
return "delete from SAKAI_CLUSTER where SERVER_ID = ?";
return "delete from SAKAI_CLUSTER where SERVER_ID_INSTANCE = ?";
}

/**
* returns the sql statement for inserting a server id into the sakai_cluster table.
*/
public String getInsertServerSql()
{
return "insert into SAKAI_CLUSTER (SERVER_ID,UPDATE_TIME, STATUS) values (?, " + sqlTimestamp() + ", ?)";
return "insert into SAKAI_CLUSTER (SERVER_ID_INSTANCE, UPDATE_TIME, STATUS, SERVER_ID) values (?, " + sqlTimestamp() + ", ?, ?)";
}

/**
@@ -69,37 +69,37 @@ public String getInsertServerSql()
*/
public String getListExpiredServers(long timeout)
{
return "select SERVER_ID from SAKAI_CLUSTER where SERVER_ID != ? and DATEDIFF('ss', UPDATE_TIME, CURRENT_TIMESTAMP) >= " + timeout;
return "select SERVER_ID_INSTANCE from SAKAI_CLUSTER where SERVER_ID_INSTANCE != ? and DATEDIFF('ss', UPDATE_TIME, CURRENT_TIMESTAMP) >= " + timeout;
}

/**
* returns the sql statement for obtaining a list of sakai servers from the sakai_cluster table in server_id order.
*/
public String getListServersSql()
{
return "select SERVER_ID from SAKAI_CLUSTER order by SERVER_ID asc";
return "select SERVER_ID_INSTANCE from SAKAI_CLUSTER order by SERVER_ID_INSTANCE asc";
}

/**
* returns the sql statement for retrieving a particular server from the sakai_cluster table.
*/
public String getReadServerSql()
{
return "select SERVER_ID, STATUS from SAKAI_CLUSTER where SERVER_ID = ?";
return "select SERVER_ID_INSTANCE, STATUS from SAKAI_CLUSTER where SERVER_ID_INSTANCE = ?";
}

/**
* returns the sql statement for updating a server in the sakai_cluster table.
*/
public String getUpdateServerSql()
{
return "update SAKAI_CLUSTER set UPDATE_TIME = " + sqlTimestamp() + ", STATUS = ? where SERVER_ID = ?";
return "update SAKAI_CLUSTER set UPDATE_TIME = " + sqlTimestamp() + ", STATUS = ?, SERVER_ID = ? where SERVER_ID_INSTANCE = ?";
}

@Override
public String getListServerStatusSql()
{
return "SELECT SERVER_ID, STATUS FROM SAKAI_CLUSTER ORDER BY SERVER_ID ASC";
return "SELECT SERVER_ID_INSTANCE, STATUS, SERVER_ID, UPDATE_TIME FROM SAKAI_CLUSTER ORDER BY SERVER_ID_INSTANCE ASC";
}

/**
@@ -34,7 +34,7 @@ public class SakaiClusterServiceSqlMySql extends SakaiClusterServiceSqlDefault
*/
public String getListExpiredServers(long timeout)
{
return "select SERVER_ID from SAKAI_CLUSTER where SERVER_ID != ? and UPDATE_TIME < CURRENT_TIMESTAMP() - INTERVAL " + timeout + " SECOND";
return "select SERVER_ID_INSTANCE from SAKAI_CLUSTER where SERVER_ID_INSTANCE != ? and UPDATE_TIME < CURRENT_TIMESTAMP() - INTERVAL " + timeout + " SECOND";
}

/**
@@ -34,7 +34,7 @@ public class SakaiClusterServiceSqlOracle extends SakaiClusterServiceSqlDefault
*/
public String getListExpiredServers(long timeout)
{
return "select SERVER_ID from SAKAI_CLUSTER where SERVER_ID != ? and UPDATE_TIME < (CURRENT_TIMESTAMP - "
return "select SERVER_ID_INSTANCE from SAKAI_CLUSTER where SERVER_ID_INSTANCE != ? and UPDATE_TIME < (CURRENT_TIMESTAMP - "
+ ((float) timeout / (float) (60 * 60 * 24)) + " )";
}
}
6 changes: 4 additions & 2 deletions kernel/kernel-impl/src/main/sql/hsqldb/sakai_cluster.sql
@@ -4,7 +4,9 @@

CREATE TABLE SAKAI_CLUSTER
(
SERVER_ID VARCHAR (64),
SERVER_ID_INSTANCE VARCHAR (64),
UPDATE_TIME DATETIME,
PRIMARY KEY (SERVER_ID)
STATUS VARCHAR(8), -- No enums for us here.
SERVER_ID VARCHAR(64),
PRIMARY KEY (SERVER_ID_INSTANCE)
);