Skip to content

Commit

Permalink
Secrets Frontend (apache#2853)
Browse files Browse the repository at this point in the history
* Secrets Frontend

* Do not expose secrets in cli
  • Loading branch information
srkukarni authored Oct 26, 2018
1 parent d79499d commit 82b267b
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public enum Runtime {
private ProcessingGuarantees processingGuarantees;
private boolean retainOrdering;
private Map<String, Object> userConfig;
// This is a map of secretName(aka how the secret is going to be
// accessed in the function via context) to an object that
// encapsulates how the secret is fetched by the underlying
// secrets provider
private Map<String, Object> secrets;
private Runtime runtime;
private boolean autoAck;
private int maxMessageRetries = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public class SinkConfig {
private Map<String, ConsumerConfig> inputSpecs = new TreeMap<>();

private Map<String, Object> configs;
// This is a map of secretName(aka how the secret is going to be
// accessed in the function via context) to an object that
// encapsulates how the secret is fetched by the underlying
// secrets provider
private Map<String, Object> secrets;
private int parallelism = 1;
private FunctionConfig.ProcessingGuarantees processingGuarantees;
private boolean retainOrdering;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public class SourceConfig {
private String schemaType;

private Map<String, Object> configs;
// This is a map of secretName(aka how the secret is going to be
// accessed in the function via context) to an object that
// encapsulates how the secret is fetched by the underlying
// secrets provider
private Map<String, Object> secrets;
private int parallelism = 1;
private FunctionConfig.ProcessingGuarantees processingGuarantees;
private Resources resources;
Expand Down
76 changes: 42 additions & 34 deletions pulsar-functions/instance/src/main/python/Function_pb2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
#

# Generated by the protocol buffer compiler. DO NOT EDIT!
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: Function.proto

Expand All @@ -39,7 +40,7 @@
name='Function.proto',
package='proto',
syntax='proto3',
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xd4\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\x12\n\nuserConfig\x18\x07 \x01(\t\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x0f\n\x07\x61utoAck\x18\t \x01(\x08\x12\x13\n\x0bparallelism\x18\n \x01(\x05\x12!\n\x06source\x18\x0b \x01(\x0b\x32\x11.proto.SourceSpec\x12\x1d\n\x04sink\x18\x0c \x01(\x0b\x32\x0f.proto.SinkSpec\x12#\n\tresources\x18\r \x01(\x0b\x32\x10.proto.Resources\x12\x12\n\npackageUrl\x18\x0e \x01(\t\x12)\n\x0cretryDetails\x18\x0f \x01(\x0b\x32\x13.proto.RetryDetails\"\x1f\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\"R\n\x0c\x43onsumerSpec\x12\x12\n\nschemaType\x18\x01 \x01(\t\x12\x16\n\x0eserdeClassName\x18\x02 \x01(\t\x12\x16\n\x0eisRegexPattern\x18\x03 \x01(\x08\"\xe4\x03\n\nSourceSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x15\n\rtypeClassName\x18\x05 \x01(\t\x12\x31\n\x10subscriptionType\x18\x03 \x01(\x0e\x32\x17.proto.SubscriptionType\x12Q\n\x16topicsToSerDeClassName\x18\x04 \x03(\x0b\x32-.proto.SourceSpec.TopicsToSerDeClassNameEntryB\x02\x18\x01\x12\x35\n\ninputSpecs\x18\n \x03(\x0b\x32!.proto.SourceSpec.InputSpecsEntry\x12\x11\n\ttimeoutMs\x18\x06 \x01(\x04\x12\x19\n\rtopicsPattern\x18\x07 \x01(\tB\x02\x18\x01\x12\x0f\n\x07\x62uiltin\x18\x08 \x01(\t\x12\x18\n\x10subscriptionName\x18\t \x01(\t\x1a=\n\x1bTopicsToSerDeClassNameEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x46\n\x0fInputSpecsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\"\n\x05value\x18\x02 \x01(\x0b\x32\x13.proto.ConsumerSpec:\x02\x38\x01\"\x91\x01\n\x08SinkSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x15\n\rtypeClassName\x18\x05 \x01(\t\x12\r\n\x05topic\x18\x03 \x01(\t\x12\x16\n\x0eserDeClassName\x18\x04 \x01(\t\x12\x0f\n\x07\x62uiltin\x18\x06 \x01(\t\x12\x12\n\nschemaType\x18\x07 \x01(\t\"H\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01 \x01(\t\x12\x18\n\x10originalFileName\x18\x02 \x01(\t\"\xa1\x01\n\x10\x46unctionMetaData\x12/\n\x0f\x66unctionDetails\x18\x01 \x01(\x0b\x32\x16.proto.FunctionDetails\x12\x37\n\x0fpackageLocation\x18\x02 \x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03 \x01(\x04\x12\x12\n\ncreateTime\x18\x04 \x01(\x04\"Q\n\x08Instance\x12\x31\n\x10\x66unctionMetaData\x18\x01 \x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\"A\n\nAssignment\x12!\n\x08instance\x18\x01 \x01(\x0b\x32\x0f.proto.Instance\x12\x10\n\x08workerId\x18\x02 \x01(\t*O\n\x14ProcessingGuarantees\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x00\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x01\x12\x14\n\x10\x45\x46\x46\x45\x43TIVELY_ONCE\x10\x02*,\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\x0c\n\x08\x46\x41ILOVER\x10\x01\x42-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3')
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe8\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\x12\n\nuserConfig\x18\x07 \x01(\t\x12\x12\n\nsecretsMap\x18\x10 \x01(\t\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x0f\n\x07\x61utoAck\x18\t \x01(\x08\x12\x13\n\x0bparallelism\x18\n \x01(\x05\x12!\n\x06source\x18\x0b \x01(\x0b\x32\x11.proto.SourceSpec\x12\x1d\n\x04sink\x18\x0c \x01(\x0b\x32\x0f.proto.SinkSpec\x12#\n\tresources\x18\r \x01(\x0b\x32\x10.proto.Resources\x12\x12\n\npackageUrl\x18\x0e \x01(\t\x12)\n\x0cretryDetails\x18\x0f \x01(\x0b\x32\x13.proto.RetryDetails\"\x1f\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\"R\n\x0c\x43onsumerSpec\x12\x12\n\nschemaType\x18\x01 \x01(\t\x12\x16\n\x0eserdeClassName\x18\x02 \x01(\t\x12\x16\n\x0eisRegexPattern\x18\x03 \x01(\x08\"\xe4\x03\n\nSourceSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x15\n\rtypeClassName\x18\x05 \x01(\t\x12\x31\n\x10subscriptionType\x18\x03 \x01(\x0e\x32\x17.proto.SubscriptionType\x12Q\n\x16topicsToSerDeClassName\x18\x04 \x03(\x0b\x32-.proto.SourceSpec.TopicsToSerDeClassNameEntryB\x02\x18\x01\x12\x35\n\ninputSpecs\x18\n \x03(\x0b\x32!.proto.SourceSpec.InputSpecsEntry\x12\x11\n\ttimeoutMs\x18\x06 \x01(\x04\x12\x19\n\rtopicsPattern\x18\x07 \x01(\tB\x02\x18\x01\x12\x0f\n\x07\x62uiltin\x18\x08 \x01(\t\x12\x18\n\x10subscriptionName\x18\t \x01(\t\x1a=\n\x1bTopicsToSerDeClassNameEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x46\n\x0fInputSpecsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\"\n\x05value\x18\x02 \x01(\x0b\x32\x13.proto.ConsumerSpec:\x02\x38\x01\"\x91\x01\n\x08SinkSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x15\n\rtypeClassName\x18\x05 \x01(\t\x12\r\n\x05topic\x18\x03 \x01(\t\x12\x16\n\x0eserDeClassName\x18\x04 \x01(\t\x12\x0f\n\x07\x62uiltin\x18\x06 \x01(\t\x12\x12\n\nschemaType\x18\x07 \x01(\t\"H\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01 \x01(\t\x12\x18\n\x10originalFileName\x18\x02 \x01(\t\"\xa1\x01\n\x10\x46unctionMetaData\x12/\n\x0f\x66unctionDetails\x18\x01 \x01(\x0b\x32\x16.proto.FunctionDetails\x12\x37\n\x0fpackageLocation\x18\x02 \x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03 \x01(\x04\x12\x12\n\ncreateTime\x18\x04 \x01(\x04\"Q\n\x08Instance\x12\x31\n\x10\x66unctionMetaData\x18\x01 \x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\"A\n\nAssignment\x12!\n\x08instance\x18\x01 \x01(\x0b\x32\x0f.proto.Instance\x12\x10\n\x08workerId\x18\x02 \x01(\t*O\n\x14ProcessingGuarantees\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x00\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x01\x12\x14\n\x10\x45\x46\x46\x45\x43TIVELY_ONCE\x10\x02*,\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\x0c\n\x08\x46\x41ILOVER\x10\x01\x42-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3')
)

_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
Expand All @@ -63,8 +64,8 @@
],
containing_type=None,
options=None,
serialized_start=1724,
serialized_end=1803,
serialized_start=1744,
serialized_end=1823,
)
_sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)

Expand All @@ -86,8 +87,8 @@
],
containing_type=None,
options=None,
serialized_start=1805,
serialized_end=1849,
serialized_start=1825,
serialized_end=1869,
)
_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)

Expand Down Expand Up @@ -116,8 +117,8 @@
],
containing_type=None,
options=None,
serialized_start=584,
serialized_end=615,
serialized_start=604,
serialized_end=635,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)

Expand Down Expand Up @@ -262,56 +263,63 @@
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='runtime', full_name='proto.FunctionDetails.runtime', index=7,
name='secretsMap', full_name='proto.FunctionDetails.secretsMap', index=7,
number=16, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='runtime', full_name='proto.FunctionDetails.runtime', index=8,
number=8, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='autoAck', full_name='proto.FunctionDetails.autoAck', index=8,
name='autoAck', full_name='proto.FunctionDetails.autoAck', index=9,
number=9, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='parallelism', full_name='proto.FunctionDetails.parallelism', index=9,
name='parallelism', full_name='proto.FunctionDetails.parallelism', index=10,
number=10, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='source', full_name='proto.FunctionDetails.source', index=10,
name='source', full_name='proto.FunctionDetails.source', index=11,
number=11, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='sink', full_name='proto.FunctionDetails.sink', index=11,
name='sink', full_name='proto.FunctionDetails.sink', index=12,
number=12, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='resources', full_name='proto.FunctionDetails.resources', index=12,
name='resources', full_name='proto.FunctionDetails.resources', index=13,
number=13, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='packageUrl', full_name='proto.FunctionDetails.packageUrl', index=13,
name='packageUrl', full_name='proto.FunctionDetails.packageUrl', index=14,
number=14, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='retryDetails', full_name='proto.FunctionDetails.retryDetails', index=14,
name='retryDetails', full_name='proto.FunctionDetails.retryDetails', index=15,
number=15, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
Expand All @@ -331,7 +339,7 @@
oneofs=[
],
serialized_start=147,
serialized_end=615,
serialized_end=635,
)


Expand Down Expand Up @@ -375,8 +383,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=617,
serialized_end=699,
serialized_start=637,
serialized_end=719,
)


Expand Down Expand Up @@ -413,8 +421,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=1053,
serialized_end=1114,
serialized_start=1073,
serialized_end=1134,
)

_SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
Expand Down Expand Up @@ -450,8 +458,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=1116,
serialized_end=1186,
serialized_start=1136,
serialized_end=1206,
)

_SOURCESPEC = _descriptor.Descriptor(
Expand Down Expand Up @@ -543,8 +551,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=702,
serialized_end=1186,
serialized_start=722,
serialized_end=1206,
)


Expand Down Expand Up @@ -616,8 +624,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=1189,
serialized_end=1334,
serialized_start=1209,
serialized_end=1354,
)


Expand Down Expand Up @@ -654,8 +662,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=1336,
serialized_end=1408,
serialized_start=1356,
serialized_end=1428,
)


