Skip to content

Commit

Permalink
[BACKLOG-21225] Save and load encrypted info for streaming steps (pen…
Browse files Browse the repository at this point in the history
  • Loading branch information
DFieldFL authored and Kurtis Walker committed Feb 14, 2018
1 parent a81efb7 commit 71d5135
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.pentaho.di.core.CheckResultInterface;
import org.pentaho.di.core.RowMetaAndData;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.encryption.Encr;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.injection.Injection;
import org.pentaho.di.core.injection.bean.BeanInjectionInfo;
Expand Down Expand Up @@ -59,6 +60,8 @@ public abstract class BaseStreamStepMeta extends StepWithMappingMeta implements
public static final String TRANSFORMATION_PATH = "TRANSFORMATION_PATH";
public static final String NUM_MESSAGES = "NUM_MESSAGES";
public static final String DURATION = "DURATION";
public static final String PASSWORD = "PASSWORD";

@Injection ( name = TRANSFORMATION_PATH ) // pull this stuff up to common
protected String transformationPath = "";

Expand All @@ -75,15 +78,19 @@ public abstract class BaseStreamStepMeta extends StepWithMappingMeta implements
return properties.entrySet().stream()
.map( entry -> {
try {
Object obj = injector.getObject( this, entry.getKey() );
String key = entry.getKey();
Object obj = injector.getObject( this, key );
if ( entry.getValue().pathArraysCount == 1 ) {
@SuppressWarnings( "unchecked" )
List<String> list = (List<String>) obj;
return list.stream()
.map( v -> XMLHandler.addTagValue( entry.getKey(), v ) )
.map( v -> XMLHandler.addTagValue( key, v ) )
.collect( Collectors.joining() );
}
return XMLHandler.addTagValue( entry.getKey(), obj.toString() );
// Suffix PASSWORD to all elements that need to be encrypted/decrypted
String value = key.endsWith( PASSWORD )
? Encr.encryptPasswordIfNotUsingVariables( obj.toString() ) : obj.toString();
return XMLHandler.addTagValue( key, value );
} catch ( Exception e ) {
throw new RuntimeException( e );
}
Expand All @@ -109,7 +116,11 @@ private List<RowMetaAndData> nodesToRowMetaAndData( List<Node> nodes ) {
return nodes.stream()
.map( node -> {
RowMetaAndData rmad = new RowMetaAndData();
rmad.addValue( new ValueMetaString( node.getNodeName() ), node.getTextContent() );
String nodeName = node.getNodeName();
// Suffix PASSWORD to all elements that need to be encrypted/decrypted
Object nodeValue = nodeName.endsWith( PASSWORD )
? Encr.decryptPasswordOptionallyEncrypted( node.getTextContent() ) : node.getTextContent();
rmad.addValue( new ValueMetaString( nodeName ), nodeValue );
return rmad;
} )
.collect( Collectors.toList() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,40 @@
package org.pentaho.di.trans.streaming.common;

import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.pentaho.di.core.CheckResultInterface;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.ObjectLocationSpecificationMethod;
import org.pentaho.di.core.annotations.Step;
import org.pentaho.di.core.encryption.Encr;
import org.pentaho.di.core.encryption.TwoWayPasswordEncoderPluginType;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.injection.Injection;
import org.pentaho.di.core.injection.InjectionSupported;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.logging.LogChannelInterfaceFactory;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.core.variables.Variables;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.resource.ResourceEntry;
import org.pentaho.di.resource.ResourceReference;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.metastore.api.IMetaStore;
import org.w3c.dom.Node;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.junit.Assert.assertEquals;
Expand All @@ -60,9 +70,19 @@
public class BaseStreamStepMetaTest {

private BaseStreamStepMeta meta;
@Mock private IMetaStore metastore;
@Mock LogChannelInterfaceFactory logChannelFactory;
@Mock LogChannelInterface logChannel;

@BeforeClass
public static void setUpBeforeClass() throws KettleException {
PluginRegistry.addPluginType( TwoWayPasswordEncoderPluginType.getInstance() );
PluginRegistry.init( true );
String passwordEncoderPluginID =
Const.NVL( EnvUtil.getSystemProperty( Const.KETTLE_PASSWORD_ENCODER_PLUGIN ), "Kettle" );
Encr.init( passwordEncoderPluginID );
}

@Before
public void setUp() throws Exception {
meta = new StuffStreamMeta();
Expand All @@ -71,11 +91,15 @@ public void setUp() throws Exception {
when( logChannelFactory.create( any() ) ).thenReturn( logChannel );
}

@Step( id = "StuffStream", name = "Stuff Stream" )
@InjectionSupported( localizationPrefix = "stuff" )
private static class StuffStreamMeta extends BaseStreamStepMeta {
@Injection( name = "stuff" )
List<String> stuff = Arrays.asList( "one", "two" );

@Injection( name = "AUTH_PASSWORD" )
String password = "test";

@Override
public StepInterface getStep( StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr,
TransMeta transMeta,
Expand Down Expand Up @@ -140,13 +164,14 @@ public void testCheckErrorsOnVariablesSubstituteError() {
}

@Test
public void testSaveXMLWithInjectionList() {
public void testSaveLoadXMLWithInjectionList() throws Exception {
meta.setBatchDuration( "1000" );
meta.setBatchSize( "100" );
meta.setTransformationPath( "aPath" );
String xml = meta.getXML();
assertEquals(
"<NUM_MESSAGES>100</NUM_MESSAGES>" + Const.CR
+ "<AUTH_PASSWORD>Encrypted 2be98afc86aa7f2e4cb79ce10ca97bcce</AUTH_PASSWORD>" + Const.CR
+ "<DURATION>1000</DURATION>" + Const.CR
+ "<stuff>one</stuff>" + Const.CR
+ "<stuff>two</stuff>" + Const.CR
Expand All @@ -160,13 +185,58 @@ public void testSaveDefaultEmptyConnection() {
String xml = meta.getXML();
assertEquals(
"<NUM_MESSAGES>1000</NUM_MESSAGES>" + Const.CR
+ "<AUTH_PASSWORD>Encrypted 2be98afc86aa7f2e4cb79ce10ca97bcce</AUTH_PASSWORD>" + Const.CR
+ "<DURATION>1000</DURATION>" + Const.CR
+ "<stuff>one</stuff>" + Const.CR
+ "<stuff>two</stuff>" + Const.CR
+ "<TRANSFORMATION_PATH/>" + Const.CR,
xml );
}

@Test
public void testLoadingXML() throws Exception {
String inputXml = " <step>\n"
+ " <name>Stuff Stream</name>\n"
+ " <type>StuffStream</type>\n"
+ " <description />\n"
+ " <distribute>Y</distribute>\n"
+ " <custom_distribution />\n"
+ " <copies>1</copies>\n"
+ " <partitioning>\n"
+ " <method>none</method>\n"
+ " <schema_name />\n"
+ " </partitioning>\n"
+ " <NUM_MESSAGES>5</NUM_MESSAGES>\n"
+ " <AUTH_PASSWORD>Encrypted 2be98afc86aa7f2e4cb79ce10ca97bcce</AUTH_PASSWORD>\n"
+ " <DURATION>60000</DURATION>\n"
+ " <stuff>one</stuff>\n"
+ " <stuff>two</stuff>\n"
+ " <TRANSFORMATION_PATH>write-to-log.ktr</TRANSFORMATION_PATH>\n"
+ " <cluster_schema />\n"
+ " <remotesteps>\n"
+ " <input>\n"
+ " </input>\n"
+ " <output>\n"
+ " </output>\n"
+ " </remotesteps>\n"
+ " <GUI>\n"
+ " <xloc>80</xloc>\n"
+ " <yloc>64</yloc>\n"
+ " <draw>Y</draw>\n"
+ " </GUI>\n"
+ " </step>\n";
Node node = XMLHandler.loadXMLString( inputXml ).getFirstChild();
StuffStreamMeta loadedMeta = new StuffStreamMeta();
loadedMeta.loadXML( node, Collections.emptyList(), metastore );
assertEquals( "5", loadedMeta.getBatchSize() );
assertEquals( "60000", loadedMeta.getBatchDuration() );
assertEquals( "test", loadedMeta.password );
assertEquals( 2, loadedMeta.stuff.size() );
assertTrue( loadedMeta.stuff.contains( "one" ) );
assertTrue( loadedMeta.stuff.contains( "two" ) );
assertEquals( "write-to-log.ktr", loadedMeta.getTransformationPath() );
}

@Test
public void testGetResourceDependencies() {
String stepId = "KafkConsumerInput";
Expand Down

0 comments on commit 71d5135

Please sign in to comment.