Skip to content

Commit

Permalink
Allow faster shutdown of dir polls.
Browse files Browse the repository at this point in the history
Currently, if there are multiple dir polls deployed, Q2 shutdown takes upto 60 seconds for each of them to shutdown. With 4 dir polls deployed, the typical time for a shutdown is > 4 mins. This change makes the shutdown much faster.
  • Loading branch information
vishnupillai committed Sep 24, 2014
1 parent 0930426 commit d6f5395
Showing 1 changed file with 43 additions and 24 deletions.
67 changes: 43 additions & 24 deletions jpos/src/main/java/org/jpos/util/DirPoll.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.jpos.core.Configuration;
import org.jpos.core.ConfigurationException;
import org.jpos.iso.ISOException;
import org.jpos.iso.ISOUtil;

import java.io.*;
import java.text.SimpleDateFormat;
Expand All @@ -43,7 +42,7 @@
* scanning for incoming requests (of varying priorities)
* on the request directory and processing them by means of
* DirPoll.Processor or DirPoll.FileProcessor
*
*
* @author <a href="mailto:[email protected]">Alejandro P. Revilla</a>
* @author <a href="mailto:[email protected]">Matthew Milliss</a>
* @since jPOS 1.2.7
Expand All @@ -66,6 +65,7 @@ public class DirPoll extends SimpleLogSource
private String responseSuffix;
private ThreadPool pool;
private Object processor;
private final Object shutdownMonitor = new Object();

private boolean shutdown;
private boolean paused = false;
Expand Down Expand Up @@ -233,7 +233,7 @@ public synchronized void setThreadPool (ThreadPool pool) {
//--------------------------------------- FilenameFilter implementation
public boolean accept(File dir, String name) {
boolean result;
String ext = currentPriority >= 0 ?
String ext = currentPriority >= 0 ?
((String) prio.elementAt(currentPriority)) : null;
if (ext != null) {
if (isRegexPriorityMatching()) {
Expand All @@ -254,7 +254,7 @@ public boolean accept(File dir, String name) {
}
//--------------------------------------------- Runnable implementation

public void run() {
public void run() {
Thread.currentThread().setName ("DirPoll-"+basePath);
if (prio.isEmpty())
addPriority("");
Expand All @@ -278,19 +278,31 @@ public void run() {
getPool().execute (new ProcessorRunner (f));
Thread.yield(); // be nice
}
else
Thread.sleep(pollInterval);
} catch (InterruptedException e) {
else {
synchronized (shutdownMonitor) {
if (!shutdown) {
shutdownMonitor.wait(pollInterval);
}
}
}
} catch (InterruptedException e) {
} catch (Throwable e) {
Logger.log (new LogEvent (this, "dirpoll", e));
try {
Thread.sleep(pollInterval * 10); // anti hog
synchronized (shutdownMonitor) {
if (!shutdown) {
shutdownMonitor.wait(pollInterval * 10);
}
}
} catch (InterruptedException ex) { }
}
}
}
}
public void destroy () {
shutdown = true;
synchronized (shutdownMonitor) {
shutdown = true;
shutdownMonitor.notifyAll();
}
}

//----------------------------------------------------- public helpers
Expand All @@ -315,7 +327,7 @@ private byte[] readRequest (File f) throws IOException {
in.close();
return b;
}
private void writeResponse (String requestName, byte[] b)
private void writeResponse (String requestName, byte[] b)
throws IOException
{
if (responseSuffix != null) {
Expand Down Expand Up @@ -354,7 +366,7 @@ private void compress(File f) throws IOException {
}

protected File scan() {
for (currentPriority=0;
for (currentPriority=0;
currentPriority < prio.size(); currentPriority++)
{
String files[] = requestDir.list(this);
Expand All @@ -377,7 +389,7 @@ public interface Processor {
* @param request request image
* @return response (or null)
*/
public byte[] process(String name, byte[] request)
public byte[] process(String name, byte[] request)
throws DirPollException;
}
public interface FileProcessor {
Expand All @@ -395,21 +407,21 @@ public ProcessorRunner (File request) throws IOException {
this.logEvent = null;
}
public void run() {
LogEvent evt =
LogEvent evt =
new LogEvent (
DirPoll.this, "dirpoll", request.getName()
);
try {
if (processor == null)
throw new DirPollException
if (processor == null)
throw new DirPollException
("null processor - nothing to do");
else if (processor instanceof Processor) {
byte[] resp = ((Processor) processor).process (
request.getName(), readRequest (request)
);
if (resp != null)
if (resp != null)
writeResponse (request.getName(), resp);
} else if (processor instanceof FileProcessor)
} else if (processor instanceof FileProcessor)
((FileProcessor) processor).process (request);

if (shouldArchive) {
Expand All @@ -419,16 +431,23 @@ else if (processor instanceof Processor) {
}
} else {
if (!request.delete ())
throw new DirPollException
("error: can't unlink request " + request.getName());
throw new DirPollException
("error: can't unlink request " + request.getName());
}

} catch (Throwable e) {
logEvent = evt;
evt.addMessage (e);
try {
if ((e instanceof DirPollException && ((DirPollException)e).isRetry())) {
ISOUtil.sleep(pollInterval*10); // retry delay (pollInterval defaults to 100ms)
synchronized (shutdownMonitor) {
if (!shutdown) {
try {
shutdownMonitor.wait(pollInterval * 10); // retry delay (pollInterval defaults to 100ms)
} catch (InterruptedException ie) {
}
}
}
evt.addMessage("retrying");
moveTo(request, requestDir);
} else {
Expand All @@ -439,7 +458,7 @@ else if (processor instanceof Processor) {
evt.addMessage (_e);
}
} finally {
if (logEvent != null)
if (logEvent != null)
Logger.log (logEvent);
}
}
Expand All @@ -465,7 +484,7 @@ public void setRetry(boolean retry) {
this.retry = retry;
}
}

public void pause() {
synchronized (this) {
if (!paused) {
Expand All @@ -486,7 +505,7 @@ public void unpause() {
}
}
}

public boolean isPaused() {
synchronized (this) {
return paused;
Expand Down

0 comments on commit d6f5395

Please sign in to comment.