Expand Down Expand Up @@ -706,8 +714,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=1411,
serialized_end=1572,
serialized_start=1431,
serialized_end=1592,
)


Expand Down Expand Up @@ -744,8 +752,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=1574,
serialized_end=1655,
serialized_start=1594,
serialized_end=1675,
)


Expand Down Expand Up @@ -782,8 +790,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=1657,
serialized_end=1722,
serialized_start=1677,
serialized_end=1742,
)

_FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES
Expand Down
1 change: 1 addition & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ message FunctionDetails {
string logTopic = 5;
ProcessingGuarantees processingGuarantees = 6;
string userConfig = 7;
string secretsMap = 16;
Runtime runtime = 8;
bool autoAck = 9;
int32 parallelism = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
functionDetailsBuilder.setUserConfig(new Gson().toJson(configs));
}

if (functionConfig.getSecrets() != null && !functionConfig.getSecrets().isEmpty()) {
functionDetailsBuilder.setSecretsMap(new Gson().toJson(functionConfig.getSecrets()));
}

functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck());
functionDetailsBuilder.setParallelism(functionConfig.getParallelism());
if (functionConfig.getResources() != null) {
Expand Down Expand Up @@ -275,6 +279,12 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails)
}
functionConfig.setUserConfig(userConfig);

