Skip to content

Commit

Permalink
Rafactor pks adapter, add create endpoint service
Browse files Browse the repository at this point in the history
- Refactoring existing code
- Added create endpoint service dealing with accepting the certificates
- Add logic allowing validating connection by making get plans call
- Allow connect to PKS anonymously - without providing credentials
- Moving some classes to more appropriate modules, still work is left
- When importing certificate store the host uri in a field named origin
- Added list plans operation to the adapter and remote client

Change-Id: I73badac77856896e2ed07653609dfd0676d965cc
Reviewed-on: https://bellevue-ci.eng.vmware.com:8080/36288
Reviewed-by: Miroslav Shipkovenski <[email protected]>
Bellevue-Verified: e_vcoauto_glob_1 <[email protected]>
Closures-Verified: e_vcoauto_glob_1 <[email protected]>
CS-Verified: e_vcoauto_glob_1 <[email protected]>
Upgrade-Verified: e_vcoauto_glob_1 <[email protected]>
  • Loading branch information
lazarin committed Jun 18, 2018
1 parent 6eac3c9 commit 6ee5c8e
Show file tree
Hide file tree
Showing 21 changed files with 704 additions and 73 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2018 VMware, Inc. All Rights Reserved.
*
* This product is licensed to you under the Apache License, Version 2.0 (the "License").
* You may not use this product except in compliance with the License.
*
* This product may include a number of subcomponents with separate copyright notices
* and license terms. Your use of these subcomponents is subject to the terms and
* conditions of the subcomponent's license, as noted in the LICENSE file.
*/

package com.vmware.admiral.adapter.pks;

