Skip to content

Commit

Permalink
add support for loading sink/source secrets in inherited fields (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored Jun 2, 2019
1 parent 2e5bc20 commit 4faccf1
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

Expand All @@ -40,14 +43,25 @@ public static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz, Sin
return loadWithSecrets(map, clazz, secretName -> sinkContext.getSecret(secretName));
}


public static List<Field> getAllFields(Class<?> type) {
List<Field> fields = new LinkedList<>();
fields.addAll(Arrays.asList(type.getDeclaredFields()));

if (type.getSuperclass() != null) {
fields.addAll(getAllFields(type.getSuperclass()));
}

return fields;
}

private static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz, Function<String, String> secretsGetter) {
Map<String, Object> configs = new HashMap<>(map);

for (Field field : clazz.getDeclaredFields()) {
for (Field field : getAllFields(clazz)) {
field.setAccessible(true);
for (Annotation annotation : field.getAnnotations()) {
if (annotation.annotationType().equals(FieldDoc.class)) {

if (((FieldDoc) annotation).sensitive()) {
String secret = null;
try {
Expand All @@ -56,7 +70,6 @@ private static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz, Fu
log.warn("Failed to read secret {}", field.getName(), e);
break;
}

if (secret != null) {
configs.put(field.getName(), secret);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.io.common;

import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
Expand All @@ -44,15 +45,15 @@ static class TestConfig {
sensitive = true,
help = "password"
)
private String password;
protected String password;

@FieldDoc(
required = true,
defaultValue = "",
sensitive = false,
help = ""
)
private String notSensitive;
protected String notSensitive;

/**
* Non-string secrets are not supported at this moment
Expand All @@ -63,14 +64,38 @@ static class TestConfig {
sensitive = true,
help = ""
)
private long sensitiveLong;
protected long sensitiveLong;
}

@Data
static class DerivedConfig extends TestConfig {
@FieldDoc(
required = true,
defaultValue = "",
sensitive = true,
help = ""
)
protected String moreSensitiveStuff;
}

@Data
static class DerivedDerivedConfig extends DerivedConfig {
@FieldDoc(
required = true,
defaultValue = "",
sensitive = true,
help = ""
)
protected String derivedDerivedSensitive;
}

static class TestSourceContext implements SourceContext {

static Map<String, String> secretsMap = new HashMap<>();
static {
secretsMap.put("password", "my-source-password");
secretsMap.put("moreSensitiveStuff", "more-sensitive-stuff");
secretsMap.put("derivedDerivedSensitive", "derived-derived-sensitive");
}

@Override
Expand Down Expand Up @@ -159,7 +184,6 @@ public CompletableFuture<ByteBuffer> getStateAsync(String key) {

@Test
public void testSourceLoadWithSecrets() {

Map<String, Object> configMap = new HashMap<>();
configMap.put("notSensitive", "foo");
TestConfig testConfig = IOConfigUtils.loadWithSecrets(configMap, TestConfig.class, new TestSourceContext());
Expand All @@ -177,12 +201,38 @@ public void testSourceLoadWithSecrets() {
Assert.assertEquals(testConfig.notSensitive, "foo");
Assert.assertEquals(testConfig.password, "my-source-password");
Assert.assertEquals(testConfig.sensitiveLong, 5L);

// test derived classes
configMap = new HashMap<>();
configMap.put("notSensitive", "foo");
configMap.put("sensitiveLong", 5L);

DerivedConfig derivedConfig = IOConfigUtils.loadWithSecrets(configMap, DerivedConfig.class, new TestSourceContext());

Assert.assertEquals(derivedConfig.notSensitive, "foo");
Assert.assertEquals(derivedConfig.password, "my-source-password");
Assert.assertEquals(derivedConfig.sensitiveLong, 5L);
Assert.assertEquals(derivedConfig.moreSensitiveStuff, "more-sensitive-stuff");

configMap = new HashMap<>();
configMap.put("notSensitive", "foo");
configMap.put("sensitiveLong", 5L);

DerivedDerivedConfig derivedDerivedConfig = IOConfigUtils.loadWithSecrets(configMap, DerivedDerivedConfig.class, new TestSourceContext());

Assert.assertEquals(derivedDerivedConfig.notSensitive, "foo");
Assert.assertEquals(derivedDerivedConfig.password, "my-source-password");
Assert.assertEquals(derivedDerivedConfig.sensitiveLong, 5L);
Assert.assertEquals(derivedDerivedConfig.moreSensitiveStuff, "more-sensitive-stuff");
Assert.assertEquals(derivedDerivedConfig.derivedDerivedSensitive, "derived-derived-sensitive");
}

static class TestSinkContext implements SinkContext {
static Map<String, String> secretsMap = new HashMap<>();
static {
secretsMap.put("password", "my-sink-password");
secretsMap.put("moreSensitiveStuff", "more-sensitive-stuff");
secretsMap.put("derivedDerivedSensitive", "derived-derived-sensitive");
}

@Override
Expand Down Expand Up @@ -290,5 +340,29 @@ public void testSinkLoadWithSecrets() {
Assert.assertEquals(testConfig.notSensitive, "foo");
Assert.assertEquals(testConfig.password, "my-sink-password");
Assert.assertEquals(testConfig.sensitiveLong, 5L);

// test derived classes
configMap = new HashMap<>();
configMap.put("notSensitive", "foo");
configMap.put("sensitiveLong", 5L);

DerivedConfig derivedConfig = IOConfigUtils.loadWithSecrets(configMap, DerivedConfig.class, new TestSinkContext());

Assert.assertEquals(derivedConfig.notSensitive, "foo");
Assert.assertEquals(derivedConfig.password, "my-sink-password");
Assert.assertEquals(derivedConfig.sensitiveLong, 5L);
Assert.assertEquals(derivedConfig.moreSensitiveStuff, "more-sensitive-stuff");

configMap = new HashMap<>();
configMap.put("notSensitive", "foo");
configMap.put("sensitiveLong", 5L);

DerivedDerivedConfig derivedDerivedConfig = IOConfigUtils.loadWithSecrets(configMap, DerivedDerivedConfig.class, new TestSinkContext());

Assert.assertEquals(derivedDerivedConfig.notSensitive, "foo");
Assert.assertEquals(derivedDerivedConfig.password, "my-sink-password");
Assert.assertEquals(derivedDerivedConfig.sensitiveLong, 5L);
Assert.assertEquals(derivedDerivedConfig.moreSensitiveStuff, "more-sensitive-stuff");
Assert.assertEquals(derivedDerivedConfig.derivedDerivedSensitive, "derived-derived-sensitive");
}
}

0 comments on commit 4faccf1

Please sign in to comment.