Skip to content
This repository has been archived by the owner on May 14, 2021. It is now read-only.

Commit

Permalink
Provenance reporting config can now be overridden using values set in…
Browse files Browse the repository at this point in the history
… the bootstrap.conf

This closes #166.

Signed-off-by: Aldrin Piri <[email protected]>
  • Loading branch information
GCHQ-NiFi authored and apiri committed Aug 19, 2019
1 parent 3528af2 commit 2d6cdca
Show file tree
Hide file tree
Showing 9 changed files with 490 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
import org.apache.nifi.minifi.bootstrap.status.PeriodicStatusReporter;
import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
import org.apache.nifi.minifi.commons.schema.ProvenanceReportingSchema;
import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema;
import org.apache.nifi.minifi.commons.schema.SensitivePropsSchema;
import org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys;
Expand Down Expand Up @@ -83,6 +84,11 @@
import java.util.concurrent.locks.ReentrantLock;

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.TIMEOUT_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.USE_COMPRESSION_KEY;

/**
* <p>
Expand Down Expand Up @@ -167,6 +173,27 @@ public class RunMiNiFi implements QueryableStatusAggregator, ConfigurationFileHo
SENSITIVE_PROPS_ALGORITHM_KEY,
SENSITIVE_PROPS_PROVIDER_KEY));

public static final String PROVENANCE_REPORTING_COMMENT_KEY = "nifi.minifi.provenance.reporting.comment";
public static final String PROVENANCE_REPORTING_SCHEDULING_STRATEGY_KEY = "nifi.minifi.provenance.reporting.scheduling.strategy";
public static final String PROVENANCE_REPORTING_SCHEDULING_PERIOD_KEY = "nifi.minifi.provenance.reporting.scheduling.period";
public static final String PROVENANCE_REPORTING_DESTINATION_URL_KEY = "nifi.minifi.provenance.reporting.destination.url";
public static final String PROVENANCE_REPORTING_INPUT_PORT_NAME_KEY = "nifi.minifi.provenance.reporting.input.port.name";
public static final String PROVENANCE_REPORTING_INSTANCE_URL_KEY = "nifi.minifi.provenance.reporting.instance.url";
public static final String PROVENANCE_REPORTING_COMPRESS_EVENTS_KEY = "nifi.minifi.provenance.reporting.compress.events";
public static final String PROVENANCE_REPORTING_BATCH_SIZE_KEY = "nifi.minifi.provenance.reporting.batch.size";
public static final String PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT_KEY = "nifi.minifi.provenance.reporting.communications.timeout";

public static final Set<String> BOOTSTRAP_PROVENANCE_REPORTING_KEYS = new HashSet<>(
Arrays.asList(PROVENANCE_REPORTING_COMMENT_KEY,
PROVENANCE_REPORTING_SCHEDULING_STRATEGY_KEY,
PROVENANCE_REPORTING_SCHEDULING_PERIOD_KEY,
PROVENANCE_REPORTING_DESTINATION_URL_KEY,
PROVENANCE_REPORTING_INPUT_PORT_NAME_KEY,
PROVENANCE_REPORTING_INSTANCE_URL_KEY,
PROVENANCE_REPORTING_COMPRESS_EVENTS_KEY,
PROVENANCE_REPORTING_BATCH_SIZE_KEY,
PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT_KEY
));

public static final Map<String, String> BOOTSTRAP_KEYS_TO_YML_KEYS;

Expand All @@ -187,6 +214,16 @@ public class RunMiNiFi implements QueryableStatusAggregator, ConfigurationFileHo
mutableMap.put(SENSITIVE_PROPS_ALGORITHM_KEY, SensitivePropsSchema.SENSITIVE_PROPS_ALGORITHM_KEY);
mutableMap.put(SENSITIVE_PROPS_PROVIDER_KEY, SensitivePropsSchema.SENSITIVE_PROPS_PROVIDER_KEY);

mutableMap.put(PROVENANCE_REPORTING_COMMENT_KEY, COMMENT_KEY);
mutableMap.put(PROVENANCE_REPORTING_SCHEDULING_STRATEGY_KEY, SCHEDULING_STRATEGY_KEY);
mutableMap.put(PROVENANCE_REPORTING_SCHEDULING_PERIOD_KEY, SCHEDULING_PERIOD_KEY);
mutableMap.put(PROVENANCE_REPORTING_DESTINATION_URL_KEY, ProvenanceReportingSchema.DESTINATION_URL_KEY);
mutableMap.put(PROVENANCE_REPORTING_INPUT_PORT_NAME_KEY, ProvenanceReportingSchema.PORT_NAME_KEY);
mutableMap.put(PROVENANCE_REPORTING_INSTANCE_URL_KEY, ProvenanceReportingSchema.ORIGINATING_URL_KEY);
mutableMap.put(PROVENANCE_REPORTING_COMPRESS_EVENTS_KEY, USE_COMPRESSION_KEY);
mutableMap.put(PROVENANCE_REPORTING_BATCH_SIZE_KEY, ProvenanceReportingSchema.BATCH_SIZE_KEY);
mutableMap.put(PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT_KEY, TIMEOUT_KEY);

BOOTSTRAP_KEYS_TO_YML_KEYS = Collections.unmodifiableMap(mutableMap);
}

Expand Down Expand Up @@ -1765,7 +1802,8 @@ private ByteBuffer performTransformation(InputStream configIs, String configDest
ConfigTransformer.transformConfigFile(
teeInputStream,
configDestinationPath,
buildSecurityPropertiesFromBootstrap(getBootstrapProperties()).orElse(null)
buildSecurityPropertiesFromBootstrap(getBootstrapProperties()).orElse(null),
buildProvenanceReportingPropertiesFromBootstrap(getBootstrapProperties()).orElse(null)
);

return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
Expand All @@ -1776,7 +1814,7 @@ private ByteBuffer performTransformation(InputStream configIs, String configDest
}
}

// TODO extract this to separate class BootstrapTransformer, and make private
// TODO extract this and buildProvenanceReportingPropertiesFromBootstrap to separate class BootstrapTransformer, and make private
public Optional<SecurityPropertiesSchema> buildSecurityPropertiesFromBootstrap(final Properties bootstrapProperties) {

Optional<SecurityPropertiesSchema> securityPropsOptional = Optional.empty();
Expand Down Expand Up @@ -1808,6 +1846,26 @@ public Optional<SecurityPropertiesSchema> buildSecurityPropertiesFromBootstrap(f
return securityPropsOptional;
}

public Optional<ProvenanceReportingSchema> buildProvenanceReportingPropertiesFromBootstrap(final Properties bootstrapProperties) {

Optional<ProvenanceReportingSchema> provenanceReportingPropsOptional = Optional.empty();

final Map<String, Object> provenanceReportingProperties = new HashMap<>();

BOOTSTRAP_PROVENANCE_REPORTING_KEYS.stream()
.filter(key -> StringUtils.isNotBlank(bootstrapProperties.getProperty(key)))
.forEach(key ->
provenanceReportingProperties.put(BOOTSTRAP_KEYS_TO_YML_KEYS.get(key), bootstrapProperties.getProperty(key))
);

if (!provenanceReportingProperties.isEmpty()) {
final ProvenanceReportingSchema provenanceReportingSchema = new ProvenanceReportingSchema(provenanceReportingProperties);
provenanceReportingPropsOptional = Optional.of(provenanceReportingSchema);
}

return provenanceReportingPropsOptional;
}

private static class Status {

private final Integer port;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,23 +95,36 @@ public static void transformConfigFile(String sourceFile, String destPath, Secur
final File ymlConfigFile = new File(sourceFile);
final InputStream ios = new FileInputStream(ymlConfigFile);

transformConfigFile(ios, destPath, securityProperties);
transformConfigFile(ios, destPath, securityProperties, null);
}

public static void transformConfigFile(String sourceFile, String destPath, SecurityPropertiesSchema securityProperties, ProvenanceReportingSchema provenanceReportingProperties) throws Exception {
final File ymlConfigFile = new File(sourceFile);
final InputStream ios = new FileInputStream(ymlConfigFile);

transformConfigFile(ios, destPath, securityProperties, provenanceReportingProperties);
}

public static void transformConfigFile(InputStream sourceStream, String destPath) throws Exception {
transformConfigFile(sourceStream, destPath, null);
transformConfigFile(sourceStream, destPath, null, null);
}


public static void transformConfigFile(InputStream sourceStream, String destPath, SecurityPropertiesSchema securityProperties) throws Exception {
public static void transformConfigFile(
InputStream sourceStream,
String destPath,
SecurityPropertiesSchema securityProperties,
ProvenanceReportingSchema provenanceReportingProperties) throws Exception {
ConvertableSchema<ConfigSchema> convertableSchema = throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(sourceStream));
ConfigSchema configSchema = throwIfInvalid(convertableSchema.convert());

// See if we are providing defined properties from the filesystem configurations and use those as the definitive values
if (securityProperties != null) {
configSchema.setSecurityProperties(securityProperties);
}
if (provenanceReportingProperties != null) {
configSchema.setProvenanceReportingProperties(provenanceReportingProperties);
}

// Create nifi.properties and flow.xml.gz in memory
ByteArrayOutputStream nifiPropertiesOutputStream = new ByteArrayOutputStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.nifi.minifi.bootstrap;

import org.apache.nifi.minifi.commons.schema.ProvenanceReportingSchema;
import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema;
import org.apache.nifi.minifi.commons.schema.SensitivePropsSchema;
import org.junit.Assert;
Expand Down Expand Up @@ -94,6 +95,32 @@ public void buildSecurityPropertiesDefinedButInvalid() throws Exception {

}

@Test
public void buildProvenanceReportingNotDefined() throws Exception {
final RunMiNiFi testMiNiFi = new RunMiNiFi(null);
final Properties bootstrapProperties = getTestBootstrapProperties("bootstrap-provenance-reporting/bootstrap.conf.default");
final Optional<ProvenanceReportingSchema> provenanceReportingPropsOptional = testMiNiFi.buildProvenanceReportingPropertiesFromBootstrap(bootstrapProperties);
Assert.assertTrue(!provenanceReportingPropsOptional.isPresent());
}

@Test
public void buildProvenanceReportingDefined() throws Exception {
final RunMiNiFi testMiNiFi = new RunMiNiFi(null);
final Properties bootstrapProperties = getTestBootstrapProperties("bootstrap-provenance-reporting/bootstrap.conf.configured");
final Optional<ProvenanceReportingSchema> provenanceReportingPropsOptional = testMiNiFi.buildProvenanceReportingPropertiesFromBootstrap(bootstrapProperties);
Assert.assertTrue(provenanceReportingPropsOptional.isPresent());

final ProvenanceReportingSchema provenanceReportingSchema = provenanceReportingPropsOptional.get();
Assert.assertEquals("This is a comment!", provenanceReportingSchema.getComment());
Assert.assertEquals("TIMER_DRIVEN", provenanceReportingSchema.getSchedulingStrategy());
Assert.assertEquals("15 secs", provenanceReportingSchema.getSchedulingPeriod());
Assert.assertEquals("http://localhost:8080/", provenanceReportingSchema.getDestinationUrl());
Assert.assertEquals("provenance", provenanceReportingSchema.getPortName());
Assert.assertEquals("http://${hostname(true)}:8081/nifi", provenanceReportingSchema.getOriginatingUrl());
Assert.assertEquals("10 secs", provenanceReportingSchema.getTimeout());
}


public static Properties getTestBootstrapProperties(final String fileName) throws IOException {
final Properties bootstrapProperties = new Properties();
try (final InputStream fis = RunMiNiFiTest.class.getClassLoader().getResourceAsStream(fileName)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Java command to use when running MiNiFi
java=java

# Username to use when running MiNiFi. This value will be ignored on Windows.
run.as=

# Configure where MiNiFi's lib and conf directories live
# When running as a Windows service set full paths instead of relative paths
lib.dir=./lib
conf.dir=./conf

# How long to wait after telling MiNiFi to shutdown before explicitly killing the Process
graceful.shutdown.seconds=20

# The location for the configuration file
# When running as a Windows service use the full path to the file
nifi.minifi.config=./conf/config.yml

# Security Properties #
# These properties take precedence over any equivalent properties specified in config.yml #
nifi.minifi.security.keystore=/my/test/keystore.jks
nifi.minifi.security.keystoreType=JKS
nifi.minifi.security.keystorePasswd=mykeystorepassword
nifi.minifi.security.keyPasswd=mykeypassword
nifi.minifi.security.truststore=/my/test/truststore.jks
nifi.minifi.security.truststoreType=JKS
nifi.minifi.security.truststorePasswd=mytruststorepassword
nifi.minifi.security.ssl.protocol=TLS

nifi.minifi.sensitive.props.key=sensitivepropskey
nifi.minifi.sensitive.props.algorithm=algo
nifi.minifi.sensitive.props.provider=BC

# Provenance Reporting Properties #
# These properties take precedence over any equivalent properties specified in the config.yml #
nifi.minifi.provenance.reporting.comment=This is a comment!
nifi.minifi.provenance.reporting.scheduling.strategy=TIMER_DRIVEN
nifi.minifi.provenance.reporting.scheduling.period=15 secs
nifi.minifi.provenance.reporting.destination.url=http://localhost:8080/
nifi.minifi.provenance.reporting.input.port.name=provenance
nifi.minifi.provenance.reporting.instance.url=http://${hostname(true)}:8081/nifi
nifi.minifi.provenance.reporting.batch.size=1000
nifi.minifi.provenance.reporting.communications.timeout=10 secs

# Notifiers to use for the associated agent, comma separated list of class names
#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.FileChangeIngestor
#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.RestChangeIngestor
#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor

# File change notifier configuration

# Path of the file to monitor for changes. When these occur, the FileChangeNotifier, if configured, will begin the configuration reloading process
#nifi.minifi.notifier.ingestors.file.config.path=
# How frequently the file specified by 'nifi.minifi.notifier.file.config.path' should be evaluated for changes.
#nifi.minifi.notifier.ingestors.file.polling.period.seconds=5

# Rest change notifier configuration

# Port on which the Jetty server will bind to, keep commented for a random open port
#nifi.minifi.notifier.ingestors.receive.http.port=8338

#Pull HTTP change notifier configuration

# Hostname on which to pull configurations from
#nifi.minifi.notifier.ingestors.pull.http.hostname=localhost
# Port on which to pull configurations from
#nifi.minifi.notifier.ingestors.pull.http.port=4567
# Path to pull configurations from
#nifi.minifi.notifier.ingestors.pull.http.path=/c2/config
# Query string to pull configurations with
#nifi.minifi.notifier.ingestors.pull.http.query=class=raspi3
# Period on which to pull configurations from, defaults to 5 minutes if commented out
#nifi.minifi.notifier.ingestors.pull.http.period.ms=300000

# Periodic Status Reporters to use for the associated agent, comma separated list of class names
#nifi.minifi.status.reporter.components=org.apache.nifi.minifi.bootstrap.status.reporters.StatusLogger

# Periodic Status Logger configuration

# The FlowStatus query to submit to the MiNiFi instance
#nifi.minifi.status.reporter.log.query=instance:health,bulletins
# The log level at which the status will be logged
#nifi.minifi.status.reporter.log.level=INFO
# The period (in milliseconds) at which to log the status
#nifi.minifi.status.reporter.log.period=60000

# Disable JSR 199 so that we can use JSP's without running a JDK
java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true

# JVM memory settings
java.arg.2=-Xms256m
java.arg.3=-Xmx256m

# Enable Remote Debugging
#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000

java.arg.4=-Djava.net.preferIPv4Stack=true

# allowRestrictedHeaders is required for Cluster/Node communications to work properly
java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true
java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol

# Sets the provider of SecureRandom to /dev/urandom to prevent blocking on VMs
java.arg.7=-Djava.security.egd=file:/dev/urandom


# The G1GC is still considered experimental but has proven to be very advantageous in providing great
# performance without significant "stop-the-world" delays.
#java.arg.13=-XX:+UseG1GC

#Set headless mode by default
java.arg.14=-Djava.awt.headless=true
Loading

0 comments on commit 2d6cdca

Please sign in to comment.