Skip to content

Commit

Permalink
Merge pull request pentaho#6953 from bmorrise/PDI-18360
Browse files Browse the repository at this point in the history
[PDI-18360] Embed all needed run configurations in jobs
  • Loading branch information
NJtwentyone authored Oct 17, 2019
2 parents 3de34d8 + 99fb9df commit 370d8db
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2018 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2018-2019 by Hitachi Vantara : http://www.pentaho.com
*
* *******************************************************************************
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
Expand All @@ -24,15 +24,22 @@

package org.pentaho.di.engine.configuration.impl.extension;

import org.pentaho.di.base.AbstractMeta;
import org.pentaho.di.core.attributes.metastore.EmbeddedMetaStore;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.extension.ExtensionPoint;
import org.pentaho.di.core.extension.ExtensionPointInterface;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.util.StringUtil;
import org.pentaho.di.engine.configuration.api.RunConfiguration;
import org.pentaho.di.engine.configuration.impl.EmbeddedRunConfigurationManager;
import org.pentaho.di.engine.configuration.impl.RunConfigurationManager;
import org.pentaho.di.engine.configuration.impl.pentaho.DefaultRunConfigurationProvider;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.job.entry.JobEntryCopy;
import org.pentaho.di.job.entry.JobEntryRunConfigurableInterface;

import java.util.ArrayList;
import java.util.List;

/**
* Created by bmorrise on 5/3/17.
Expand All @@ -48,16 +55,49 @@ public RunConfigurationSaveExtensionPoint( RunConfigurationManager runConfigurat
}

@Override public void callExtensionPoint( LogChannelInterface logChannelInterface, Object o ) throws KettleException {
AbstractMeta abstractMeta = (AbstractMeta) ( (Object[]) o )[ 0 ];
String runConfiguration = (String) ( (Object[]) o )[ 1 ];
final EmbeddedMetaStore embeddedMetaStore = abstractMeta.getEmbeddedMetaStore();
JobMeta jobMeta = (JobMeta) ( (Object[]) o )[ 0 ];
final EmbeddedMetaStore embeddedMetaStore = jobMeta.getEmbeddedMetaStore();

RunConfigurationManager embeddedRunConfigurationManager =
EmbeddedRunConfigurationManager.build( embeddedMetaStore );
embeddedRunConfigurationManager.deleteAll();

RunConfiguration loadedRunConfiguration = runConfigurationManager.load( abstractMeta.environmentSubstitute( runConfiguration ) );
embeddedRunConfigurationManager.save( loadedRunConfiguration );
List<String> runConfigurationNames = new ArrayList<>();
boolean embedAll = false;
for ( JobEntryCopy jobEntryCopy : jobMeta.getJobCopies() ) {
if ( jobEntryCopy.getEntry() instanceof JobEntryRunConfigurableInterface ) {
String usedConfiguration = ( (JobEntryRunConfigurableInterface) jobEntryCopy.getEntry() ).getRunConfiguration();
embedAll = embedAll || StringUtil.isVariable( usedConfiguration );
if ( !runConfigurationNames.contains( usedConfiguration ) ) {
runConfigurationNames.add( usedConfiguration );
}
}
}

if ( embedAll ) {
embedAllRunConfigurations( embeddedRunConfigurationManager );
} else {
embedRunConfigurations( embeddedRunConfigurationManager, runConfigurationNames );
}
}

private void embedAllRunConfigurations( RunConfigurationManager embeddedRunConfigurationManager ) {
List<RunConfiguration> runConfigurations = runConfigurationManager.load();
for ( RunConfiguration loadedRunConfiguration : runConfigurations ) {
if ( !loadedRunConfiguration.isReadOnly() ) {
embeddedRunConfigurationManager.save( loadedRunConfiguration );
}
}
}

private void embedRunConfigurations( RunConfigurationManager embeddedRunConfigurationManager,
List<String> runConfigurationNames ) {
for ( String runConfigurationName : runConfigurationNames ) {
if ( !runConfigurationName.equals( DefaultRunConfigurationProvider.DEFAULT_CONFIG_NAME ) ) {
RunConfiguration loadedRunConfiguration = runConfigurationManager.load( runConfigurationName );
embeddedRunConfigurationManager.save( loadedRunConfiguration );
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* *****************************************************************************
*
* Pentaho Data Integration
*
* Copyright (C) 2019 by Hitachi Vantara : http://www.pentaho.com
*
* *******************************************************************************
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* *****************************************************************************
*
*/

package org.pentaho.di.engine.configuration.impl.extension;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.pentaho.di.core.attributes.metastore.EmbeddedMetaStore;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.engine.configuration.api.RunConfiguration;
import org.pentaho.di.engine.configuration.impl.RunConfigurationManager;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.job.entries.job.JobEntryJob;
import org.pentaho.di.job.entries.trans.JobEntryTrans;
import org.pentaho.di.job.entry.JobEntryCopy;
import org.pentaho.di.job.entry.JobEntryInterface;
import org.pentaho.di.job.entry.JobEntryRunConfigurableInterface;

