Skip to content

Commit

Permalink
Add flush method to ServerHttpResponse
Browse files Browse the repository at this point in the history
This is useful to make sure response headers are written to the
underlying response. It is also useful in conjunction with long
running, async requests and HTTP streaming, to ensure the Servlet
response buffer is sent to the client without additional delay and
also causes an IOException to be raised if the client has gone away.
  • Loading branch information
rstoyanchev committed Apr 15, 2013
1 parent db6f8f2 commit 3a2c15b
Show file tree
Hide file tree
Showing 25 changed files with 257 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,35 @@ public void completeAsync() {
// Implementation of AsyncListener methods
// ---------------------------------------------------------------------

@Override
public void onStartAsync(AsyncEvent event) throws IOException {
}

@Override
public void onError(AsyncEvent event) throws IOException {
}

@Override
public void onTimeout(AsyncEvent event) throws IOException {
for (Runnable handler : this.timeoutHandlers) {
handler.run();
try {
for (Runnable handler : this.timeoutHandlers) {
handler.run();
}
}
catch (Throwable t) {
// ignore
}
}

@Override
public void onComplete(AsyncEvent event) throws IOException {
for (Runnable handler : this.completionHandlers) {
handler.run();
try {
for (Runnable handler : this.completionHandlers) {
handler.run();
}
}
catch (Throwable t) {
// ignore
}
this.asyncContext = null;
this.asyncCompleted.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.http.server;

import java.io.Closeable;
import java.io.IOException;

import org.springframework.http.HttpOutputMessage;
import org.springframework.http.HttpStatus;
Expand All @@ -35,6 +36,11 @@ public interface ServerHttpResponse extends HttpOutputMessage, Closeable {
*/
void setStatusCode(HttpStatus status);

/**
* TODO
*/
void flush() throws IOException;

/**
* Close this response, freeing any resources created.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ public OutputStream getBody() throws IOException {
return this.servletResponse.getOutputStream();
}

@Override
public void flush() throws IOException {
writeCookies();
writeHeaders();
this.servletResponse.flushBuffer();
}

public void close() {
writeCookies();
writeHeaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.sockjs;

import java.io.IOException;



/**
Expand All @@ -25,7 +27,7 @@
*/
public interface SockJsSession {

void sendMessage(String text) throws Exception;
void sendMessage(String text) throws IOException;

void close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.springframework.sockjs.server;

import java.io.EOFException;
import java.io.IOException;
import java.net.SocketException;
import java.util.Date;
import java.util.concurrent.ScheduledFuture;

Expand Down Expand Up @@ -54,20 +56,25 @@ protected SockJsConfiguration getSockJsConfig() {
return this.sockJsConfig;
}

public final synchronized void sendMessage(String message) {
public final synchronized void sendMessage(String message) throws IOException {
Assert.isTrue(!isClosed(), "Cannot send a message, session has been closed");
sendMessageInternal(message);
}

protected abstract void sendMessageInternal(String message);
protected abstract void sendMessageInternal(String message) throws IOException;

public final synchronized void close() {
if (!isClosed()) {
logger.debug("Closing session");

if (isActive()) {
// deliver messages "in flight" before sending close frame
writeFrame(SockJsFrame.closeFrameGoAway());
try {
writeFrame(SockJsFrame.closeFrameGoAway());
}
catch (Exception e) {
// ignore
}
}

super.close();
Expand All @@ -83,26 +90,33 @@ public final synchronized void close() {
* For internal use within a TransportHandler and the (TransportHandler-specific)
* session sub-class. The frame is written only if the connection is active.
*/
protected void writeFrame(SockJsFrame frame) {
protected void writeFrame(SockJsFrame frame) throws IOException {
if (logger.isTraceEnabled()) {
logger.trace("Preparing to write " + frame);
}
try {
writeFrameInternal(frame);
}
catch (EOFException ex) {
logger.warn("Client went away. Terminating connection abruptly");
catch (IOException ex) {
if (ex instanceof EOFException || ex instanceof SocketException) {
logger.warn("Client went away. Terminating connection");
}
else {
logger.warn("Failed to send message. Terminating connection: " + ex.getMessage());
}
deactivate();
close();
throw ex;
}
catch (Throwable t) {
logger.warn("Failed to send message. Terminating connection abruptly: " + t.getMessage());
logger.warn("Failed to send message. Terminating connection: " + t.getMessage());
deactivate();
close();
throw new NestedSockJsRuntimeException("Failed to write frame " + frame, t);
}
}

protected abstract void writeFrameInternal(SockJsFrame frame) throws Exception;
protected abstract void writeFrameInternal(SockJsFrame frame) throws IOException;

/**
* Some {@link TransportHandler} types cannot detect if a client connection is closed
Expand All @@ -111,7 +125,7 @@ protected void writeFrame(SockJsFrame frame) {
*/
protected abstract void deactivate();

public synchronized void sendHeartbeat() {
public synchronized void sendHeartbeat() throws IOException {
if (isActive()) {
writeFrame(SockJsFrame.heartbeatFrame());
scheduleHeartbeat();
Expand All @@ -127,7 +141,12 @@ protected void scheduleHeartbeat() {
Date time = new Date(System.currentTimeMillis() + getSockJsConfig().getHeartbeatTime());
this.heartbeatTask = getSockJsConfig().getHeartbeatScheduler().schedule(new Runnable() {
public void run() {
sendHeartbeat();
try {
sendHeartbeat();
}
catch (IOException e) {
// ignore
}
}
}, time);
if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,50 +214,57 @@ public final void handleRequest(ServerHttpRequest request, ServerHttpResponse re
request.getHeaders();
}
catch (IllegalArgumentException ex) {
// Ignore invalid Content-Type (TODO!!)
// Ignore invalid Content-Type (TODO)
}

if (sockJsPath.equals("") || sockJsPath.equals("/")) {
response.getHeaders().setContentType(new MediaType("text", "plain", Charset.forName("UTF-8")));
response.getBody().write("Welcome to SockJS!\n".getBytes("UTF-8"));
return;
}
else if (sockJsPath.equals("/info")) {
this.infoHandler.handle(request, response);
return;
}
else if (sockJsPath.matches("/iframe[0-9-.a-z_]*.html")) {
this.iframeHandler.handle(request, response);
return;
}
else if (sockJsPath.equals("/websocket")) {
handleRawWebSocket(request, response);
return;
}

String[] pathSegments = StringUtils.tokenizeToStringArray(sockJsPath.substring(1), "/");
if (pathSegments.length != 3) {
logger.debug("Expected /{server}/{session}/{transport} but got " + sockJsPath);
response.setStatusCode(HttpStatus.NOT_FOUND);
return;
}
try {
if (sockJsPath.equals("") || sockJsPath.equals("/")) {
response.getHeaders().setContentType(new MediaType("text", "plain", Charset.forName("UTF-8")));
response.getBody().write("Welcome to SockJS!\n".getBytes("UTF-8"));
return;
}
else if (sockJsPath.equals("/info")) {
this.infoHandler.handle(request, response);
return;
}
else if (sockJsPath.matches("/iframe[0-9-.a-z_]*.html")) {
this.iframeHandler.handle(request, response);
return;
}
else if (sockJsPath.equals("/websocket")) {
handleRawWebSocket(request, response);
return;
}

String serverId = pathSegments[0];
String sessionId = pathSegments[1];
String transport = pathSegments[2];
String[] pathSegments = StringUtils.tokenizeToStringArray(sockJsPath.substring(1), "/");
if (pathSegments.length != 3) {
logger.debug("Expected /{server}/{session}/{transport} but got " + sockJsPath);
response.setStatusCode(HttpStatus.NOT_FOUND);
return;
}

if (!validateRequest(serverId, sessionId, transport)) {
response.setStatusCode(HttpStatus.NOT_FOUND);
return;
}
String serverId = pathSegments[0];
String sessionId = pathSegments[1];
String transport = pathSegments[2];

handleRequestInternal(request, response, sessionId, TransportType.fromValue(transport));
if (!validateRequest(serverId, sessionId, transport)) {
response.setStatusCode(HttpStatus.NOT_FOUND);
return;
}

handleTransportRequest(request, response, sessionId, TransportType.fromValue(transport));
}
finally {
response.flush();
}
}

protected abstract void handleRawWebSocket(ServerHttpRequest request, ServerHttpResponse response)
throws Exception;

protected abstract void handleTransportRequest(ServerHttpRequest request, ServerHttpResponse response,
String sessionId, TransportType transportType) throws Exception;

protected boolean validateRequest(String serverId, String sessionId, String transport) {

if (!StringUtils.hasText(serverId) || !StringUtils.hasText(sessionId) || !StringUtils.hasText(transport)) {
Expand All @@ -279,9 +286,6 @@ protected boolean validateRequest(String serverId, String sessionId, String tran
return true;
}

protected abstract void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response,
String sessionId, TransportType transportType) throws Exception;

protected void addCorsHeaders(ServerHttpRequest request, ServerHttpResponse response, HttpMethod... httpMethods) {

String origin = request.getHeaders().getFirst("origin");
Expand Down Expand Up @@ -316,7 +320,6 @@ protected void sendMethodNotAllowed(ServerHttpResponse response, List<HttpMethod
logger.debug("Sending Method Not Allowed (405)");
response.setStatusCode(HttpStatus.METHOD_NOT_ALLOWED);
response.getHeaders().setAllow(new HashSet<HttpMethod>(httpMethods));
response.getBody(); // ensure headers are flushed (TODO!)
}


Expand Down Expand Up @@ -350,8 +353,6 @@ else if (HttpMethod.OPTIONS.equals(request.getMethod())) {

addCorsHeaders(request, response, HttpMethod.GET, HttpMethod.OPTIONS);
addCacheHeaders(response);

response.getBody(); // ensure headers are flushed (TODO!)
}
else {
sendMethodNotAllowed(response, Arrays.asList(HttpMethod.OPTIONS, HttpMethod.GET));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.sockjs.server;

import org.springframework.core.NestedRuntimeException;


/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
@SuppressWarnings("serial")
public class NestedSockJsRuntimeException extends NestedRuntimeException {


public NestedSockJsRuntimeException(String msg) {
super(msg);
}

public NestedSockJsRuntimeException(String msg, Throwable cause) {
super(msg, cause);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ public byte[] getContentBytes() {
}

public String toString() {
String quoted = this.content.replace("\n", "\\n").replace("\r", "\\r");
return "SockJsFrame content='" + quoted + "'";
String result = this.content;
if (result.length() > 80) {
result = result.substring(0, 80) + "...(truncated)";
}
return "SockJsFrame content='" + result.replace("\n", "\\n").replace("\r", "\\r") + "'";
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ protected void handleRawWebSocket(ServerHttpRequest request, ServerHttpResponse
}

@Override
protected void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response,
protected void handleTransportRequest(ServerHttpRequest request, ServerHttpResponse response,
String sessionId, TransportType transportType) throws Exception {

TransportHandler transportHandler = this.transportHandlers.get(transportType);
Expand All @@ -192,7 +192,6 @@ protected void handleRequestInternal(ServerHttpRequest request, ServerHttpRespon
response.setStatusCode(HttpStatus.NO_CONTENT);
addCorsHeaders(request, response, supportedMethod, HttpMethod.OPTIONS);
addCacheHeaders(response);
response.getBody(); // ensure headers are flushed (TODO!)
}
else {
List<HttpMethod> supportedMethods = Arrays.asList(supportedMethod);
Expand All @@ -214,7 +213,7 @@ protected void handleRequestInternal(ServerHttpRequest request, ServerHttpRespon
if (isJsessionIdCookieNeeded()) {
Cookie cookie = request.getCookies().getCookie("JSESSIONID");
String jsid = (cookie != null) ? cookie.getValue() : "dummy";
// TODO: Jetty sets Expires header, so bypass Cookie object for now
// TODO: bypass use of Cookie object (causes Jetty to set Expires header)
response.getHeaders().set("Set-Cookie", "JSESSIONID=" + jsid + ";path=/"); // TODO
}

Expand All @@ -223,8 +222,6 @@ protected void handleRequestInternal(ServerHttpRequest request, ServerHttpRespon
}

transportHandler.handleRequest(request, response, session);

response.close(); // ensure headers are flushed (TODO !!)
}

public SockJsSessionSupport getSockJsSession(String sessionId, TransportHandler transportHandler) {
Expand Down
Loading

0 comments on commit 3a2c15b

Please sign in to comment.