Skip to content

Commit

Permalink
SDC-10344: SDC Security Manager should not block access to the blobstore
Browse files Browse the repository at this point in the history
This turned out to be quite a big issue actually - the BlobStore has
been directly passed to stages that runs with different classloader -
thus there were moments when the BlobStore wasn't properly running with
it's own class loader which could lead to many random exceptions.

This patch introduces a Runtime wrapper like we have for everything else
that is crossing classloader boundaries.

Change-Id: I8606cbd20d23297194394b1b7f524e57d87a6b24
Reviewed-on: https://review.streamsets.net/17427
Tested-by: StreamSets CI <[email protected]>
Reviewed-by: Jeff Evans <[email protected]>
  • Loading branch information
jarcec committed Oct 18, 2018
1 parent 9916455 commit ab3d886
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2018 StreamSets Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.streamsets.datacollector.blobstore;

import com.streamsets.datacollector.util.LambdaUtil;
import com.streamsets.pipeline.api.BlobStore;
import com.streamsets.pipeline.api.StageException;

import java.util.Set;

/**
* This is a runtime wrapper for BlobStore that will properly switch class loaders.
*/
public class BlobStoreRuntime implements BlobStore {

private ClassLoader cl;
private BlobStore delegate;

public BlobStoreRuntime(ClassLoader cl, BlobStore delegate) {
this.cl = cl;
this.delegate = delegate;
}

@Override
public void store(String namespace, String id, long version, String content) throws StageException {
LambdaUtil.privilegedWithClassLoader(
cl,
StageException.class,
() -> { delegate.store(namespace, id, version, content); return null; }
);
}

@Override
public long latestVersion(String namespace, String id) throws StageException {
return LambdaUtil.privilegedWithClassLoader(
cl,
StageException.class,
() -> delegate.latestVersion(namespace, id)
);
}

@Override
public boolean exists(String namespace, String id) {
return LambdaUtil.privilegedWithClassLoader(
cl,
() -> delegate.exists(namespace, id)
);
}

@Override
public boolean exists(String namespace, String id, long version) {
return LambdaUtil.privilegedWithClassLoader(
cl,
() -> delegate.exists(namespace, id, version)
);
}

@Override
public Set<Long> allVersions(String namespace, String id) {
return LambdaUtil.privilegedWithClassLoader(
cl,
() -> delegate.allVersions(namespace, id)
);
}

@Override
public String retrieve(String namespace, String id, long version) throws StageException {
return LambdaUtil.privilegedWithClassLoader(
cl,
StageException.class,
() -> delegate.retrieve(namespace, id, version)
);
}

@Override
public VersionedContent retrieveLatest(String namespace, String id) throws StageException {
return LambdaUtil.privilegedWithClassLoader(
cl,
StageException.class,
() -> delegate.retrieveLatest(namespace, id)
);
}

@Override
public void delete(String namespace, String id, long version) throws StageException {
LambdaUtil.privilegedWithClassLoader(
cl,
StageException.class,
() -> { delegate.delete(namespace, id, version); return null; }
);
}

@Override
public void deleteAllVersions(String namespace, String id) throws StageException {
LambdaUtil.privilegedWithClassLoader(
cl,
StageException.class,
() -> { delegate.deleteAllVersions(namespace, id); return null; }
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -728,17 +728,18 @@ public InterceptorBean createInterceptor(
List<Issue> issues
) {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
InterceptorCreator.Context context = contextBuilder.buildFor(
definition.getLibraryDefinition().getName(),
definition.getKlass().getName(),
stageConfiguration,
stageDefinition,
interceptorType
);

try {
Thread.currentThread().setContextClassLoader(definition.getStageClassLoader());

InterceptorCreator creator = definition.getDefaultCreator().newInstance();
Interceptor interceptor = creator.create(contextBuilder.buildFor(
definition.getLibraryDefinition().getName(),
definition.getKlass().getName(),
stageConfiguration,
stageDefinition,
interceptorType
));
Interceptor interceptor = creator.create(context);

if(interceptor == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.streamsets.datacollector.blobstore.BlobStoreRuntime;
import com.streamsets.datacollector.email.EmailSender;
import com.streamsets.datacollector.lineage.LineagePublisherDelegator;
import com.streamsets.datacollector.main.RuntimeInfo;
Expand Down Expand Up @@ -122,8 +123,9 @@ public InterceptorContext(
long startTime,
LineagePublisherDelegator lineagePublisherDelegator
) {
this.containerClassLoader = Thread.currentThread().getContextClassLoader();
this.type = type;
this.blobStore = blobStore;
this.blobStore = new BlobStoreRuntime(containerClassLoader, blobStore);
this.configuration = configuration;
this.stageInstanceName = stageInstanceName;
this.metricName = metricName;
Expand All @@ -142,7 +144,6 @@ public InterceptorContext(
this.emailSender = emailSender;
this.startTime = startTime;
this.lineagePublisherDelegator = lineagePublisherDelegator;
this.containerClassLoader = Thread.currentThread().getContextClassLoader();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.streamsets.datacollector.runner;

import com.streamsets.datacollector.blobstore.BlobStoreRuntime;
import com.streamsets.datacollector.config.StageConfiguration;
import com.streamsets.datacollector.config.StageDefinition;
import com.streamsets.datacollector.event.dto.PipelineStartEvent;
Expand Down Expand Up @@ -180,7 +181,7 @@ public InterceptorCreator.Context buildFor(
}

return new ContextImpl(
blobStore,
new BlobStoreRuntime(Thread.currentThread().getContextClassLoader(), blobStore),
sdcConf,
stageConfiguration,
stageDefinition,
Expand Down

0 comments on commit ab3d886

Please sign in to comment.