if (!isEmpty(functionDetails.getSecretsMap())) {
Type type = new TypeToken<Map<String, Object>>() {}.getType();
Map<String, Object> secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type);
functionConfig.setSecrets(secretsMap);
}

if (functionDetails.hasResources()) {
Resources resources = new Resources();
resources.setCpu(functionDetails.getResources().getCpu());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader clas
if (sinkConfig.getConfigs() != null) {
sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs()));
}
if (sinkConfig.getSecrets() != null && !sinkConfig.getSecrets().isEmpty()) {
functionDetailsBuilder.setSecretsMap(new Gson().toJson(sinkConfig.getSecrets()));
}
if (typeArg != null) {
sinkSpecBuilder.setTypeClassName(typeArg);
}
Expand Down Expand Up @@ -228,6 +231,11 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
Type type = new TypeToken<Map<String, String>>() {}.getType();
sinkConfig.setConfigs(new Gson().fromJson(functionDetails.getSink().getConfigs(), type));
}
if (!isEmpty(functionDetails.getSecretsMap())) {
Type type = new TypeToken<Map<String, Object>>() {}.getType();
Map<String, Object> secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type);
sinkConfig.setSecrets(secretsMap);
}
if (functionDetails.hasResources()) {
Resources resources = new Resources();
resources.setCpu(functionDetails.getResources().getCpu());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader
sourceSpecBuilder.setConfigs(new Gson().toJson(sourceConfig.getConfigs()));
}

if (sourceConfig.getSecrets() != null && !sourceConfig.getSecrets().isEmpty()) {
functionDetailsBuilder.setSecretsMap(new Gson().toJson(sourceConfig.getSecrets()));
}

if (typeArg != null) {
sourceSpecBuilder.setTypeClassName(typeArg);
}
Expand Down Expand Up @@ -156,6 +160,11 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
Type type = new TypeToken<Map<String, String>>() {}.getType();
sourceConfig.setConfigs(new Gson().fromJson(sourceSpec.getConfigs(), type));
}
if (!isEmpty(functionDetails.getSecretsMap())) {
Type type = new TypeToken<Map<String, Object>>() {}.getType();
Map<String, Object> secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type);
sourceConfig.setSecrets(secretsMap);
}
Function.SinkSpec sinkSpec = functionDetails.getSink();
sourceConfig.setTopicName(sinkSpec.getTopic());
if (!StringUtils.isEmpty(sinkSpec.getSchemaType())) {
Expand Down

0 comments on commit 82b267b

Please sign in to comment.