From d6f53950894982f53f3d7d1e1d3f2598aba12e4e Mon Sep 17 00:00:00 2001 From: vishnupillai Date: Wed, 24 Sep 2014 13:55:12 -0700 Subject: [PATCH] Allow faster shutdown of dir polls. Currently, if there are multiple dir polls deployed, Q2 shutdown takes upto 60 seconds for each of them to shutdown. With 4 dir polls deployed, the typical time for a shutdown is > 4 mins. This change makes the shutdown much faster. --- jpos/src/main/java/org/jpos/util/DirPoll.java | 67 ++++++++++++------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/jpos/src/main/java/org/jpos/util/DirPoll.java b/jpos/src/main/java/org/jpos/util/DirPoll.java index 3d97845c18..a58fc67562 100644 --- a/jpos/src/main/java/org/jpos/util/DirPoll.java +++ b/jpos/src/main/java/org/jpos/util/DirPoll.java @@ -22,7 +22,6 @@ import org.jpos.core.Configuration; import org.jpos.core.ConfigurationException; import org.jpos.iso.ISOException; -import org.jpos.iso.ISOUtil; import java.io.*; import java.text.SimpleDateFormat; @@ -43,7 +42,7 @@ * scanning for incoming requests (of varying priorities) * on the request directory and processing them by means of * DirPoll.Processor or DirPoll.FileProcessor - * + * * @author Alejandro P. Revilla * @author Matthew Milliss * @since jPOS 1.2.7 @@ -66,6 +65,7 @@ public class DirPoll extends SimpleLogSource private String responseSuffix; private ThreadPool pool; private Object processor; + private final Object shutdownMonitor = new Object(); private boolean shutdown; private boolean paused = false; @@ -233,7 +233,7 @@ public synchronized void setThreadPool (ThreadPool pool) { //--------------------------------------- FilenameFilter implementation public boolean accept(File dir, String name) { boolean result; - String ext = currentPriority >= 0 ? + String ext = currentPriority >= 0 ? ((String) prio.elementAt(currentPriority)) : null; if (ext != null) { if (isRegexPriorityMatching()) { @@ -254,7 +254,7 @@ public boolean accept(File dir, String name) { } //--------------------------------------------- Runnable implementation - public void run() { + public void run() { Thread.currentThread().setName ("DirPoll-"+basePath); if (prio.isEmpty()) addPriority(""); @@ -278,19 +278,31 @@ public void run() { getPool().execute (new ProcessorRunner (f)); Thread.yield(); // be nice } - else - Thread.sleep(pollInterval); - } catch (InterruptedException e) { + else { + synchronized (shutdownMonitor) { + if (!shutdown) { + shutdownMonitor.wait(pollInterval); + } + } + } + } catch (InterruptedException e) { } catch (Throwable e) { Logger.log (new LogEvent (this, "dirpoll", e)); try { - Thread.sleep(pollInterval * 10); // anti hog + synchronized (shutdownMonitor) { + if (!shutdown) { + shutdownMonitor.wait(pollInterval * 10); + } + } } catch (InterruptedException ex) { } } - } + } } public void destroy () { - shutdown = true; + synchronized (shutdownMonitor) { + shutdown = true; + shutdownMonitor.notifyAll(); + } } //----------------------------------------------------- public helpers @@ -315,7 +327,7 @@ private byte[] readRequest (File f) throws IOException { in.close(); return b; } - private void writeResponse (String requestName, byte[] b) + private void writeResponse (String requestName, byte[] b) throws IOException { if (responseSuffix != null) { @@ -354,7 +366,7 @@ private void compress(File f) throws IOException { } protected File scan() { - for (currentPriority=0; + for (currentPriority=0; currentPriority < prio.size(); currentPriority++) { String files[] = requestDir.list(this); @@ -377,7 +389,7 @@ public interface Processor { * @param request request image * @return response (or null) */ - public byte[] process(String name, byte[] request) + public byte[] process(String name, byte[] request) throws DirPollException; } public interface FileProcessor { @@ -395,21 +407,21 @@ public ProcessorRunner (File request) throws IOException { this.logEvent = null; } public void run() { - LogEvent evt = + LogEvent evt = new LogEvent ( DirPoll.this, "dirpoll", request.getName() ); try { - if (processor == null) - throw new DirPollException + if (processor == null) + throw new DirPollException ("null processor - nothing to do"); else if (processor instanceof Processor) { byte[] resp = ((Processor) processor).process ( request.getName(), readRequest (request) ); - if (resp != null) + if (resp != null) writeResponse (request.getName(), resp); - } else if (processor instanceof FileProcessor) + } else if (processor instanceof FileProcessor) ((FileProcessor) processor).process (request); if (shouldArchive) { @@ -419,8 +431,8 @@ else if (processor instanceof Processor) { } } else { if (!request.delete ()) - throw new DirPollException - ("error: can't unlink request " + request.getName()); + throw new DirPollException + ("error: can't unlink request " + request.getName()); } } catch (Throwable e) { @@ -428,7 +440,14 @@ else if (processor instanceof Processor) { evt.addMessage (e); try { if ((e instanceof DirPollException && ((DirPollException)e).isRetry())) { - ISOUtil.sleep(pollInterval*10); // retry delay (pollInterval defaults to 100ms) + synchronized (shutdownMonitor) { + if (!shutdown) { + try { + shutdownMonitor.wait(pollInterval * 10); // retry delay (pollInterval defaults to 100ms) + } catch (InterruptedException ie) { + } + } + } evt.addMessage("retrying"); moveTo(request, requestDir); } else { @@ -439,7 +458,7 @@ else if (processor instanceof Processor) { evt.addMessage (_e); } } finally { - if (logEvent != null) + if (logEvent != null) Logger.log (logEvent); } } @@ -465,7 +484,7 @@ public void setRetry(boolean retry) { this.retry = retry; } } - + public void pause() { synchronized (this) { if (!paused) { @@ -486,7 +505,7 @@ public void unpause() { } } } - + public boolean isPaused() { synchronized (this) { return paused;