The example below shows how to write a multi-threaded framework using a thread pool and the Java Executor framework.
===
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Demonstrates running a batch of {@link Callable} tasks on a fixed-size
 * thread pool and inspecting their outcomes through {@link Future}s.
 */
public class ExecutorFramework {
    /**
     * Builds two tasks, runs them on a pool with one thread per task, reports
     * the outcome and always shuts the pool down afterwards.
     *
     * @param args unused
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the tasks
     */
    public static void main(String[] args) throws InterruptedException {
        List<MyThreadTask> opTaskList = new ArrayList<MyThreadTask>();
        opTaskList.add(new MyThreadTask(new Configuration("h1")));
        opTaskList.add(new MyThreadTask(new Configuration("h2")));
        ExecutorService pool = Executors.newFixedThreadPool(opTaskList.size());
        try {
            // invokeAll only returns once every submitted task has completed.
            List<Future<TaskResult>> futures = pool.invokeAll(opTaskList);
            boolean doneAllThread = checkResults(futures);
            if (doneAllThread) {
                System.out.println("Done all tasks..Shutting down");
            } else {
                System.out.println("One or more tasks failed..Shutting down");
            }
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Pool threads are non-daemon: without this the JVM would stay
            // alive whenever a task failed or an exception was thrown.
            pool.shutdownNow();
        }
    }

    /**
     * Waits for every future and reports whether all tasks succeeded.
     * Each result is printed; for a failed result that carries an exception,
     * the exception's stack trace is printed to standard output as well.
     *
     * @param futures futures of the submitted tasks
     * @return {@code true} only if every future produced a non-null result
     *         whose {@code isResult()} is {@code true}
     * @throws InterruptedException if interrupted while waiting on a future
     * @throws ExecutionException if a task terminated by throwing
     */
    public static boolean checkResults(List<Future<TaskResult>> futures)
            throws InterruptedException, ExecutionException {
        boolean doneAllThread = true;
        for (Future<TaskResult> f : futures) {
            TaskResult temp = f.get();
            System.out.println("Thread finished. Result - " + temp);
            if (temp != null && !temp.isResult() && temp.getThrownEx() != null) {
                // Printing getStackTrace() directly would only show the
                // array's identity string, not the frames.
                temp.getThrownEx().printStackTrace(System.out);
            }
            doneAllThread &= (temp != null) && temp.isResult();
        }
        return doneAllThread;
    }
}
/**
 * Mutable holder for the outcome of one task: the name of the thread,
 * a success flag, and the exception associated with a failure (if any).
 */
class TaskResult {
    private String threadName;
    private boolean result;
    private Exception thrownEx;

    /**
     * @param threadName name of the thread to record
     * @param result     success flag
     * @param thrownEx   exception to record, may be {@code null}
     */
    public TaskResult(String threadName, boolean result, Exception thrownEx) {
        this.threadName = threadName;
        this.result = result;
        this.thrownEx = thrownEx;
    }

    /** @return the recorded thread name */
    public String getThreadName() {
        return this.threadName;
    }

    /** @param threadName the thread name to record */
    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    /** @return the success flag */
    public boolean isResult() {
        return this.result;
    }

    /** @param result the success flag to store */
    public void setResult(boolean result) {
        this.result = result;
    }

    /** @return the recorded exception, or {@code null} if none was set */
    public Exception getThrownEx() {
        return this.thrownEx;
    }

    /** @param thrownEx the exception to record */
    public void setThrownEx(Exception thrownEx) {
        this.thrownEx = thrownEx;
    }

    /** Renders all three fields in a single bracketed line. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("TaskResult [threadName=");
        text.append(threadName);
        text.append(", result=").append(result);
        text.append(", thrownEx=").append(thrownEx);
        text.append("]");
        return text.toString();
    }
}
/**
 * Configuration handed to a task; it currently carries only a host name.
 */
class Configuration {
    String hostName;

    /**
     * @param hn host name to store
     */
    public Configuration(String hn) {
        this.hostName = hn;
    }

    /** @return the stored host name */
    public String getHostName() {
        return hostName;
    }

    /** @param hostName the host name to store */
    public void setHostName(String hostName) {
        this.hostName = hostName;
    }

    /**
     * Readable form for log output, so that printing a configuration shows
     * its content instead of the default class-name-and-hash string.
     */
    @Override
    public String toString() {
        return "Configuration [hostName=" + hostName + "]";
    }
}
/**
 * Task that prints a start banner, its configuration and an end banner,
 * and reports the outcome as a {@link TaskResult}.
 */
class MyThreadTask implements Callable<TaskResult> {
    Configuration config;

    /**
     * @param temp configuration this task reads its host name from
     */
    public MyThreadTask(Configuration temp) {
        this.config = temp;
    }

