Java Concurrency Fork Join Pool

Fork Join pool


ForkJoinPool was introduced in Java 7. Same is similar to Executor framework but with one difference. Forkjoin pool acts in recursive way unlike Executor thread, Executor thread splits the bigger task then submit task to worker threads. ForkJoin pool takes a big task then
Split into smaller tasks again those smaller tasks splits themselves to  sub tasks until each sub-task is atomic or not divisible.  So it work’s recursively. 




Pictorial view to understand ForkJoin pool concept.                      
java 7 Fork Join pool,java 1.7 Fork Join pool
                          

                                                Divide Task in to smaller tasks


Fork: To split the sub-tasks from Bigger task. Ex: Task 1.1 splits to Task 1.1.1 and Task 1.1.2

Join: Getting result from immediate sub-tasks. Ex: Task 1.1 take results from Task 1.1.1 and Task 1.1.2

Fork Join pool is faster than Executor service.

Fork join Vs Executor service.

java 7 Fork Join Vs Executor service

Example:

Suppose we want search an element in a sorted array. So we will use
 Binary Search Algorithm.

Our Search Algorithm:

Step 1:   Determines the mid element of the array, check mid element equals with search element if so return else split array in to two halves based on mid element.

Step 2: If search element is less than mid element then we create a new subtask in this sub task we take left half of the array.

Step 3: If element is greater than mid element then we create a new subtask in this sub task we take right half of the array.

Step 4: Until element is not found we continue the step 4 and 5.

Step 5: If array size is 1 and array element is not equal to search element. returns Element Not Found.


Coding:
package com.example.concurrency;

import java.util.Arrays;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSearcher extends RecursiveTask<Boolean>{

       int[] arr;
       int searchableElement;
       ForkJoinSearcher(int[] arr,int search)
       {
              this.arr = arr;
              this.searchableElement=search;
             
       }
       @Override
       protected Boolean compute() {
      
              int mid=( 0 + arr.length)/2;
              System.out.println(Thread.currentThread().getName() + " says : After splliting the arry length is :: "+ arr.length + " Midpoint is " + mid);
             
              if(arr[mid]==searchableElement)
              {
                     System.out.println(" FOUND !!!!!!!!!");
                     return true;
              }
              else if(mid==1 || mid == arr.length)
              {
                     System.out.println("NOT FOUND !!!!!!!!!");
                     return false;
              }
              else if(searchableElement < arr[mid])
              {
                     System.out.println(Thread.currentThread().getName() + " says :: Doing Left-search");
                     int[] left = Arrays.copyOfRange(arr, 0, mid);
                     ForkJoinSearcher forkTask = new ForkJoinSearcher(left,searchableElement);
                     forkTask.fork();
                     return forkTask.join();
              }
              else if(searchableElement > arr[mid])
              {
                     System.out.println(Thread.currentThread().getName() + " says :: Doing Right-search");
                     int[] right = Arrays.copyOfRange(arr, mid, arr.length);
                     ForkJoinSearcher forkTask = new ForkJoinSearcher(right,searchableElement);
                     forkTask.fork();
                     return forkTask.join();
              }
             
              return false;
             
       }

}



package com.example.concurrency;

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;

public class BinarySearch {
      
       int[] arr = new int[100];
      
       public BinarySearch()
       {
              init();
       }
      
       private void init()
       {
              for(int i=0; i<arr.length;i++)
              {
                     arr[i]=i;
              }
             
              Arrays.sort(arr);
             
             
       }
      
       public void createForJoinPool(int search)
       {
              ForkJoinPool forkJoinPool = new ForkJoinPool(50);
              ForkJoinSearcher searcher = new ForkJoinSearcher(this.arr,search);

              Boolean status = forkJoinPool.invoke(searcher);
             
              System.out.println(" Element ::" + search +" has been found in array? :: " + status );
             
       }
      
      
       public static void main(String[] args) {
             
              BinarySearch search = new BinarySearch();
              search.createForJoinPool(10);
              System.out.println("**********************");
              search.createForJoinPool(104);
             
       }
      
      
      
}


Output :

ForkJoinPool-1-worker-57 says : After splliting the arry length is :: 100 Midpoint is 50
ForkJoinPool-1-worker-57 says :: Doing Left-search
ForkJoinPool-1-worker-57 says : After splliting the arry length is :: 50 Midpoint is 25
ForkJoinPool-1-worker-57 says :: Doing Left-search
ForkJoinPool-1-worker-50 says : After splliting the arry length is :: 25 Midpoint is 12
ForkJoinPool-1-worker-50 says :: Doing Left-search
ForkJoinPool-1-worker-57 says : After splliting the arry length is :: 12 Midpoint is 6
ForkJoinPool-1-worker-57 says :: Doing Right-search
ForkJoinPool-1-worker-50 says : After splliting the arry length is :: 6 Midpoint is 3
ForkJoinPool-1-worker-50 says :: Doing Right-search
ForkJoinPool-1-worker-43 says : After splliting the arry length is :: 3 Midpoint is 1
 FOUND !!!!!!!!!
 Element ::10 has been found in array? :: true