import java.util.ArrayList;
import java.util.List;

import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith( MockitoJUnitRunner.class )
public class RunConfigurationSaveExtensionPointTest {

private static final int JOB_ENTRY_COUNT = 10;
private static final String RUN_CONFIGURATION = "Run Configuration ";
public static final String RUN_CONFIG_VARIBLE = "${RUN_CONFIG_VARIBLE}";

@Mock private JobMeta jobMeta;
@Mock private LogChannelInterface log;
@Mock private EmbeddedMetaStore embeddedMetaStore;
@Mock private RunConfigurationManager runConfigurationManager;

@Before
public void setup() {
when( jobMeta.getEmbeddedMetaStore() ).thenReturn( embeddedMetaStore );
for ( int i = 0; i < JOB_ENTRY_COUNT; i++ ) {
String configurationName = RUN_CONFIGURATION + i;
RunConfiguration runConfiguration = mock( RunConfiguration.class );
when( runConfiguration.getName() ).thenReturn( configurationName );
when( runConfigurationManager.load( configurationName ) ).thenReturn( runConfiguration );
}
}

@Test
public void testCallExtensionPointMultipleConfigs() throws Exception {
List<JobEntryCopy> jobEntryCopies = new ArrayList<>();
for ( int i = 0; i < JOB_ENTRY_COUNT; i++ ) {
Class<? extends JobEntryRunConfigurableInterface> type =
i < JOB_ENTRY_COUNT / 2 ? JobEntryJob.class : JobEntryTrans.class;
String configurationName = RUN_CONFIGURATION + i;
jobEntryCopies.add( createCopy( type, configurationName ) );
}
when( jobMeta.getJobCopies() ).thenReturn( jobEntryCopies );

RunConfigurationSaveExtensionPoint runConfigurationSaveExtensionPoint =
new RunConfigurationSaveExtensionPoint( runConfigurationManager );
runConfigurationSaveExtensionPoint.callExtensionPoint( log, new Object[] { jobMeta } );

for ( int i = 0; i < JOB_ENTRY_COUNT; i++ ) {
String configurationName = RUN_CONFIGURATION + i;
verify( runConfigurationManager ).load( configurationName );
}
}

@Test
public void testCallExtensionPointOneConfig() throws Exception {
List<JobEntryCopy> jobEntryCopies = new ArrayList<>();
for ( int i = 0; i < JOB_ENTRY_COUNT; i++ ) {
Class<? extends JobEntryRunConfigurableInterface> type =
i < JOB_ENTRY_COUNT / 2 ? JobEntryJob.class : JobEntryTrans.class;
jobEntryCopies.add( createCopy( type, RUN_CONFIGURATION + "0" ) );
}
when( jobMeta.getJobCopies() ).thenReturn( jobEntryCopies );

RunConfigurationSaveExtensionPoint runConfigurationSaveExtensionPoint =
new RunConfigurationSaveExtensionPoint( runConfigurationManager );
runConfigurationSaveExtensionPoint.callExtensionPoint( log, new Object[] { jobMeta } );

verify( runConfigurationManager, times( 1 ) ).load( anyString() );
}

@Test
public void testCallExtensionPointAllConfigs() throws Exception {
List<JobEntryCopy> jobEntryCopies = new ArrayList<>();
jobEntryCopies.add( createCopy( JobEntryJob.class, RUN_CONFIGURATION + "0" ) );
jobEntryCopies.add( createCopy( JobEntryTrans.class, RUN_CONFIGURATION + "1" ) );
jobEntryCopies.add( createCopy( JobEntryJob.class, RUN_CONFIGURATION + "2" ) );
jobEntryCopies.add( createCopy( JobEntryTrans.class, RUN_CONFIG_VARIBLE ) );
when( jobMeta.getJobCopies() ).thenReturn( jobEntryCopies );

RunConfigurationSaveExtensionPoint runConfigurationSaveExtensionPoint =
new RunConfigurationSaveExtensionPoint( runConfigurationManager );
runConfigurationSaveExtensionPoint.callExtensionPoint( log, new Object[] { jobMeta } );

verify( runConfigurationManager ).load();
}

private JobEntryCopy createCopy( Class<? extends JobEntryRunConfigurableInterface> type,
String runConfigurationName ) {
JobEntryCopy jobEntryCopy = mock( JobEntryCopy.class );
JobEntryRunConfigurableInterface jobEntryInterface = mock( type );

when( jobEntryCopy.getEntry() ).thenReturn( (JobEntryInterface) jobEntryInterface );
when( jobEntryInterface.getRunConfiguration() ).thenReturn( runConfigurationName );

return jobEntryCopy;
}
}

0 comments on commit 370d8db

Please sign in to comment.