Skip to content

Commit

Permalink
AMBARI-20743. Prototype server websocket endpoint with STOMP sub-prot…
Browse files Browse the repository at this point in the history
…ocol. (mpapirkovskyy)
  • Loading branch information
Myroslav Papirkovskyi committed Apr 13, 2017
1 parent 6b76fc9 commit 04f8c53
Show file tree
Hide file tree
Showing 35 changed files with 944 additions and 303 deletions.
6 changes: 6 additions & 0 deletions ambari-project/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,12 @@
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.7</version>
</dependency>

<dependency>
<groupId>com.sun.grizzly</groupId>
<artifactId>grizzly-comet-org.apache.ambari.server.controller.utilities.webserver</artifactId>
Expand Down
9 changes: 7 additions & 2 deletions ambari-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<useIncrementalCompilation>false</useIncrementalCompilation>
<compilerArgs>
<arg>-Xlint:${xlint}</arg>
Expand Down Expand Up @@ -1292,7 +1294,6 @@
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
Expand All @@ -1306,6 +1307,10 @@
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey.jersey-test-framework</groupId>
<artifactId>jersey-test-framework-core</artifactId>
Expand Down Expand Up @@ -1516,7 +1521,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.1.4</version>
<version>2.8.7</version>
</dependency>
<dependency>
<groupId>net.sf.ehcache</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static void init(HeartBeatHandler instance) {
/**
* Explicitly start HH
*/
public static void statHeartBeatHandler() {
public static void startHeartBeatHandler() {
hh.start();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.agent.stomp;

import javax.ws.rs.WebApplicationException;

import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.HeartBeat;
import org.apache.ambari.server.agent.HeartBeatHandler;
import org.apache.ambari.server.agent.HeartBeatResponse;
import org.apache.ambari.server.agent.Register;
import org.apache.ambari.server.agent.RegistrationResponse;
import org.apache.ambari.server.agent.RegistrationStatus;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.stereotype.Controller;

import com.google.inject.Injector;

@Controller
@SendToUser("/")
@MessageMapping("/")
public class HeartbeatController {
private static Log LOG = LogFactory.getLog(HeartbeatController.class);
private final HeartBeatHandler hh;

public HeartbeatController(Injector injector) {
hh = injector.getInstance(HeartBeatHandler.class);
}

@SubscribeMapping("/register")
public RegistrationResponse register(Register message)
throws WebApplicationException, InvalidStateTransitionException {
/* Call into the heartbeat handler */

RegistrationResponse response = null;
try {
response = hh.handleRegistration(message);
LOG.debug("Sending registration response " + response);
} catch (AmbariException ex) {
response = new RegistrationResponse();
response.setResponseId(-1);
response.setResponseStatus(RegistrationStatus.FAILED);
response.setExitstatus(1);
response.setLog(ex.getMessage());
return response;
}
return response;
}

@SubscribeMapping("/heartbeat")
public HeartBeatResponse heartbeat(HeartBeat message) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received Heartbeat message " + message);
}
HeartBeatResponse heartBeatResponse;
try {
heartBeatResponse = hh.handleHeartBeat(message);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat response with response id " + heartBeatResponse.getResponseId());
LOG.debug("Response details " + heartBeatResponse);
}
} catch (Exception e) {
LOG.warn("Error in HeartBeat", e);
throw new WebApplicationException(500);
}
return heartBeatResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import org.apache.ambari.server.security.authorization.jwt.JwtAuthenticationProperties;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.server.AbstractHttpConnection;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.ErrorHandler;

Expand All @@ -52,10 +53,10 @@ public AmbariErrorHandler(@Named("prettyGson") Gson prettyGson, Configuration co

@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
AbstractHttpConnection connection = AbstractHttpConnection.getCurrentConnection();
HttpChannel connection = HttpConnection.getCurrentConnection().getHttpChannel();
connection.getRequest().setHandled(true);

response.setContentType(MimeTypes.TEXT_PLAIN);
response.setContentType(MimeTypes.Type.TEXT_PLAIN.asString());

Map<String, Object> errorMap = new LinkedHashMap<>();
int code = connection.getResponse().getStatus();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.api;

import java.lang.annotation.Annotation;
import java.util.Map;

import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.handler.annotation.support.DestinationVariableMethodArgumentResolver;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.annotation.support.MissingSessionUserException;
import org.springframework.messaging.simp.annotation.support.SendToMethodReturnValueHandler;
import org.springframework.util.ObjectUtils;
import org.springframework.util.PropertyPlaceholderHelper;

public class AmbariSendToMethodReturnValueHandler extends SendToMethodReturnValueHandler {
private final SimpMessageSendingOperations messagingTemplate;
private final boolean annotationRequired;
private PropertyPlaceholderHelper placeholderHelper = new PropertyPlaceholderHelper("{", "}", null, false);
private String defaultUserDestinationPrefix = "/queue";
private String defaultDestinationPrefix = "/topic";
public static final String CORRELATION_ID_HEADER = "correlationId";
public static final String NATIVE_HEADERS = "nativeHeaders";
public AmbariSendToMethodReturnValueHandler(SimpMessageSendingOperations messagingTemplate, boolean annotationRequired) {
super(messagingTemplate, annotationRequired);
this.messagingTemplate = messagingTemplate;
this.annotationRequired = annotationRequired;
}

@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message) throws Exception {
if (returnValue == null) {
return;
}

MessageHeaders headers = message.getHeaders();
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
PropertyPlaceholderHelper.PlaceholderResolver varResolver = initVarResolver(headers);
Object annotation = findAnnotation(returnType);

String correlationId = getCorrelationId(message);
if (annotation != null && annotation instanceof SendToUser) {
SendToUser sendToUser = (SendToUser) annotation;
boolean broadcast = sendToUser.broadcast();
String user = getUserName(message, headers);
if (user == null) {
if (sessionId == null) {
throw new MissingSessionUserException(message);
}
user = sessionId;
broadcast = false;
}
String[] destinations = getTargetDestinations(sendToUser, message, this.defaultUserDestinationPrefix);
for (String destination : destinations) {
destination = this.placeholderHelper.replacePlaceholders(destination, varResolver);
if (broadcast) {
this.messagingTemplate.convertAndSendToUser(
user, destination, returnValue, createHeaders(null, returnType, correlationId));
}
else {
this.messagingTemplate.convertAndSendToUser(
user, destination, returnValue, createHeaders(sessionId, returnType, correlationId));
}
}
}
else {
SendTo sendTo = (SendTo) annotation;
String[] destinations = getTargetDestinations(sendTo, message, this.defaultDestinationPrefix);
for (String destination : destinations) {
destination = this.placeholderHelper.replacePlaceholders(destination, varResolver);
this.messagingTemplate.convertAndSend(destination, returnValue, createHeaders(sessionId, returnType, correlationId));
}
}
}

private String getCorrelationId(Message<?> message) {
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.wrap(message);
return headerAccessor.getFirstNativeHeader(CORRELATION_ID_HEADER);
}

private MessageHeaders createHeaders(String sessionId, MethodParameter returnType, String correlationId) {
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
if (getHeaderInitializer() != null) {
getHeaderInitializer().initHeaders(headerAccessor);
}
if (sessionId != null) {
headerAccessor.setSessionId(sessionId);
}
headerAccessor.setHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER, returnType);
headerAccessor.setLeaveMutable(true);
headerAccessor.addNativeHeader(CORRELATION_ID_HEADER, correlationId);
return headerAccessor.getMessageHeaders();
}

private Object findAnnotation(MethodParameter returnType) {
Annotation[] anns = new Annotation[4];
anns[0] = AnnotatedElementUtils.findMergedAnnotation(returnType.getMethod(), SendToUser.class);
anns[1] = AnnotatedElementUtils.findMergedAnnotation(returnType.getMethod(), SendTo.class);
anns[2] = AnnotatedElementUtils.findMergedAnnotation(returnType.getDeclaringClass(), SendToUser.class);
anns[3] = AnnotatedElementUtils.findMergedAnnotation(returnType.getDeclaringClass(), SendTo.class);

if (anns[0] != null && !ObjectUtils.isEmpty(((SendToUser) anns[0]).value())) {
return anns[0];
}
if (anns[1] != null && !ObjectUtils.isEmpty(((SendTo) anns[1]).value())) {
return anns[1];
}
if (anns[2] != null && !ObjectUtils.isEmpty(((SendToUser) anns[2]).value())) {
return anns[2];
}
if (anns[3] != null && !ObjectUtils.isEmpty(((SendTo) anns[3]).value())) {
return anns[3];
}

for (int i=0; i < 4; i++) {
if (anns[i] != null) {
return anns[i];
}
}

return null;
}

private PropertyPlaceholderHelper.PlaceholderResolver initVarResolver(MessageHeaders headers) {
String name = DestinationVariableMethodArgumentResolver.DESTINATION_TEMPLATE_VARIABLES_HEADER;
Map<String, String> vars = (Map<String, String>) headers.get(name);
return new DestinationVariablePlaceholderResolver(vars);
}

@Override
public String toString() {
return "AmbariSendToMethodReturnValueHandler [annotationRequired=" + annotationRequired + "]";
}

private static class DestinationVariablePlaceholderResolver implements PropertyPlaceholderHelper.PlaceholderResolver {

private final Map<String, String> vars;

public DestinationVariablePlaceholderResolver(Map<String, String> vars) {
this.vars = vars;
}

@Override
public String resolvePlaceholder(String placeholderName) {
return (this.vars != null ? this.vars.get(placeholderName) : null);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.api.stomp;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.annotation.SendToUser;

//@Controller
@MessageMapping("clusters")
@SendToUser(destinations = "/")
public class ClustersController {

}
Loading

0 comments on commit 04f8c53

Please sign in to comment.