**********************
ForkJoinPool-2-worker-57 says : After splliting the arry length is :: 100 Midpoint is 50
ForkJoinPool-2-worker-57 says :: Doing Right-search
ForkJoinPool-2-worker-57 says : After splliting the arry length is :: 50 Midpoint is 25
ForkJoinPool-2-worker-57 says :: Doing Right-search
ForkJoinPool-2-worker-50 says : After splliting the arry length is :: 25 Midpoint is 12
ForkJoinPool-2-worker-50 says :: Doing Right-search
ForkJoinPool-2-worker-57 says : After splliting the arry length is :: 13 Midpoint is 6
ForkJoinPool-2-worker-57 says :: Doing Right-search
ForkJoinPool-2-worker-50 says : After splliting the arry length is :: 7 Midpoint is 3
ForkJoinPool-2-worker-50 says :: Doing Right-search
ForkJoinPool-2-worker-57 says : After splliting the arry length is :: 4 Midpoint is 2
ForkJoinPool-2-worker-57 says :: Doing Right-search
ForkJoinPool-2-worker-43 says : After splliting the arry length is :: 2 Midpoint is 1
NOT FOUND !!!!!!!!!
 Element ::104 has been found in array? :: false

Java Callable (Future) understanding


Java callable understanding



In Java Multi-threading program we extensively use Java Callable and Future. I believe all of you have basic understanding in thread. In briefly thread is a separate path of execution so if you have to perform a repetitive task you can break the work into multiple small chunks(tasks) and assign them to threads. Multiple Threads will execute tasks in parallel to get the result quickly.

In Java 5 java.util.concurrent.Callable interface was introduced in concurrency package, which  is similar to Runnable interface, but with one difference it can returns any Object and able to throw Exception.

Java Callable interface use Generic <T> so it can return any type of Object. Executor Framework provide a submit() method to execute Callable implementations  in a thread pool. Actually Java Executor Framework follows WorkerThread pattern where in a thread pool you can initiate threads by Executors.newFixedThreadPool(10); method, Then you can submit task into pool , you can recall in java ,runnable act as target of a thread and runnable interface has a  method called public void run() which has to be implemented and you can define the task there which will execute by Threads in thread pool. Executor framework assign a work (runnable target) to threads only if there is an available thread in pool. If all threads are in use, work has to wait. Once a task is completed by a Thread same returns to pool as available thread. Callable is same as Runnable but it can return any type of Object, if we want to get a result/ status from a work (callable) You can return it.

JAVA FUTURE  
 


Java Callable tasks return java.util.concurrent.Future object. Java Future provides cancel() method to cancel the associated Callable task. There is an overloaded version of get() method where we can specify the time to wait for the result, it’s useful to avoid current thread getting blocked for longer time. Please note that get method is synchronous method, until callable finishes it’s task and returns a value it will wait for callable. There are isDone() and isCancelled() methods to find out the current status of associated Callable task.

Example : Suppose the problem is to find sum  1 to 100. We can do it by looping  1 to 100 sequentially and adding them.
Another way we can achieve it by divide and Conquer rule.  Group the numbers in a way so each group has exact two elements. Then Assign those group to a pool of threads
So each thread returns a partial sum parallely. Then collect those partial sum and add them in order to get whole sum.






Callabe & Future understanding by example


Coding :

Step : create a Adder class implement callable to do the Partial sum on group
package com.example.thread.callable;

import java.util.concurrent.Callable;

public class CallableAdder implements Callable<Integer> {

    Integer operand1;
    Integer operand2;
    CallableAdder(Integer operand1,Integer operand2)
    {
        this.operand1=operand1;
        this.operand2=operand2;
      
    }
  
  
    public Integer call() throws Exception {
        // TODO Auto-generated method stub
        System.out.println(Thread.currentThread().getName()+" says : partial Sum for " + operand1 + " and "+ operand2+ " is "  +(operand1+operand2));
        return operand1+operand2;
    }

}

Step 2: Create a manager class which responsible for grouping integers and  submit group to Executor framework for partial add then collect the partial sum,
Wait till all partial sum returns and add them

package com.example.thread.callable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelAdder {
  
  
    public Integer parallelSum()
    {
      
        //long t1 = System.currentTimeMillis();
        ExecutorService executor = Executors.newFixedThreadPool(10);
        List <Future<Integer>> list = new ArrayList<Future<Integer>>();
        int count=1;
        int prev=0;
        for(int i=1;i<=100;i++)
        {
            if(count%2==0)//grouping
            {
                System.out.println("Prev :" + prev + " current: " + i);
                Future<Integer> future = executor.submit(new CallableAdder(prev,i));
                list.add(future);
                count=1;
                continue;
            }
            prev=i;
            count++;
          
        }
        int totsum=0;
        for(Future<Integer> fut : list)
        {
            try {
                totsum = totsum+ fut.get();
            } catch (InterruptedException e) {
              
                e.printStackTrace();
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
      
        System.out.println("Total Sum is " + totsum);
        //long t2 = System.currentTimeMillis();
    //    System.out.println("Time taken by parallelSum " + (t2-t1));
        return totsum;
      
    }
  
  
    public int sequentialSum()
    {
      
        //long t1 = System.currentTimeMillis();
        Integer totsum=0;
        for(int i=0;i<=100;i++)
        {
            totsum=totsum+i;
        }
      
        //long t2 = System.currentTimeMillis();
      
        System.out.println("sequentialSum Total Sum is " + totsum);
        //System.out.println("Time taken by sequentialSum " + (t2-t1));
        return totsum;
    }
  
    public static void main(String[] args) {
      
        ParallelAdder adder = new ParallelAdder();
        int pSum= adder.parallelSum();
        int sSum= adder.sequentialSum();
      
        System.out.println("parallel Sum equals to Sequential Sum ? " );
        System.out.println("Answer is :: "  + (pSum==sSum));
      
      
      
    }

}