/**
* Set of constants related with PKS
*/
public interface PKSConstants {

String CLUSTER_NAME_PROP_NAME = "__clusterName";
String VALIDATE_CONNECTION = "validate_connection";
String CREDENTIALS_LINK = "credentials";

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public enum PKSOperationType {
GET_CLUSTER("PKS.GetCluster"),
CREATE_USER("PKS.CreateUser"),
CREATE_CLUSTER("PKS.CreateCluster"),
DELETE_CLUSTER("PKS.DeleteCluster");
DELETE_CLUSTER("PKS.DeleteCluster"),
LIST_PLANS("PKS.ListPlans");

public final String id;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,18 @@ public class PKSContext {
public URI pksAPIUri;
public String accessToken;
public String refreshToken;
public long expireMillisTime;
public long expireMillisTime = 0;

public static PKSContext create(PKSEndpointService.Endpoint endpoint,
UAATokenResponse uaaTokenResponse) {
PKSContext pksContext = new PKSContext();
pksContext.accessToken = uaaTokenResponse.accessToken;
pksContext.refreshToken = uaaTokenResponse.refreshToken;
pksContext.pksUAAUri = URI.create(endpoint.uaaEndpoint);
pksContext.pksAPIUri = URI.create(endpoint.apiEndpoint);
pksContext.expireMillisTime = calculateExpireTime(uaaTokenResponse.expiresIn);
if (uaaTokenResponse != null) {
pksContext.accessToken = uaaTokenResponse.accessToken;
pksContext.refreshToken = uaaTokenResponse.refreshToken;
pksContext.expireMillisTime = calculateExpireTime(uaaTokenResponse.expiresIn);
}

return pksContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@

package com.vmware.admiral.adapter.pks.service;

import java.net.URI;
import static com.vmware.admiral.adapter.pks.PKSConstants.CLUSTER_NAME_PROP_NAME;
import static com.vmware.admiral.adapter.pks.PKSConstants.VALIDATE_CONNECTION;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
Expand All @@ -25,6 +27,7 @@
import com.google.common.cache.CacheBuilder;

import com.vmware.admiral.adapter.common.AdapterRequest;
import com.vmware.admiral.adapter.pks.PKSConstants;
import com.vmware.admiral.adapter.pks.PKSContext;
import com.vmware.admiral.adapter.pks.PKSException;
import com.vmware.admiral.adapter.pks.PKSOperationType;
Expand All @@ -36,7 +39,7 @@
import com.vmware.admiral.common.util.DeferredUtils;
import com.vmware.admiral.common.util.PropertyUtils;
import com.vmware.admiral.common.util.ServerX509TrustManager;
import com.vmware.admiral.compute.pks.PKSEndpointService;
import com.vmware.admiral.compute.pks.PKSEndpointService.Endpoint;
import com.vmware.photon.controller.model.resources.ComputeService.ComputeState;
import com.vmware.photon.controller.model.security.util.AuthCredentialsType;
import com.vmware.photon.controller.model.security.util.EncryptionUtils;
Expand All @@ -50,8 +53,6 @@ public class PKSAdapterService extends StatelessService {

public static final String SELF_LINK = ManagementUriParts.ADAPTER_PKS;

public static final String CLUSTER_NAME_PROP_NAME = "__clusterName";

private static final long MAINTENANCE_INTERVAL_MICROS = Long.getLong(
"dcp.management.docker.adapter.periodic.maintenance.period.micros",
TimeUnit.SECONDS.toMicros(10));
Expand All @@ -63,10 +64,10 @@ public class PKSAdapterService extends StatelessService {

private static final long REMOVE_TOKEN_BEFORE_EXPIRE_MILLIS = 2 * MAINTENANCE_INTERVAL_MICROS;

public static class RequestContext extends AdapterRequest {
private static class RequestContext {
public Operation operation;
public List<String> tenantLinks;
public PKSEndpointService.Endpoint endpoint;
public AdapterRequest request;
public Endpoint endpoint;
public ComputeState computeState;
}

Expand All @@ -93,11 +94,11 @@ public void handleStop(Operation op) {

@Override
public void handlePatch(Operation op) {
RequestContext ctx = op.getBody(RequestContext.class);
ctx.validate();
RequestContext ctx = new RequestContext();
ctx.request = getRequest(op);
ctx.operation = op;

getEndpoint(ctx.resourceReference)
getEndpoint(ctx)
.thenAccept(e -> ctx.endpoint = e)
.thenCompose(aVoid -> processOperationWithRetry(ctx))
.exceptionally(t -> {
Expand Down Expand Up @@ -137,7 +138,7 @@ public void handlePeriodicMaintenance(Operation op) {
}

private DeferredResult<Void> processOperationWithRetry(RequestContext ctx) {
PKSOperationType ot = PKSOperationType.instanceById(ctx.operationTypeId);
PKSOperationType ot = PKSOperationType.instanceById(ctx.request.operationTypeId);
logInfo("Received [%s] for endpoint [%s]", ot.getDisplayName(), ctx.endpoint.name);

DeferredResult<Void> result = new DeferredResult<>();
Expand All @@ -164,26 +165,8 @@ private Function<RetriableTask<Void>, DeferredResult<Void>> processOperationTask
return (task) -> {
DeferredResult<Void> result = new DeferredResult<>();
try {
switch (operationType) {
case LIST_CLUSTERS:
result = pksListClusters(ctx);
break;
case GET_CLUSTER:
result = pksGetCluster(ctx);
break;
case CREATE_USER:
result = pksCreateUser(ctx);
break;
/*
case CREATE_CLUSTER:
//TODO implementation
break;
case DELETE_CLUSTER:
//TODO implementation
break;
*/
default:
result.fail(new IllegalArgumentException("unsupported operation"));
result = initDeferredOperation(operationType, ctx);
if (result.toCompletionStage().toCompletableFuture().isCompletedExceptionally()) {
return result;
}

Expand All @@ -193,12 +176,16 @@ private Function<RetriableTask<Void>, DeferredResult<Void>> processOperationTask
if (t instanceof PKSException) {
PKSException p = (PKSException) t;
if (p.getErrorCode() == Operation.STATUS_CODE_UNAUTHORIZED) {
logInfo("Operation for %s returned code 401, invalidate token and retry",
logInfo("Operation for %s returns code 401, invalidate token and retry",
ctx.endpoint.documentSelfLink);
pksContextCache.invalidate(ctx.endpoint.documentSelfLink);
if (ctx.endpoint.documentSelfLink != null) {
pksContextCache.invalidate(ctx.endpoint.documentSelfLink);
}
} else {
task.preventRetries();
}
} else {
task.preventRetries();
}
throw DeferredUtils.wrap(t);
});
Expand All @@ -210,9 +197,39 @@ private Function<RetriableTask<Void>, DeferredResult<Void>> processOperationTask
};
}

private DeferredResult<Void> initDeferredOperation(PKSOperationType operationType,
RequestContext ctx) {
DeferredResult<Void> result = new DeferredResult<>();
switch (operationType) {
case LIST_CLUSTERS:
result = pksListClusters(ctx);
break;
case GET_CLUSTER:
result = pksGetCluster(ctx);
break;
case CREATE_USER:
result = pksCreateUser(ctx);
break;
/*
case CREATE_CLUSTER:
//TODO implementation
break;
case DELETE_CLUSTER:
//TODO implementation
break;
*/
case LIST_PLANS:
result = pksListPlans(ctx);
break;
default:
result.fail(new IllegalArgumentException("unsupported operation"));
}
return result;
}

private DeferredResult<Void> pksCreateUser(RequestContext ctx) {
String cluster = PropertyUtils
.getPropertyString(ctx.customProperties, CLUSTER_NAME_PROP_NAME)
.getPropertyString(ctx.request.customProperties, CLUSTER_NAME_PROP_NAME)
.orElse(null);

if (cluster == null) {
Expand Down Expand Up @@ -245,18 +262,32 @@ private DeferredResult<Void> pksGetCluster(RequestContext ctx) {
.thenAccept(pksCluster -> ctx.operation.setBodyNoCloning(pksCluster).complete());
}

private DeferredResult<PKSEndpointService.Endpoint> getEndpoint(URI uri) {
Operation op = Operation.createGet(this, uri.getPath());
return sendWithDeferredResult(op, PKSEndpointService.Endpoint.class)
private DeferredResult<Void> pksListPlans(RequestContext ctx) {
return getPKSContext(ctx.endpoint)
.thenCompose(pksContext -> getClient().getPlans(pksContext))
.thenAccept(pksPlans -> ctx.operation.setBodyNoCloning(pksPlans).complete());
}

private DeferredResult<Endpoint> getEndpoint(RequestContext ctx) {
if (ctx.request.customProperties.containsKey(VALIDATE_CONNECTION)) {
return buildFakeEndpoint(ctx);
}

String path = ctx.request.resourceReference.getPath();
Operation op = Operation.createGet(this, path);
return sendWithDeferredResult(op, Endpoint.class)
.exceptionally(ex -> {
throw DeferredUtils.logErrorAndThrow(ex,
e -> String.format("Unable to get PKS endpoint state %s, reason: %s",
uri.getPath(), e.getMessage()),
path, e.getMessage()),
getClass());
});
}

private DeferredResult<AuthCredentialsServiceState> getCredentials(String selfLink) {
if (selfLink == null || selfLink.isEmpty()) {
return DeferredResult.completed(null);
}
Operation op = Operation.createGet(this, selfLink);
return sendWithDeferredResult(op, AuthCredentialsServiceState.class)
.exceptionally(ex -> {
Expand All @@ -267,7 +298,7 @@ private DeferredResult<AuthCredentialsServiceState> getCredentials(String selfLi
});
}

private DeferredResult<PKSContext> getPKSContext(PKSEndpointService.Endpoint endpoint) {
private DeferredResult<PKSContext> getPKSContext(Endpoint endpoint) {
DeferredResult<PKSContext> result = new DeferredResult<>();
new RetriableTaskBuilder<PKSContext>("get-token-from-" + endpoint.uaaEndpoint)
.withMaximumRetries(1)
Expand All @@ -290,21 +321,26 @@ private DeferredResult<PKSContext> getPKSContext(PKSEndpointService.Endpoint end
}

private Function<RetriableTask<PKSContext>, DeferredResult<PKSContext>> retriableLoginTask(
PKSEndpointService.Endpoint endpoint) {
Endpoint endpoint) {

return (task) -> {
DeferredResult<PKSContext> result = new DeferredResult<>();
try {
result.complete(pksContextCache.get(endpoint.documentSelfLink,
() -> createNewPKSContext(endpoint)));
} catch (ExecutionException e) {
if (endpoint.customProperties != null
&& endpoint.customProperties.get(VALIDATE_CONNECTION) != null) {
result.complete(createNewPKSContext(endpoint));
} else {
result.complete(pksContextCache.get(endpoint.documentSelfLink,
() -> createNewPKSContext(endpoint)));
}
} catch (Exception e) {
result.fail(e);
}
return result;
};
}

private PKSContext createNewPKSContext(PKSEndpointService.Endpoint endpoint)
private PKSContext createNewPKSContext(Endpoint endpoint)
throws ExecutionException, InterruptedException {
return getCredentials(endpoint.authCredentialsLink)
.thenCompose(authCredentials -> login(endpoint, authCredentials))
Expand All @@ -316,14 +352,19 @@ private PKSContext createNewPKSContext(PKSEndpointService.Endpoint endpoint)
.get();
}

private DeferredResult<PKSContext> login(PKSEndpointService.Endpoint endpoint,
/**
* Login in PKS and returns PKS context instance with token.
*/
private DeferredResult<PKSContext> login(Endpoint endpoint,
AuthCredentialsServiceState authCredentials) {
if (authCredentials == null) {
return DeferredResult.completed(PKSContext.create(endpoint, null));
}
AuthCredentialsType authCredentialsType = AuthCredentialsType.valueOf(authCredentials.type);
if (AuthCredentialsType.Password == authCredentialsType) {
String username = authCredentials.userEmail;
String password = EncryptionUtils.decrypt(authCredentials.privateKey);

//TODO run this in separate thread
return getClient()
.login(endpoint.uaaEndpoint, username, password)
.thenApply(uaaTokenResponse -> PKSContext.create(endpoint, uaaTokenResponse));
Expand All @@ -333,6 +374,35 @@ private DeferredResult<PKSContext> login(PKSEndpointService.Endpoint endpoint,
+ " is not supported");
}

private AdapterRequest getRequest(Operation op) {
AdapterRequest request = op.getBody(AdapterRequest.class);
request.validate();
if (request.customProperties == null) {
request.customProperties = new HashMap<>(2);
}
return request;
}

/**
* Construct fake {@link Endpoint} state used only to validate connection.
*/
private DeferredResult<Endpoint> buildFakeEndpoint(RequestContext ctx) {
Endpoint e = new Endpoint();
e.customProperties = new HashMap<>(2);
e.customProperties.put(VALIDATE_CONNECTION, "true");
e.name = "test-connection";
e.uaaEndpoint = ctx.request.customProperties.get(Endpoint.FIELD_NAME_UAA_ENDPOINT);
e.apiEndpoint = ctx.request.customProperties.get(Endpoint.FIELD_NAME_API_ENDPOINT);
e.authCredentialsLink = ctx.request.customProperties.get(PKSConstants.CREDENTIALS_LINK);
ctx.endpoint = e;
DeferredResult<Endpoint> result = new DeferredResult<>();
result.complete(e);
return result;
}

/**
* Expire cached tokens.
*/
private void expireCachedTokens() {
ConcurrentMap<String, PKSContext> map = pksContextCache.asMap();
long currentTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package com.vmware.admiral.adapter.pks.service;

import static com.vmware.admiral.adapter.pks.service.PKSAdapterService.CLUSTER_NAME_PROP_NAME;
import static com.vmware.admiral.adapter.pks.PKSConstants.CLUSTER_NAME_PROP_NAME;
import static com.vmware.admiral.common.util.OperationUtil.PROJECT_ADMIRAL_HEADER;
import static com.vmware.admiral.compute.ComputeConstants.HOST_AUTH_CREDENTIALS_PROP_NAME;
import static com.vmware.admiral.compute.ContainerHostService.CONTAINER_HOST_TYPE_PROP_NAME;
Expand Down Expand Up @@ -115,7 +115,7 @@ public void handlePost(Operation op) {

private void handleAddRequest(Operation op, AddClusterRequest clusterRequest) {
AdapterRequest adapterRequest = new AdapterRequest();
adapterRequest.operationTypeId = PKSOperationType.CREATE_USER.toString();
adapterRequest.operationTypeId = PKSOperationType.CREATE_USER.id;
adapterRequest.serviceTaskCallback = ServiceTaskCallback.createEmpty();
adapterRequest.resourceReference = UriUtils.buildUri(getHost(),
clusterRequest.endpointLink);
Expand All @@ -126,7 +126,7 @@ private void handleAddRequest(Operation op, AddClusterRequest clusterRequest) {
.setBodyNoCloning(adapterRequest)
.setCompletion((o, ex) -> {
if (ex != null) {
logSevere("Adapter request for listing PKS clusters failed. Error: %s",
logSevere("Adapter request for add PKS cluster failed. Error: %s",
Utils.toString(ex));
op.fail(ex);
} else {
Expand Down Expand Up @@ -219,4 +219,5 @@ private void setProjectLinkAsTenantLink(Operation op, AddClusterRequest request)
request.tenantLinks.add(projectLink);
}
}

}
Loading

0 comments on commit 6ee5c8e

Please sign in to comment.