Skip to content

Commit

Permalink
[PDI-11926] Create repo connection for each remote job
Browse files Browse the repository at this point in the history
If a job run via /kettle/runJob exceeds HTTP timeout, repository connection may be invalidated.
This fix creates a dedicated repo connection for each job using SlaveServerConfig.

http://jira.pentaho.com/browse/PDI-11926
  • Loading branch information
Nick Hudak committed May 9, 2014
1 parent 3a1f57e commit 4da38e5
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 37 deletions.
60 changes: 31 additions & 29 deletions engine/src/org/pentaho/di/job/JobExecutionConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -478,41 +478,43 @@ public JobExecutionConfiguration( Node trecNode ) throws KettleException {
String repositoryName = XMLHandler.getTagValue( repNode, "name" );
String username = XMLHandler.getTagValue( repNode, "login" );
String password = Encr.decryptPassword( XMLHandler.getTagValue( repNode, "password" ) );
connectRepository( repositoryName, username, password );
}

// Verify that the repository exists on the slave server...
//
RepositoriesMeta repositoriesMeta = new RepositoriesMeta();
try {
repositoriesMeta.readData();
} catch ( Exception e ) {
throw new KettleException( "Unable to get a list of repositories to locate repository '"
+ repositoryName + "'" );
}
RepositoryMeta repositoryMeta = repositoriesMeta.findRepository( repositoryName );
if ( repositoryMeta == null ) {
log.logBasic( "I couldn't find the repository with name '" + repositoryName + "'" );
return;
}
}

// Repository rep = (Repository) PluginRegistry.getInstance().loadClass(RepositoryPluginType.class,
// repositoryMeta, PluginClassType.MainClassType);
public Repository connectRepository( String repositoryName, String username, String password ) throws KettleException {
// Verify that the repository exists on the slave server...
//
RepositoriesMeta repositoriesMeta = new RepositoriesMeta();
try {
repositoriesMeta.readData();
} catch ( Exception e ) {
throw new KettleException( "Unable to get a list of repositories to locate repository '" + repositoryName + "'" );
}
return connectRepository( repositoriesMeta, repositoryName, username, password );
}

Repository rep =
PluginRegistry.getInstance().loadClass( RepositoryPluginType.class, repositoryMeta, Repository.class );
rep.init( repositoryMeta );
public Repository connectRepository( RepositoriesMeta repositoriesMeta, String repositoryName, String username, String password ) throws KettleException {
RepositoryMeta repositoryMeta = repositoriesMeta.findRepository( repositoryName );
if ( repositoryMeta == null ) {
log.logBasic( "I couldn't find the repository with name '" + repositoryName + "'" );
return null;
}

try {
rep.connect( username, password );
} catch ( Exception e ) {
log.logBasic( "Unable to connect to the repository with name '" + repositoryName + "'" );
return;
}
Repository rep = PluginRegistry.getInstance().loadClass( RepositoryPluginType.class, repositoryMeta,
Repository.class );
rep.init( repositoryMeta );

// Confirmed access:
//
repository = rep;
try {
rep.connect( username, password );
log.logBasic( "Connected to " + repositoryName + " as " + username );
setRepository( rep );
return rep;
} catch ( Exception e ) {
log.logBasic( "Unable to connect to the repository with name '" + repositoryName + "'" );
return null;
}

}

public String[] getArgumentStrings() {
Expand Down
40 changes: 32 additions & 8 deletions engine/src/org/pentaho/di/www/RunJobServlet.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.pentaho.di.core.logging.SimpleLoggingObject;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobAdapter;
import org.pentaho.di.job.JobConfiguration;
import org.pentaho.di.job.JobExecutionConfiguration;
import org.pentaho.di.job.JobMeta;
Expand Down Expand Up @@ -85,8 +86,12 @@ public void doGet( HttpServletRequest request, HttpServletResponse response ) th

try {

final Repository repository = transformationMap.getSlaveServerConfig().getRepository();
final JobMeta jobMeta = loadJob( repository, transOption );
SlaveServerConfig serverConfig = transformationMap.getSlaveServerConfig();
Repository slaveServerRepository = serverConfig.getRepository();
if ( slaveServerRepository == null ) {
throw new KettleException( "Unable to connect to repository in Slave Server Config: " + serverConfig.getRepositoryId() );
}
final JobMeta jobMeta = loadJob( slaveServerRepository, transOption );

// Set the servlet parameters as variables in the transformation
//
Expand All @@ -113,6 +118,12 @@ public void doGet( HttpServletRequest request, HttpServletResponse response ) th
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
LogLevel logLevel = LogLevel.getLogLevelForCode( levelOption );
jobExecutionConfiguration.setLogLevel( logLevel );

// Create new repository connection for this job
//
final Repository repository = jobExecutionConfiguration.connectRepository(
serverConfig.getRepositoryId(), serverConfig.getRepositoryUsername(), serverConfig.getRepositoryPassword() );

JobConfiguration jobConfiguration = new JobConfiguration( jobMeta, jobExecutionConfiguration );

String carteObjectId = UUID.randomUUID().toString();
Expand Down Expand Up @@ -146,18 +157,25 @@ public void doGet( HttpServletRequest request, HttpServletResponse response ) th

job.setSocketRepository( getSocketRepository() );

getJobMap().addJob( job.getJobname(), carteObjectId, job, jobConfiguration );
JobMap jobMap = getJobMap();
synchronized ( jobMap ) {
jobMap.addJob( job.getJobname(), carteObjectId, job, jobConfiguration );
}


// DO NOT disconnect from the shared repository connection when the job finishes.
// Disconnect from the job's repository when the job finishes.
//
job.addJobListener( new JobAdapter() {
public void jobFinished( Job job ) {
repository.disconnect();
}
} );

String message = "Job '" + job.getJobname() + "' was added to the list with id " + carteObjectId;
logBasic( message );

//
try {
// Execute the transformation...
//
job.start();
runJob( job );

WebResult webResult = new WebResult( WebResult.STRING_OK, "Job started", carteObjectId );
out.println( webResult.getXML() );
Expand All @@ -174,6 +192,12 @@ public void doGet( HttpServletRequest request, HttpServletResponse response ) th
}
}

protected void runJob( Job job ) {
// Execute the transformation...
//
job.start();
}

private JobMeta loadJob( Repository repository, String job ) throws KettleException {

if ( repository == null ) {
Expand Down

0 comments on commit 4da38e5

Please sign in to comment.