Skip to content

Commit

Permalink
Amend prepareIndexIfNeededThenExecute for token refresh (elastic#41697
Browse files Browse the repository at this point in the history
)

This fixes a low level bug that manifests, in certain circumstances, by the failure
of the refresh operation.

Version 7.1 added a new `superseded_by` field to the `.security` index mapping.
This field is used when indexing a refresh operation (a document update).
Because the document update was not guarded by the obligatory `prepareIndexIfNeededThenExecute` the refresh operation would fail if it were
the first operation when the cluster was upgraded from a version < 7.1 .
This failure was catched (and fails reliably) in the backport elastic#41673 .
  • Loading branch information
albertzaharovits authored May 1, 2019
1 parent 8e04f36 commit 2154a97
Showing 1 changed file with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,12 @@ private void getUserTokenFromId(String userTokenId, Version tokenVersion, Action
logger.warn("failed to get access token [{}] because index [{}] is not available", userTokenId, tokensIndex.aliasName());
listener.onResponse(null);
} else {
final GetRequest getRequest = client.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME,
getTokenDocumentId(userTokenId)).request();
final Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("decode token", userTokenId, ex));
tokensIndex.checkIndexVersionThenExecute(
ex -> listener.onFailure(traceLog("prepare tokens index [" + tokensIndex.aliasName() +"]", userTokenId, ex)),
() -> {
final GetRequest getRequest = client.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME,
getTokenDocumentId(userTokenId)).request();
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("decode token", userTokenId, ex));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
() -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
ActionListener.<GetResponse>wrap(response -> {
if (response.isExists()) {
Map<String, Object> accessTokenSource =
Expand Down Expand Up @@ -384,8 +383,8 @@ private void getUserTokenFromId(String userTokenId, Version tokenVersion, Action
logger.error(new ParameterizedMessage("failed to get access token [{}]", userTokenId), e);
listener.onFailure(e);
}
}), client::get);
});
}), client::get)
);
}
}

Expand Down Expand Up @@ -862,7 +861,9 @@ private void innerRefresh(String tokenDocId, Map<String, Object> source, long se
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
.setIfSeqNo(seqNo)
.setIfPrimaryTerm(primaryTerm);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest.request(),
refreshedTokenIndex.prepareIndexIfNeededThenExecute(
ex -> listener.onFailure(traceLog("prepare index [" + refreshedTokenIndex.aliasName() + "]", ex)),
() -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest.request(),
ActionListener.<UpdateResponse>wrap(updateResponse -> {
if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
logger.debug(() -> new ParameterizedMessage("updated the original token document to {}",
Expand Down Expand Up @@ -931,7 +932,7 @@ public void onFailure(Exception e) {
} else {
onFailure.accept(e);
}
}), client::update);
}), client::update));
}
}

Expand Down Expand Up @@ -1005,7 +1006,9 @@ private void getSupersedingTokenDocAsync(RefreshTokenStatus refreshTokenStatus,

private void getTokenDocAsync(String tokenDocId, SecurityIndexManager tokensIndex, ActionListener<GetResponse> listener) {
final GetRequest getRequest = client.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME, tokenDocId).request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, listener, client::get);
tokensIndex.checkIndexVersionThenExecute(
ex -> listener.onFailure(traceLog("prepare tokens index [" + tokensIndex.aliasName() + "]", tokenDocId, ex)),
() -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, listener, client::get));
}

private Version getTokenVersionCompatibility() {
Expand Down Expand Up @@ -1392,10 +1395,10 @@ private void checkIfTokenIsValid(UserToken userToken, ActionListener<UserToken>
logger.warn("failed to validate access token because the index [" + tokensIndex.aliasName() + "] doesn't exist");
listener.onResponse(null);
} else {
final GetRequest getRequest = client
.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME, getTokenDocumentId(userToken)).request();
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("check token state", userToken.getId(), ex));
tokensIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
final GetRequest getRequest = client
.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME, getTokenDocumentId(userToken)).request();
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("check token state", userToken.getId(), ex));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
ActionListener.<GetResponse>wrap(response -> {
if (response.isExists()) {
Expand Down

0 comments on commit 2154a97

Please sign in to comment.