Quartz Interrupt Job Example

A long-running job needs a way to be interrupted. To make a Quartz job interruptible, the job class must first implement InterruptableJob.
Calling InterruptableJob.interrupt() does not stop the job automatically: it is only an API hook, and the job itself must implement the actual mechanism.
In this article, we will look at several ways of interrupting a job.

Interrupting a job

In the class below, we start the scheduler and then schedule an interruptible job. Next, we wait one second to give the job time to start.
scheduler.getCurrentlyExecutingJobs() returns the list of jobs that are currently executing. We print that list, interrupt the job with scheduler.interrupt(), sleep for 3 seconds, and then print the executing jobs again. If the interrupt succeeded, the job we just interrupted should no longer appear in the list.

InterruptibleJobScheduler:

package com.javarticles.quartz;

import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class InterruptibleJobScheduler {
    private Class<? extends Job> jobClass;
    public InterruptibleJobScheduler(Class<? extends Job> jobClass) {
        this.jobClass = jobClass;
    }
    
    public void start() throws SchedulerException {
        SchedulerFactory schedulerFactory = new StdSchedulerFactory();
        Scheduler scheduler = schedulerFactory.getScheduler();
        try {
            JobDetail jobDetail = JobBuilder.newJob(jobClass)
                    .withIdentity("myJob", "myGroup").build();

            Trigger trigger = TriggerBuilder
                    .newTrigger()
                    .withIdentity("myTrigger", "myGroup")
                    .startNow()
                    .withSchedule(
                            SimpleScheduleBuilder
                                    .simpleSchedule()
                    )
                    .build();
            scheduler.start();
            scheduler.scheduleJob(jobDetail, trigger);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("before interrupt " + scheduler.getCurrentlyExecutingJobs());
            scheduler.interrupt(jobDetail.getKey());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("after interrupt " + scheduler.getCurrentlyExecutingJobs());
        } catch (SchedulerException e) {
            e.printStackTrace();
        } finally {
            scheduler.shutdown();
        }
    }
}

Interrupting Job using Thread.interrupt

In InterruptableJob.interrupt(), we set a flag to indicate that an interruption has been requested. The job must be written so that it periodically checks this flag to see whether it should stop. In the example below, the flag is the thread's own interrupt status. In Job.execute(), we store a reference to the executing thread in a member variable; in interrupt(), we call Thread.interrupt() on that thread.
Inside execute(), we check the thread's interrupt status with Thread.interrupted() to find out whether an interrupt has been requested. If so, we break out of the loop.
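One detail worth keeping in mind: Thread.interrupted() clears the interrupt status as it reads it, while isInterrupted() leaves it set. The following plain-JDK sketch (no Quartz involved; the class name InterruptFlagDemo is made up for illustration) shows the difference on the current thread.

```java
public class InterruptFlagDemo {

    // Returns what each check observes after the current thread interrupts itself.
    public static boolean[] probe() {
        Thread self = Thread.currentThread();
        self.interrupt();                        // set the interrupt status

        boolean peek = self.isInterrupted();     // reads the status, leaves it set
        boolean consume = Thread.interrupted();  // reads the status and clears it
        boolean after = self.isInterrupted();    // status has been cleared by now

        return new boolean[] { peek, consume, after };
    }

    public static void main(String[] args) {
        boolean[] r = probe();
        System.out.println("isInterrupted(): " + r[0]);
        System.out.println("Thread.interrupted(): " + r[1]);
        System.out.println("isInterrupted() afterwards: " + r[2]);
    }
}
```

A job that wants code further up the call stack to see the interrupt as well could call Thread.currentThread().interrupt() again after detecting it.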

InterruptibleJobUsingThreadInterrupt:

package com.javarticles.quartz;

import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;
import org.quartz.UnableToInterruptJobException;

public class InterruptibleJobUsingThreadInterrupt implements InterruptableJob {
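    // volatile: written by the job thread in execute(), read by the thread that calls interrupt()
    private volatile Thread currentThread;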

    public static void main(String[] args) throws SchedulerException {
        InterruptibleJobScheduler interruptibleJobScheduler = new InterruptibleJobScheduler(
                InterruptibleJobUsingThreadInterrupt.class);
        interruptibleJobScheduler.start();
    }

    public void execute(JobExecutionContext context)
            throws JobExecutionException {
        currentThread = Thread.currentThread();
        try {
            System.out.println("Start");
            while (true) {
                if (Thread.interrupted()) {
                    System.out.print("\nJob Interrupted\n");
                    break;
                }
            }
        } finally {
            currentThread = null;
        }
    }

    public void interrupt() throws UnableToInterruptJobException {
        // Copy to a local so the field cannot become null between the check and the call
        Thread thread = currentThread;
        if (thread != null) {
            thread.interrupt();
        }
    }
}

Output:

13:01| INFO | StdSchedulerFactory.java 1184 | Using default implementation for ThreadExecutor
13:01| INFO | SimpleThreadPool.java 268 | Job execution threads will use class loader of thread: main
13:01| INFO | SchedulerSignalerImpl.java 61 | Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
13:01| INFO | QuartzScheduler.java 240 | Quartz Scheduler v.2.2.1 created.
13:01| INFO | RAMJobStore.java 155 | RAMJobStore initialized.
13:01| INFO | QuartzScheduler.java 305 | Scheduler meta-data: Quartz Scheduler (v2.2.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.

13:01| INFO | StdSchedulerFactory.java 1339 | Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
13:01| INFO | StdSchedulerFactory.java 1343 | Quartz scheduler version: 2.2.1
13:01| INFO | QuartzScheduler.java 575 | Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
Start
before interrupt [JobExecutionContext: trigger: 'myGroup.myTrigger job: myGroup.myJob fireTime: 'Sun Jun 26 13:01:55 IST 2016 scheduledFireTime: Sun Jun 26 13:01:55 IST 2016 previousFireTime: 'null nextFireTime: null isRecovering: false refireCount: 0]

Job Interrupted
after interrupt []
13:01| INFO | QuartzScheduler.java 694 | Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED shutting down.
13:01| INFO | QuartzScheduler.java 613 | Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED paused.
13:01| INFO | QuartzScheduler.java 771 | Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED shutdown complete.
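The job above spins in a busy loop, but the same thread-interrupt approach also works when the job is blocked in a call such as Thread.sleep(), which throws InterruptedException when its thread is interrupted. Here is a minimal sketch that uses a plain thread in place of a Quartz worker thread; the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class BlockingWorkDemo {

    // Starts a worker that sleeps for a long time, interrupts it, and
    // reports whether the worker saw the InterruptedException and exited.
    public static boolean interruptSleepingWorker() {
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000);                // stands in for blocking work
            } catch (InterruptedException e) {
                sawInterrupt.set(true);              // sleep() was cut short
                Thread.currentThread().interrupt();  // restore the status for callers
            }
        });

        worker.start();
        worker.interrupt();          // what interrupt() in the job would do
        try {
            worker.join(5_000);      // the worker should finish almost immediately
        } catch (InterruptedException e) {
            return false;
        }
        return sawInterrupt.get() && !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker interrupted: " + interruptSleepingWorker());
    }
}
```

In a real job, the catch block is the place to release resources before returning from execute().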

Long running task composed of smaller tasks

Suppose we have several tasks of a similar nature, for example purging different database tables. We can then break the whole purge into smaller tasks, one per table. After one task completes and before the next one starts, we check whether the job has been interrupted. If it has, we simply stop executing the job: the loop condition fails and the remaining tasks are never started.

for (int i = 0; i < tasks.length && !isInterrupted(); i++) {
    tasks[i].doTask();
}

LongJobComposedOfSmallerTasks:

package com.javarticles.quartz;

import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;
import org.quartz.UnableToInterruptJobException;

public class LongJobComposedOfSmallerTasks implements InterruptableJob {
    private SmallTask[] tasks = new SmallTask[8];
    private JobExecutionContext context;

    public static void main(String[] args) throws SchedulerException {
        InterruptibleJobScheduler interruptibleJobScheduler = new InterruptibleJobScheduler(
                LongJobComposedOfSmallerTasks.class);
        interruptibleJobScheduler.start();
    }

    public void execute(JobExecutionContext context)
            throws JobExecutionException {
        this.context = context;
        for (int i = 0; i < tasks.length; i++) {
            tasks[i]= new SmallTask(i);
        }
        for (int i = 0; i < tasks.length && !isInterrupted(); i++) {
            tasks[i].doTask();
        }
        System.out.println("Done: interrupted? " + isInterrupted());
        System.out.println("Tasks left:");
        for (int i = 0; i < tasks.length; i++) {
            if (!tasks[i].isDone()) {
                System.out.println(tasks[i]);
            }
        }
    }

    private boolean isInterrupted() {
        return this.context.getJobDetail().getJobDataMap()
                .containsKey("interrupt")
                && (Boolean) this.context.getJobDetail().getJobDataMap()
                        .get("interrupt");
    }

    public void interrupt() throws UnableToInterruptJobException {
        this.context.getJobDetail().getJobDataMap().put("interrupt", true);
    }

    private class SmallTask {
        private int i;
        private boolean done;

        SmallTask(int i) {
            this.i = i;
        }

        void doTask() {
            System.out.println("Start " + this);
            int count = 0;
            for (long j = 0; j < 999999999L; j++) {
                // System.out.print(j);
                if (j % 10000000 == 0) {
                    System.out.print('.');
                    count++;
                    if (count == 1000) {
                        System.out.println("\n");
                        count = 0;
                    }
                }
            }
            System.out.println();
            done = true;
        }

        public String toString() {
            return "Task(" + i + ")";
        }

        boolean isDone() {
            return done;
        }
    }
}

As you can see, only the first task runs to completion; the remaining tasks never get a chance to run because the job is interrupted.

Output:

13:04| INFO | StdSchedulerFactory.java 1184 | Using default implementation for ThreadExecutor
13:04| INFO | SimpleThreadPool.java 268 | Job execution threads will use class loader of thread: main
13:04| INFO | SchedulerSignalerImpl.java 61 | Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
13:04| INFO | QuartzScheduler.java 240 | Quartz Scheduler v.2.2.1 created.
13:04| INFO | RAMJobStore.java 155 | RAMJobStore initialized.
13:04| INFO | QuartzScheduler.java 305 | Scheduler meta-data: Quartz Scheduler (v2.2.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.

13:04| INFO | StdSchedulerFactory.java 1339 | Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
13:04| INFO | StdSchedulerFactory.java 1343 | Quartz scheduler version: 2.2.1
13:04| INFO | QuartzScheduler.java 575 | Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
Start Task(0)
............................................................................before interrupt [JobExecutionContext: trigger: 'myGroup.myTrigger job: myGroup.myJob fireTime: 'Sun Jun 26 13:04:29 IST 2016 scheduledFireTime: Sun Jun 26 13:04:29 IST 2016 previousFireTime: 'null nextFireTime: null isRecovering: false refireCount: 0]
........................
Done: interrupted? true
Tasks left:
Task(1)
Task(2)
Task(3)
Task(4)
Task(5)
Task(6)
Task(7)
after interrupt []
13:04| INFO | QuartzScheduler.java 694 | Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED shutting down.
13:04| INFO | QuartzScheduler.java 613 | Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED paused.
13:04| INFO | QuartzScheduler.java 771 | Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED shutdown complete.
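The job above keeps its stop request in the JobDataMap. A simpler alternative, not used in the original listing, is to keep it in an AtomicBoolean field of the job and check it between tasks. The sketch below shows only that loop in plain Java; TaskRunner and its methods are hypothetical names, and the interrupt is simulated by the second task itself so that the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class TaskRunner {
    private final AtomicBoolean interrupted = new AtomicBoolean(false);
    private final List<Integer> completed = new ArrayList<>();

    // Plays the role of InterruptableJob.interrupt().
    public void interrupt() {
        interrupted.set(true);
    }

    // Runs the tasks in order, checking the flag before starting each one.
    public List<Integer> run(Runnable[] tasks) {
        for (int i = 0; i < tasks.length && !interrupted.get(); i++) {
            tasks[i].run();
            completed.add(i);
        }
        return completed;
    }

    public static List<Integer> demo() {
        TaskRunner runner = new TaskRunner();
        Runnable[] tasks = new Runnable[4];
        for (int i = 0; i < tasks.length; i++) {
            tasks[i] = () -> { };          // a task that does nothing
        }
        tasks[1] = runner::interrupt;      // the interrupt arrives while task 1 runs
        return runner.run(tasks);
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + demo());
    }
}
```

As in the Quartz example, the task that is running when the interrupt arrives still finishes; only the tasks after it are skipped.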

ServerSocket Interruptible Job

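In our last example, we create a server socket and have it listen for connections. The AtomicBoolean variable stop decides whether to exit the loop or keep waiting for connections. The server socket has a timeout of 5 seconds, so accept() returns at least every 5 seconds and the loop can check the stop flag.

When the job is interrupted, it sets the stop flag to true. Because the scheduler class waits only 3 seconds after the interrupt, the job can still be listed in the "after interrupt" output; it finishes once the current accept() call times out.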

ServerSocketJob:

package com.javarticles.quartz;

import org.quartz.*;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ServerSocketJob implements InterruptableJob {
    private AtomicBoolean stop = new AtomicBoolean(false);

    public static void main(String[] args) throws SchedulerException {
        InterruptibleJobScheduler interruptibleJobScheduler = new InterruptibleJobScheduler(
                ServerSocketJob.class);
        interruptibleJobScheduler.start();
    }

    public void execute(JobExecutionContext context)
            throws JobExecutionException {
        System.out.println("Start the job");
        // try-with-resources closes the server socket when the loop exits
        try (ServerSocket s = new ServerSocket(9999)) {
            // accept() gives up after 5 seconds so the stop flag can be re-checked
            s.setSoTimeout(5000);
            while (!stop.get()) {
                try {
                    // this example ignores the connection itself, so close it right away
                    s.accept().close();
                } catch (SocketTimeoutException e) {
                    // no connection within the timeout; loop around and re-check the flag
                }
            }
            System.out.println("Server is down");
        } catch (IOException e) {
            // the socket could not be opened or accept() failed
            throw new JobExecutionException(e);
        }
    }

    public void interrupt() throws UnableToInterruptJobException {
        System.out.println("Job Interrupted");
        stop.set(true);
    }
}

Output:

13:10| INFO | StdSchedulerFactory.java 1184 | Using default implementation for ThreadExecutor
13:10| INFO | SimpleThreadPool.java 268 | Job execution threads will use class loader of thread: main
13:10| INFO | SchedulerSignalerImpl.java 61 | Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
13:10| INFO | QuartzScheduler.java 240 | Quartz Scheduler v.2.2.1 created.
13:10| INFO | RAMJobStore.java 155 | RAMJobStore initialized.
13:10| INFO | QuartzScheduler.java 305 | Scheduler meta-data: Quartz Scheduler (v2.2.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.

13:10| INFO | StdSchedulerFactory.java 1339 | Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
13:10| INFO | StdSchedulerFactory.java 1343 | Quartz scheduler version: 2.2.1
13:10| INFO | QuartzScheduler.java 575 | Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
Start the job
before interrupt [JobExecutionContext: trigger: 'myGroup.myTrigger job: myGroup.myJob fireTime: 'Sun Jun 26 13:10:54 IST 2016 scheduledFireTime: Sun Jun 26 13:10:54 IST 2016 previousFireTime: 'null nextFireTime: null isRecovering: false refireCount: 0]
Job Interrupted
after interrupt [JobExecutionContext: trigger: 'myGroup.myTrigger job: myGroup.myJob fireTime: 'Sun Jun 26 13:10:54 IST 2016 scheduledFireTime: Sun Jun 26 13:10:54 IST 2016 previousFireTime: 'null nextFireTime: null isRecovering: false refireCount: 0]
13:10| INFO | QuartzScheduler.java 694 | Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED shutting down.
13:10| INFO | QuartzScheduler.java 613 | Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED paused.
13:10| INFO | QuartzScheduler.java 771 | Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED shutdown complete.
Server is down
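With the timeout approach, the job can take up to 5 seconds to notice the stop flag. An alternative, not part of the original example, is to close the ServerSocket from interrupt(): a thread blocked in accept() then gets a SocketException right away. Below is a plain-JDK sketch of that idea; the class name is hypothetical, and port 0 asks the operating system for any free port.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.SocketException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ClosableServerDemo {

    // Returns true if closing the server socket made the accepting thread exit.
    public static boolean closeUnblocksAccept() {
        AtomicBoolean unblocked = new AtomicBoolean(false);
        try {
            ServerSocket server = new ServerSocket(0);
            Thread acceptor = new Thread(() -> {
                try {
                    server.accept();           // blocks: no timeout is set
                } catch (SocketException e) {
                    unblocked.set(true);       // thrown because the socket was closed
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            acceptor.start();
            Thread.sleep(200);                 // give the acceptor time to block in accept()
            server.close();                    // what interrupt() in the job would do
            acceptor.join(5_000);
            return unblocked.get() && !acceptor.isAlive();
        } catch (IOException | InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("accept() unblocked by close(): " + closeUnblocksAccept());
    }
}
```

To use this in a job, the ServerSocket would have to be stored in a field that interrupt() can reach, much like the thread reference in the first example.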

Download the source code

This was an example of the various ways of interrupting a Quartz job.

You can download the source code here: quartzInterruptibleExample.zip

About Author

Ram's expertise lies in test-driven development and refactoring. He is passionate about open source technologies and loves blogging on various Java and open-source technologies like Spring. You can reach him at rsatish.m@gmail.com