    /**
     * Runs the task body on the calling thread.
     *
     * @return a result with {@code true} when the body completed, or a result
     *         with {@code false} and the caught exception when it threw
     */
    @Override
    public TaskResult call() throws Exception {
        // Looked up once; the same thread executes the whole method.
        String threadName = Thread.currentThread().getName();
        try {
            System.out.println("*****************************");
            System.out.println("**Start of Thread " + threadName
                    + " of host:" + this.config.getHostName());
            System.out.println("*****************************");
            System.out.println("Config Data:\n" + config + "\n");
            System.out.println("*****************************");
            System.out.println("**End of Thread " + threadName
                    + " of host:" + config.getHostName());
            System.out.println("*****************************");
        } catch (Exception ex) {
            // Failures are reported through the result object rather than
            // being propagated to the caller.
            return new TaskResult(threadName, false, ex);
        }
        return new TaskResult(threadName, true, null);
    }
}
===
===
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Demonstrates running a batch of {@link Callable} tasks on a fixed-size
 * thread pool and inspecting their outcomes through {@link Future}s.
 */
public class ExecutorFramework {
    /**
     * Builds two tasks, runs them on a pool with one thread per task, reports
     * the outcome and always shuts the pool down afterwards.
     *
     * @param args unused
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the tasks
     */
    public static void main(String[] args) throws InterruptedException {
        List<MyThreadTask> opTaskList = new ArrayList<MyThreadTask>();
        opTaskList.add(new MyThreadTask(new Configuration("h1")));
        opTaskList.add(new MyThreadTask(new Configuration("h2")));
        ExecutorService pool = Executors.newFixedThreadPool(opTaskList.size());
        try {
            // invokeAll only returns once every submitted task has completed.
            List<Future<TaskResult>> futures = pool.invokeAll(opTaskList);
            boolean doneAllThread = checkResults(futures);
            if (doneAllThread) {
                System.out.println("Done all tasks..Shutting down");
            } else {
                System.out.println("One or more tasks failed..Shutting down");
            }
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Pool threads are non-daemon: without this the JVM would stay
            // alive whenever a task failed or an exception was thrown.
            pool.shutdownNow();
        }
    }

    /**
     * Waits for every future and reports whether all tasks succeeded.
     * Each result is printed; for a failed result that carries an exception,
     * the exception's stack trace is printed to standard output as well.
     *
     * @param futures futures of the submitted tasks
     * @return {@code true} only if every future produced a non-null result
     *         whose {@code isResult()} is {@code true}
     * @throws InterruptedException if interrupted while waiting on a future
     * @throws ExecutionException if a task terminated by throwing
     */
    public static boolean checkResults(List<Future<TaskResult>> futures)
            throws InterruptedException, ExecutionException {
        boolean doneAllThread = true;
        for (Future<TaskResult> f : futures) {
            TaskResult temp = f.get();
            System.out.println("Thread finished. Result - " + temp);
            if (temp != null && !temp.isResult() && temp.getThrownEx() != null) {
                // Printing getStackTrace() directly would only show the
                // array's identity string, not the frames.
                temp.getThrownEx().printStackTrace(System.out);
            }
            doneAllThread &= (temp != null) && temp.isResult();
        }
        return doneAllThread;
    }
}
/**
 * Mutable holder for the outcome of one task: the name of the thread,
 * a success flag, and the exception associated with a failure (if any).
 */
class TaskResult {
    private String threadName;
    private boolean result;
    private Exception thrownEx;

    /**
     * @param threadName name of the thread to record
     * @param result     success flag
     * @param thrownEx   exception to record, may be {@code null}
     */
    public TaskResult(String threadName, boolean result, Exception thrownEx) {
        this.threadName = threadName;
        this.result = result;
        this.thrownEx = thrownEx;
    }

    /** @return the recorded thread name */
    public String getThreadName() {
        return this.threadName;
    }

    /** @param threadName the thread name to record */
    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    /** @return the success flag */
    public boolean isResult() {
        return this.result;
    }

    /** @param result the success flag to store */
    public void setResult(boolean result) {
        this.result = result;
    }

    /** @return the recorded exception, or {@code null} if none was set */
    public Exception getThrownEx() {
        return this.thrownEx;
    }

    /** @param thrownEx the exception to record */
    public void setThrownEx(Exception thrownEx) {
        this.thrownEx = thrownEx;
    }

    /** Renders all three fields in a single bracketed line. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("TaskResult [threadName=");
        text.append(threadName);
        text.append(", result=").append(result);
        text.append(", thrownEx=").append(thrownEx);
        text.append("]");
        return text.toString();
    }
}
/**
 * Configuration handed to a task; it currently carries only a host name.
 */
class Configuration {
    String hostName;

    /**
     * @param hn host name to store
     */
    public Configuration(String hn) {
        this.hostName = hn;
    }

    /** @return the stored host name */
    public String getHostName() {
        return hostName;
    }

    /** @param hostName the host name to store */
    public void setHostName(String hostName) {
        this.hostName = hostName;
    }

    /**
     * Readable form for log output, so that printing a configuration shows
     * its content instead of the default class-name-and-hash string.
     */
    @Override
    public String toString() {
        return "Configuration [hostName=" + hostName + "]";
    }
}
/**
 * Task that prints a start banner, its configuration and an end banner,
 * and reports the outcome as a {@link TaskResult}.
 */
class MyThreadTask implements Callable<TaskResult> {
    Configuration config;

    /**
     * @param temp configuration this task reads its host name from
     */
    public MyThreadTask(Configuration temp) {
        this.config = temp;
    }

    /**
     * Runs the task body on the calling thread.
     *
     * @return a result with {@code true} when the body completed, or a result
     *         with {@code false} and the caught exception when it threw
     */
    @Override
    public TaskResult call() throws Exception {
        // Looked up once; the same thread executes the whole method.
        String threadName = Thread.currentThread().getName();
        try {
            System.out.println("*****************************");
            System.out.println("**Start of Thread " + threadName
                    + " of host:" + this.config.getHostName());
            System.out.println("*****************************");
            System.out.println("Config Data:\n" + config + "\n");
            System.out.println("*****************************");
            System.out.println("**End of Thread " + threadName
                    + " of host:" + config.getHostName());
            System.out.println("*****************************");
        } catch (Exception ex) {
            // Failures are reported through the result object rather than
            // being propagated to the caller.
            return new TaskResult(threadName, false, ex);
        }
        return new TaskResult(threadName, true, null);
    }
}
===
No comments:
Post a Comment