Implementing Durable Job Chaining in Quartz.NET
Welcome to the final blog post in my Quartz.NET series! I will be showing you how to implement durable job chaining in Quartz.NET (both Quartz.NET 2.x and Quartz.NET 3.x).
To begin with, I'd like to go over the Job Chaining solution that comes with Quartz.NET out of the box.
From the Quartz documentation itself, the Job Chain implementation:
Keeps a collection of mappings of which Job to trigger after the completion of a given job. If this listener is notified of a job completing that has a mapping, then it will then attempt to trigger the follow-up job. This achieves "job chaining", or a "poor man's workflow".
Generally an instance of this listener would be registered as a global job listener, rather than being registered directly to a given job.
If for some reason there is a failure creating the trigger for the follow-up job (which would generally only be caused by a rare serious failure in the system, or the non-existence of the follow-up job), an error messsage is logged, but no other action is taken. If you need more rigorous handling of the error, consider scheduling the triggering of the flow-up job within your job itself.
Their implementation is as follows:
public void AddJobChainLink(JobKey firstJob, JobKey secondJob)
{
    if (firstJob == null || secondJob == null)
    {
        throw new ArgumentException("Key cannot be null!");
    }
    if (firstJob.Name == null || secondJob.Name == null)
    {
        throw new ArgumentException("Key cannot have a null name!");
    }
    chainLinks.Add(firstJob, secondJob);
}

public override void JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException)
{
    JobKey sj;
    chainLinks.TryGetValue(context.JobDetail.Key, out sj);
    if (sj == null)
    {
        return;
    }
    Log.Info(string.Format(CultureInfo.InvariantCulture, "Job '{0}' will now chain to Job '{1}'", context.JobDetail.Key, sj));
    try
    {
        context.Scheduler.TriggerJob(sj);
    }
    catch (SchedulerException se)
    {
        Log.Error(string.Format(CultureInfo.InvariantCulture, "Error encountered during chaining to Job '{0}'", sj), se);
    }
}
Again, this is functionality that comes with the Quartz.NET NuGet package out of the box.
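For context, here is a minimal sketch of how that built-in listener is typically wired up. The class is JobChainingJobListener from the Quartz.Listener namespace; the helper class name, method name and the listener name "builtInChain" are my own placeholders, not part of Quartz.

```csharp
using Quartz;
using Quartz.Impl.Matchers;
using Quartz.Listener;

public static class BuiltInChaining
{
    // Links two existing jobs using the listener that ships with Quartz.NET.
    // The link is held only inside this listener instance (in memory),
    // so it has to be re-created every time the process starts.
    public static JobChainingJobListener Register(IScheduler scheduler, JobKey first, JobKey second)
    {
        var listener = new JobChainingJobListener("builtInChain"); // arbitrary listener name
        listener.AddJobChainLink(first, second);

        // Register globally so the listener is notified for jobs in every group.
        scheduler.ListenerManager.AddJobListener(listener, GroupMatcher<JobKey>.AnyGroup());
        return listener;
    }
}
```

Because the links live in the listener rather than in the job store, a restart loses them, which is exactly the gap the rest of this post addresses.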
Going back to Quartz's explanation, the final sentence is the important one: "If you need more rigorous handling of the error, consider scheduling the triggering of the flow-up job within your job itself."
For our needs, we required this "rigorous handling of errors". So today, we will be implementing our own version of the chain handler that will:
1: Chain jobs and persist the links even if the scheduler goes down.
2: Be able to pass a payload between parent and child jobs.
Implementation
The Chaining of Jobs
Since the Quartz.NET implementation for chaining doesn't allow for persistent chains (all job chains are held in memory), we need to store the chains durably within each job's JobDataMap. To do this, we will use Quartz's ADO.NET persistent job store, which requires configuring Quartz to use a database of your choice. Please do so by following the steps in the Quartz.NET documentation on persistent job stores.
This persistent job store saves all jobs to the database upon creation. This allows Quartz to load all jobs that were in execution/queued in the event of a server failure.
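As a rough idea of what that configuration looks like in code, here is a sketch that builds a scheduler factory backed by the ADO.NET job store. It assumes Quartz.NET 3.x, SQL Server, the Quartz tables already created with the QRTZ_ prefix, and the binary serializer; the helper class and method names are my own, and your provider, delegate and serializer may well differ. Note that useProperties is set to false, since we will be storing objects (not just strings) in the JobDataMap.

```csharp
using System.Collections.Specialized;
using Quartz.Impl;

public static class PersistentSchedulerConfig
{
    // Builds a scheduler factory that stores jobs and triggers in a database.
    // Assumed setup: Quartz.NET 3.x, SQL Server, tables prefixed with QRTZ_.
    public static StdSchedulerFactory CreateFactory(string connectionString)
    {
        var properties = new NameValueCollection
        {
            ["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz",
            ["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.SqlServerDelegate, Quartz",
            ["quartz.jobStore.tablePrefix"] = "QRTZ_",
            ["quartz.jobStore.dataSource"] = "default",
            // false lets the JobDataMap hold serialized objects rather than strings only
            ["quartz.jobStore.useProperties"] = "false",
            ["quartz.dataSource.default.provider"] = "SqlServer",
            ["quartz.dataSource.default.connectionString"] = connectionString,
            ["quartz.serializer.type"] = "binary"
        };
        return new StdSchedulerFactory(properties);
    }
}
```

In Quartz.NET 3.x you would then get the scheduler with await PersistentSchedulerConfig.CreateFactory(connectionString).GetScheduler(); in 2.x the same call is synchronous and the provider names differ.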
We will now create a method that will let us create jobs and specify a list of children jobs to be executed upon the parent job's completion (Note: anytime you see Constants.XXX.YYY, you can either use a string or your own Constant):
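public IJobDetail CreateJob<TJob>(Dictionary<string, object> payloadMap, params IJobDetail[] childrenJobs)
    where TJob : IJob
{
    // Durable job with a unique identity; it can be stored without a trigger.
    var newJob = JobBuilder.Create<TJob>()
        .WithIdentity(Guid.NewGuid().ToString("N"))
        .StoreDurably(true)
        .RequestRecovery(false)
        .Build();

    // Attach the payload that will be handed down the chain.
    newJob.JobDataMap.Put(Constants.PayloadKey, payloadMap);

    // Record the keys of the jobs to trigger once this job completes.
    if (childrenJobs != null && childrenJobs.Length > 0)
    {
        var jkList = childrenJobs.Select(job => job.Key).ToList();
        newJob.JobDataMap.Put(Constants.NextJobKey, jkList);
    }

    // TaskScheduler is assumed to be the project's own scheduler wrapper (not shown in this post);
    // it adds the job to the scheduler without executing it.
    TaskScheduler.Schedule(newJob);
    return newJob;
}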
In this CreateJob method, we specify the type of job we want to create through the generic type parameter TJob. We also accept a payloadMap. This payloadMap is attached to the new job and can contain any object. For example, if I wanted to store the integer 1337 and have it persist to all child jobs, I would create the dictionary and pass it through:
var payloadMap = new Dictionary<string, object> { { "intValue", 1337 } };
We also accept an array of all the jobs (IJobDetail) that we want to execute after the job being created has completed.
With these parameters, the method uses the Quartz API to create a durably stored job (the job's JobDataMap is stored within the DB) and places the dictionary payload within the JobDataMap. It then collects the unique job key of each child job specified and attaches the list of keys to the JobDataMap of our parent job. Finally, it adds the parent job to the scheduler (but does NOT execute it).
The Job Chain Listener
Now that we have the ability to create parent and children jobs that are linked by their job keys, we need a way to execute children jobs once a parent job is complete. We will be creating a job listener to do this. The implementation varies depending on whether you're using Quartz.NET 2.x or Quartz.NET 3.x.
Quartz.NET 2.x
// Quartz.NET 2.x
public class JobChainHandler : IJobListener
{
    public string Name => "JobChainHandler";

    public void JobToBeExecuted(IJobExecutionContext context) { }

    public void JobExecutionVetoed(IJobExecutionContext context) { }

    public void JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException)
    {
        if (context == null)
        {
            throw new ArgumentNullException(nameof(context), "Completed job does not have a valid job execution context");
        }
        // Do not continue the chain if the parent job failed.
        if (jobException != null)
        {
            return;
        }
        // Remove the finished job from the store; its details are still available in memory.
        var finishedJob = context.JobDetail;
        context.Scheduler.DeleteJob(finishedJob.Key);
        var childJobs = finishedJob.JobDataMap.Get(Constants.NextJobKey) as List<JobKey>;
        if (childJobs == null)
        {
            return;
        }
        // The parent's payload is the same for every child, so read it once.
        var oldJobMap = finishedJob.JobDataMap.Get(Constants.PayloadKey) as Dictionary<string, object>;
        foreach (var jobKey in childJobs)
        {
            var newJob = context.Scheduler.GetJobDetail(jobKey);
            if (newJob == null)
            {
                Debug.WriteLine($"Could not find Job with ID: {jobKey}");
                continue;
            }
            // Hand the payload to the child, re-store the child, then run it immediately.
            newJob.JobDataMap.Put(Constants.PayloadKey, oldJobMap);
            context.Scheduler.AddJob(newJob, true, false);
            context.Scheduler.TriggerJob(jobKey);
        }
    }
}
Quartz.NET 3.x
// Quartz.NET 3.x
public class JobChainHandler : IJobListener
{
    public string Name => "JobChainHandler";

    public Task JobToBeExecuted(IJobExecutionContext context, CancellationToken ct)
    {
        return Task.CompletedTask;
    }

    public Task JobExecutionVetoed(IJobExecutionContext context, CancellationToken ct)
    {
        return Task.CompletedTask;
    }

    public async Task JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException, CancellationToken ct)
    {
        if (context == null)
        {
            throw new ArgumentNullException(nameof(context), "Completed job does not have a valid job execution context");
        }
        // Do not continue the chain if the parent job failed.
        if (jobException != null)
        {
            return;
        }
        // Remove the finished job from the store; its details are still available in memory.
        var finishedJob = context.JobDetail;
        await context.Scheduler.DeleteJob(finishedJob.Key, ct);
        var childJobs = finishedJob.JobDataMap.Get(Constants.NextJobKey) as List<JobKey>;
        if (childJobs == null)
        {
            return;
        }
        // The parent's payload is the same for every child, so read it once.
        var oldJobMap = finishedJob.JobDataMap.Get(Constants.PayloadKey) as Dictionary<string, object>;
        foreach (var jobKey in childJobs)
        {
            var newJob = await context.Scheduler.GetJobDetail(jobKey, ct);
            if (newJob == null)
            {
                Debug.WriteLine($"Could not find Job with ID: {jobKey}");
                continue;
            }
            // Hand the payload to the child, re-store the child, then run it immediately.
            newJob.JobDataMap.Put(Constants.PayloadKey, oldJobMap);
            await context.Scheduler.AddJob(newJob, true, false, ct);
            await context.Scheduler.TriggerJob(jobKey, ct);
        }
    }
}
Constants Class
public class Constants
{
    public static readonly string PayloadKey = "payloadkey";
    public static readonly string NextJobKey = "nextjobkey";
}
Persisting Data
All jobs that we want to run with our JobChainHandler need one additional attribute to work properly: [PersistJobDataAfterExecution]. If we don't add this attribute, jobs will not persist their JobDataMap and will lose the keys of the next jobs to run after a job's completion. This is done like so (Quartz.NET 2.x signature shown; in 3.x, Execute returns a Task):
[PersistJobDataAfterExecution]
public class ExampleJob : IJob
{
    public void Execute(IJobExecutionContext context) { ... }
}
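To make the payload side of this more concrete, here is a small sketch of a job that reads the payload our listener copies into its JobDataMap. It uses the Quartz.NET 3.x Execute signature; the class name PayloadReadingJob, the ReadPayload helper and the "jobData1" entry are illustrative assumptions, and the key string is the same value as Constants.PayloadKey.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Quartz;

[PersistJobDataAfterExecution]
public class PayloadReadingJob : IJob
{
    // Same string that Constants.PayloadKey holds in this post.
    private const string PayloadKey = "payloadkey";

    // Returns the payload stored under PayloadKey, or an empty dictionary if none is attached.
    public static Dictionary<string, object> ReadPayload(JobDataMap map)
    {
        if (map.TryGetValue(PayloadKey, out var raw) && raw is Dictionary<string, object> payload)
        {
            return payload;
        }
        return new Dictionary<string, object>();
    }

    public Task Execute(IJobExecutionContext context)
    {
        var payload = ReadPayload(context.JobDetail.JobDataMap);

        // "jobData1" is a hypothetical entry placed there by the parent job's creator.
        if (payload.TryGetValue("jobData1", out var value))
        {
            Console.WriteLine($"jobData1 = {value}");
        }
        return Task.CompletedTask;
    }
}
```

Falling back to an empty dictionary keeps the job safe to run on its own, for example when it is the root of a chain created with no payload.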
Overview
Here we are telling Quartz that every time a job completes, it should check whether the job has any child job keys within its JobDataMap. If it does, grab the payload of the parent job, loop through each JobKey, transfer the payload from the parent, add the child job(s) to the scheduler and immediately kick them off.
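For Quartz to call our handler at all, it has to be registered with the scheduler. A minimal sketch of that registration, assuming you want the handler notified for every job group (the helper class and method names are my own):

```csharp
using Quartz;
using Quartz.Impl.Matchers;

public static class ChainListenerRegistration
{
    // Registers the chain handler as a global job listener,
    // so it is notified when a job in any group finishes.
    public static void Register(IScheduler scheduler, IJobListener chainHandler)
    {
        scheduler.ListenerManager.AddJobListener(chainHandler, GroupMatcher<JobKey>.AnyGroup());
    }
}
```

You would call ChainListenerRegistration.Register(scheduler, new JobChainHandler()) before starting the scheduler. Listeners are not stored in the job store alongside jobs and triggers, so this registration needs to happen on every application start-up.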
Usage
Now that we have created our job creation/chaining method and chain listener, all we have to do is actually use them! We create the leaf jobs first (jobs with no children) and work up to the root job (the job with no parent). An example of this is as follows:
public void Example()
{
    // Leaf jobs first. A null payload is fine here: the listener copies the parent's payload down.
    var fourthJob = CreateJob<FourthJob>(null);
    var thirdJob = CreateJob<ThirdJob>(null, fourthJob);
    var secondJob = CreateJob<SecondJob>(null);
    var payload = new Dictionary<string, object>
    {
        { "jobData1", 123 },
        { "jobData2", "hello" },
        { "jobData3", true }
    };
    var firstJob = CreateJob<FirstJob>(payload, secondJob, thirdJob);
    scheduler.TriggerJob(firstJob.Key); // Starts the chain! (await this call in Quartz.NET 3.x)
}
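This method produces the following flow: FirstJob runs first; when it completes, SecondJob and ThirdJob are triggered; and when ThirdJob completes, FourthJob is triggered.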
Since we utilise persistent job data storage, our jobs will also recover and maintain the workflow even if the scheduler goes down. We have now created our simple workflow system in Quartz.NET! Woohoo!