From 32e309ae4d62f3b38334ad60977660cb55c1deff Mon Sep 17 00:00:00 2001 From: Ganesh Date: Tue, 30 Jan 2018 20:25:08 +0530 Subject: [PATCH] BAEL-1456 - How to implement task prioritization in Java (#3366) * priority based job execution in java * minor fixes * updated to use java 8 features --- .../concurrent/prioritytaskexecution/Job.java | 24 ++++++++ .../prioritytaskexecution/JobPriority.java | 7 +++ .../PriorityJobScheduler.java | 56 +++++++++++++++++++ .../PriorityJobSchedulerUnitTest.java | 38 +++++++++++++ 4 files changed, 125 insertions(+) create mode 100644 core-java-concurrency/src/main/java/com/baeldung/concurrent/prioritytaskexecution/Job.java create mode 100644 core-java-concurrency/src/main/java/com/baeldung/concurrent/prioritytaskexecution/JobPriority.java create mode 100644 core-java-concurrency/src/main/java/com/baeldung/concurrent/prioritytaskexecution/PriorityJobScheduler.java create mode 100644 core-java-concurrency/src/test/java/com/baeldung/concurrent/prioritytaskexecution/PriorityJobSchedulerUnitTest.java diff --git a/core-java-concurrency/src/main/java/com/baeldung/concurrent/prioritytaskexecution/Job.java b/core-java-concurrency/src/main/java/com/baeldung/concurrent/prioritytaskexecution/Job.java new file mode 100644 index 0000000000..a70041ed7d --- /dev/null +++ b/core-java-concurrency/src/main/java/com/baeldung/concurrent/prioritytaskexecution/Job.java @@ -0,0 +1,24 @@ +package com.baeldung.concurrent.prioritytaskexecution; + +public class Job implements Runnable { + private String jobName; + private JobPriority jobPriority; + + public Job(String jobName, JobPriority jobPriority) { + this.jobName = jobName; + this.jobPriority = jobPriority != null ? jobPriority : JobPriority.MEDIUM; + } + + public JobPriority getJobPriority() { + return jobPriority; + } + + @Override + public void run() { + try { + System.out.println("Job:" + jobName + " Priority:" + jobPriority); + Thread.sleep(1000); + } catch (InterruptedException ignored) { + } + } +} \ No newline at end of file diff --git a/core-java-concurrency/src/main/java/com/baeldung/concurrent/prioritytaskexecution/JobPriority.java b/core-java-concurrency/src/main/java/com/baeldung/concurrent/prioritytaskexecution/JobPriority.java new file mode 100644 index 0000000000..d8092a9ce7 --- /dev/null +++ b/core-java-concurrency/src/main/java/com/baeldung/concurrent/prioritytaskexecution/JobPriority.java @@ -0,0 +1,7 @@ +package com.baeldung.concurrent.prioritytaskexecution; + +public enum JobPriority { + HIGH, + MEDIUM, + LOW +} diff --git a/core-java-concurrency/src/main/java/com/baeldung/concurrent/prioritytaskexecution/PriorityJobScheduler.java b/core-java-concurrency/src/main/java/com/baeldung/concurrent/prioritytaskexecution/PriorityJobScheduler.java new file mode 100644 index 0000000000..70fd1710c0 --- /dev/null +++ b/core-java-concurrency/src/main/java/com/baeldung/concurrent/prioritytaskexecution/PriorityJobScheduler.java @@ -0,0 +1,56 @@ +package com.baeldung.concurrent.prioritytaskexecution; + +import java.util.Comparator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class PriorityJobScheduler { + + private ExecutorService priorityJobPoolExecutor; + private ExecutorService priorityJobScheduler; + private PriorityBlockingQueue priorityQueue; + + public PriorityJobScheduler(Integer poolSize, Integer queueSize) { + priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize); + Comparator jobComparator = Comparator.comparing(Job::getJobPriority); + priorityQueue = new PriorityBlockingQueue(queueSize, + (Comparator) jobComparator); + + priorityJobScheduler = Executors.newSingleThreadExecutor(); + priorityJobScheduler.execute(()->{ + while (true) { + try { + priorityJobPoolExecutor.execute(priorityQueue.take()); + } catch (InterruptedException e) { + break; + } + } + }); + } + + public void scheduleJob(Job job) { + priorityQueue.add(job); + } + + public int getQueuedTaskCount() { + return priorityQueue.size(); + } + + protected void close(ExecutorService scheduler) { + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + } + } + + public void closeScheduler() { + close(priorityJobPoolExecutor); + close(priorityJobScheduler); + } +} diff --git a/core-java-concurrency/src/test/java/com/baeldung/concurrent/prioritytaskexecution/PriorityJobSchedulerUnitTest.java b/core-java-concurrency/src/test/java/com/baeldung/concurrent/prioritytaskexecution/PriorityJobSchedulerUnitTest.java new file mode 100644 index 0000000000..902bada3a2 --- /dev/null +++ b/core-java-concurrency/src/test/java/com/baeldung/concurrent/prioritytaskexecution/PriorityJobSchedulerUnitTest.java @@ -0,0 +1,38 @@ +package com.baeldung.concurrent.prioritytaskexecution; + +import org.junit.Test; + +public class PriorityJobSchedulerUnitTest { + private static int POOL_SIZE = 1; + private static int QUEUE_SIZE = 10; + + @Test + public void whenMultiplePriorityJobsQueued_thenHighestPriorityJobIsPicked() { + Job job1 = new Job("Job1", JobPriority.LOW); + Job job2 = new Job("Job2", JobPriority.MEDIUM); + Job job3 = new Job("Job3", JobPriority.HIGH); + Job job4 = new Job("Job4", JobPriority.MEDIUM); + Job job5 = new Job("Job5", JobPriority.LOW); + Job job6 = new Job("Job6", JobPriority.HIGH); + + PriorityJobScheduler pjs = new PriorityJobScheduler(POOL_SIZE, QUEUE_SIZE); + + pjs.scheduleJob(job1); + pjs.scheduleJob(job2); + pjs.scheduleJob(job3); + pjs.scheduleJob(job4); + pjs.scheduleJob(job5); + pjs.scheduleJob(job6); + + // ensure no tasks is pending before closing the scheduler + while (pjs.getQueuedTaskCount() != 0); + + // delay to avoid job sleep (added for demo) being interrupted + try { + Thread.sleep(2000); + } catch (InterruptedException ignored) { + } + + pjs.closeScheduler(); + } +}