`

Java并发编程-Executor

阅读更多

Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。他们的关系为:


 

并发编程的一种编程方式是把任务拆分为一些列的小任务,即Runnable,然后在提交给一个Executor执行,Executor.execute(Runnalbe) 。Executor在执行时使用内部的线程池完成操作。

一、创建线程池

Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。

public static ExecutorService newFixedThreadPool(int nThreads)

创建固定数目线程的线程池。

public static ExecutorService newCachedThreadPool()

创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

public static ExecutorService newSingleThreadExecutor()

创建一个单线程化的Executor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

Java代码 复制代码 收藏代码
  1. Executor executor = Executors.newFixedThreadPool(10);   
  2. Runnable task = new Runnable() {   
  3.     @Override  
  4.     public void run() {   
  5.         System.out.println("task over");   
  6.     }   
  7. };   
  8. executor.execute(task);   
  9.   
  10. executor = Executors.newScheduledThreadPool(10);   
  11. ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;   
  12. scheduler.scheduleAtFixedRate(task, 1010, TimeUnit.SECONDS);  
Executor executor = Executors.newFixedThreadPool(10);
Runnable task = new Runnable() {
	@Override
	public void run() {
		System.out.println("task over");
	}
};
executor.execute(task);

executor = Executors.newScheduledThreadPool(10);
ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;
scheduler.scheduleAtFixedRate(task, 10, 10, TimeUnit.SECONDS);

 二、ExecutorService与生命周期

ExecutorService扩展了Executor并添加了一些生命周期管理的方法。一个Executor的生命周期有三种状态,运行关闭终止 。Executor创建时处于运行状态。当调用ExecutorService.shutdown()后,处于关闭状态,isShutdown()方法返回true。这时,不应该再想Executor中添加任务,所有已添加的任务执行完毕后,Executor处于终止状态,isTerminated()返回true。

如果Executor处于关闭状态,往Executor提交任务会抛出unchecked exception RejectedExecutionException。

Java代码 复制代码 收藏代码
  1. ExecutorService executorService = (ExecutorService) executor;   
  2. while (!executorService.isShutdown()) {   
  3.     try {   
  4.         executorService.execute(task);   
  5.     } catch (RejectedExecutionException ignored) {   
  6.            
  7.     }   
  8. }   
  9. executorService.shutdown();  
ExecutorService executorService = (ExecutorService) executor;
while (!executorService.isShutdown()) {
	try {
		executorService.execute(task);
	} catch (RejectedExecutionException ignored) {
		
	}
}
executorService.shutdown();

 三、使用Callable,Future返回结果

Future<V>代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞。FutureTask<V>实现了Future<V>和Runable<V>。Callable代表一个有返回值得操作。

Java代码 复制代码 收藏代码
  1. Callable<Integer> func = new Callable<Integer>(){   
  2.     public Integer call() throws Exception {   
  3.         System.out.println("inside callable");   
  4.         Thread.sleep(1000);   
  5.         return new Integer(8);   
  6.     }          
  7. };         
  8. FutureTask<Integer> futureTask  = new FutureTask<Integer>(func);   
  9. Thread newThread = new Thread(futureTask);   
  10. newThread.start();   
  11.   
  12. try {   
  13.     System.out.println("blocking here");   
  14.     Integer result = futureTask.get();   
  15.     System.out.println(result);   
  16. catch (InterruptedException ignored) {   
  17. catch (ExecutionException ignored) {   
  18. }  
		Callable<Integer> func = new Callable<Integer>(){
			public Integer call() throws Exception {
				System.out.println("inside callable");
				Thread.sleep(1000);
				return new Integer(8);
			}		
		};		
		FutureTask<Integer> futureTask  = new FutureTask<Integer>(func);
		Thread newThread = new Thread(futureTask);
		newThread.start();
		
		try {
			System.out.println("blocking here");
			Integer result = futureTask.get();
			System.out.println(result);
		} catch (InterruptedException ignored) {
		} catch (ExecutionException ignored) {
		}

 ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。

例子:并行计算数组的和。

Java代码 复制代码 收藏代码
  1. package executorservice;   
  2.   
  3. import java.util.ArrayList;   
  4. import java.util.List;   
  5. import java.util.concurrent.Callable;   
  6. import java.util.concurrent.ExecutionException;   
  7. import java.util.concurrent.ExecutorService;   
  8. import java.util.concurrent.Executors;   
  9. import java.util.concurrent.Future;   
  10. import java.util.concurrent.FutureTask;   
  11.   
  12. public class ConcurrentCalculator {   
  13.   
  14.     private ExecutorService exec;   
  15.     private int cpuCoreNumber;   
  16.     private List<Future<Long>> tasks = new ArrayList<Future<Long>>();   
  17.   
  18.     // 内部类   
  19.     class SumCalculator implements Callable<Long> {   
  20.         private int[] numbers;   
  21.         private int start;   
  22.         private int end;   
  23.   
  24.         public SumCalculator(final int[] numbers, int start, int end) {   
  25.             this.numbers = numbers;   
  26.             this.start = start;   
  27.             this.end = end;   
  28.         }   
  29.   
  30.         public Long call() throws Exception {   
  31.             Long sum = 0l;   
  32.             for (int i = start; i < end; i++) {   
  33.                 sum += numbers[i];   
  34.             }   
  35.             return sum;   
  36.         }   
  37.     }   
  38.   
  39.     public ConcurrentCalculator() {   
  40.         cpuCoreNumber = Runtime.getRuntime().availableProcessors();   
  41.         exec = Executors.newFixedThreadPool(cpuCoreNumber);   
  42.     }   
  43.   
  44.     public Long sum(final int[] numbers) {   
  45.         // 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor   
  46.         for (int i = 0; i < cpuCoreNumber; i++) {   
  47.             int increment = numbers.length / cpuCoreNumber + 1;   
  48.             int start = increment * i;   
  49.             int end = increment * i + increment;   
  50.             if (end > numbers.length)   
  51.                 end = numbers.length;   
  52.             SumCalculator subCalc = new SumCalculator(numbers, start, end);   
  53.             FutureTask<Long> task = new FutureTask<Long>(subCalc);   
  54.             tasks.add(task);   
  55.             if (!exec.isShutdown()) {   
  56.                 exec.submit(task);   
  57.             }   
  58.         }   
  59.         return getResult();   
  60.     }   
  61.   
  62.     /**  
  63.      * 迭代每个只任务,获得部分和,相加返回  
  64.      *   
  65.      * @return  
  66.      */  
  67.     public Long getResult() {   
  68.         Long result = 0l;   
  69.         for (Future<Long> task : tasks) {   
  70.             try {   
  71.                 // 如果计算未完成则阻塞   
  72.                 Long subSum = task.get();   
  73.                 result += subSum;   
  74.             } catch (InterruptedException e) {   
  75.                 e.printStackTrace();   
  76.             } catch (ExecutionException e) {   
  77.                 e.printStackTrace();   
  78.             }   
  79.         }   
  80.         return result;   
  81.     }   
  82.   
  83.     public void close() {   
  84.         exec.shutdown();   
  85.     }   
  86. }  
package executorservice;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class ConcurrentCalculator {

	private ExecutorService exec;
	private int cpuCoreNumber;
	private List<Future<Long>> tasks = new ArrayList<Future<Long>>();

	// 内部类
	class SumCalculator implements Callable<Long> {
		private int[] numbers;
		private int start;
		private int end;

		public SumCalculator(final int[] numbers, int start, int end) {
			this.numbers = numbers;
			this.start = start;
			this.end = end;
		}

		public Long call() throws Exception {
			Long sum = 0l;
			for (int i = start; i < end; i++) {
				sum += numbers[i];
			}
			return sum;
		}
	}

	public ConcurrentCalculator() {
		cpuCoreNumber = Runtime.getRuntime().availableProcessors();
		exec = Executors.newFixedThreadPool(cpuCoreNumber);
	}

	public Long sum(final int[] numbers) {
		// 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor
		for (int i = 0; i < cpuCoreNumber; i++) {
			int increment = numbers.length / cpuCoreNumber + 1;
			int start = increment * i;
			int end = increment * i + increment;
			if (end > numbers.length)
				end = numbers.length;
			SumCalculator subCalc = new SumCalculator(numbers, start, end);
			FutureTask<Long> task = new FutureTask<Long>(subCalc);
			tasks.add(task);
			if (!exec.isShutdown()) {
				exec.submit(task);
			}
		}
		return getResult();
	}

	/**
	 * 迭代每个只任务,获得部分和,相加返回
	 * 
	 * @return
	 */
	public Long getResult() {
		Long result = 0l;
		for (Future<Long> task : tasks) {
			try {
				// 如果计算未完成则阻塞
				Long subSum = task.get();
				result += subSum;
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		return result;
	}

	public void close() {
		exec.shutdown();
	}
}

 Main

Java代码 复制代码 收藏代码
  1. int[] numbers = new int[] { 123456781011 };   
  2. ConcurrentCalculator calc = new ConcurrentCalculator();   
  3. Long sum = calc.sum(numbers);   
  4. System.out.println(sum);   
  5. calc.close();  
int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 };
ConcurrentCalculator calc = new ConcurrentCalculator();
Long sum = calc.sum(numbers);
System.out.println(sum);
calc.close();

 四、CompletionService

在刚在的例子中,getResult()方法的实现过程中,迭代了FutureTask的数组,如果任务还没有完成则当前线程会阻塞,如果我们希望任意字任务完成后就把其结果加到result中,而不用依次等待每个任务完成,可以使CompletionService。生产者submit()执行的任务。使用者take()已完成的任务,并按照完成这些任务的顺序处理它们的结果 。也就是调用CompletionService的take方法是,会返回按完成顺序放回任务的结果,CompletionService内部维护了一个阻塞队列BlockingQueue,如果没有任务完成,take()方法也会阻塞。修改刚才的例子使用CompletionService:

Java代码 复制代码 收藏代码
  1. public class ConcurrentCalculator2 {   
  2.   
  3.     private ExecutorService exec;   
  4.     private CompletionService<Long> completionService;   
  5.   
  6.   
  7.     private int cpuCoreNumber;   
  8.   
  9.     // 内部类   
  10.     class SumCalculator implements Callable<Long> {   
  11.         ......   
  12.     }   
  13.   
  14.     public ConcurrentCalculator2() {   
  15.         cpuCoreNumber = Runtime.getRuntime().availableProcessors();   
  16.         exec = Executors.newFixedThreadPool(cpuCoreNumber);   
  17.         completionService = new ExecutorCompletionService<Long>(exec);   
  18.   
  19.   
  20.     }   
  21.   
  22.     public Long sum(final int[] numbers) {   
  23.         // 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor   
  24.         for (int i = 0; i < cpuCoreNumber; i++) {   
  25.             int increment = numbers.length / cpuCoreNumber + 1;   
  26.             int start = increment * i;   
  27.             int end = increment * i + increment;   
  28.             if (end > numbers.length)   
  29.                 end = numbers.length;   
  30.             SumCalculator subCalc = new SumCalculator(numbers, start, end);    
  31.             if (!exec.isShutdown()) {   
  32.                 completionService.submit(subCalc);   
  33.   
  34.   
  35.             }   
  36.                
  37.         }   
  38.         return getResult();   
  39.     }   
  40.   
  41.     /**  
  42.      * 迭代每个只任务,获得部分和,相加返回  
  43.      *   
  44.      * @return  
  45.      */  
  46.     public Long getResult() {   
  47.         Long result = 0l;   
  48.         for (int i = 0; i < cpuCoreNumber; i++) {               
  49.             try {   
  50.                 Long subSum = completionService.take().get();   
  51.                 result += subSum;              
  52.             } catch (InterruptedException e) {   
  53.                 e.printStackTrace();   
  54.             } catch (ExecutionException e) {   
  55.                 e.printStackTrace();   
  56.             }   
  57.         }   
  58.         return result;   
  59.     }   
  60.   
  61.     public void close() {   
  62.         exec.shutdown();   
  63.     }   
  64. }  
public class ConcurrentCalculator2 {

	private ExecutorService exec;
	private CompletionService<Long> completionService;


	private int cpuCoreNumber;

	// 内部类
	class SumCalculator implements Callable<Long> {
		......
	}

	public ConcurrentCalculator2() {
		cpuCoreNumber = Runtime.getRuntime().availableProcessors();
		exec = Executors.newFixedThreadPool(cpuCoreNumber);
		completionService = new ExecutorCompletionService<Long>(exec);


	}

	public Long sum(final int[] numbers) {
		// 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor
		for (int i = 0; i < cpuCoreNumber; i++) {
			int increment = numbers.length / cpuCoreNumber + 1;
			int start = increment * i;
			int end = increment * i + increment;
			if (end > numbers.length)
				end = numbers.length;
			SumCalculator subCalc = new SumCalculator(numbers, start, end);	
			if (!exec.isShutdown()) {
				completionService.submit(subCalc);


			}
			
		}
		return getResult();
	}

	/**
	 * 迭代每个只任务,获得部分和,相加返回
	 * 
	 * @return
	 */
	public Long getResult() {
		Long result = 0l;
		for (int i = 0; i < cpuCoreNumber; i++) {			
			try {
				Long subSum = completionService.take().get();
				result += subSum;			
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		return result;
	}

	public void close() {
		exec.shutdown();
	}
}

 五、例子HtmlRender

该例子模拟浏览器的Html呈现过程,先呈现文本,再异步下载图片,下载完毕每个图片即显示,见附件eclipse项目htmlreander包。

 

所有代码见附件,Eclipse项目。本文参考《Java并发编程实践 》。

分享到:
评论

相关推荐

    《Java并发编程的艺术》

    《Java并发编程的艺术》内容涵盖Java并发编程机制的底层实现原理、Java内存模型、Java并发编程基础、Java中的锁、并发容器和框架、原子类、并发工具类、线程池、Executor框架等主题,每个主题都做了深入的讲解,同时...

    Java并发编程实战

    1.1 并发简史 1.2 线程的优势 1.2.1 发挥多处理器的强大能力 1.2.2 建模的简单性 1.2.3 异步事件的简化处理 1.2.4 响应更灵敏的用户界面 1.3 线程带来的风险 1.3.1 安全性问题 1.3.2 活跃性问题 1.3.3 ...

    《Java并发编程的艺术》源代码

    Java并发编程的艺术 作者:方腾飞 魏鹏 程晓明 著 丛书名:Java核心技术系列 出版日期 :2015-07-25 ISBN:978-7-111-50824-3 第1章介绍Java并发编程的挑战,向读者说明进入并发编程的世界可能会遇到哪些问题,以及如何...

    java并发编程:Executor、Executors、ExecutorService.docx

    Executor: 一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类,一般来说,...

    Java并发编程的艺术

    , 《Java并发编程的艺术》内容涵盖Java并发编程机制的底层实现原理、Java内存模型、Java并发编程基础、Java中的锁、并发容器和框架、原子类、并发工具类、线程池、Executor框架等主题,每个主题都做了深入的讲解,...

    Java并发编程实践 PDF 高清版

    本书的读者是那些具有一定Java编程经验的程序员、希望了解Java SE 5,6在线程技术上的改进和新特性的程序员,以及Java和并发编程的爱好者。 目录 代码清单 序 第1章 介绍 1.1 并发的(非常)简短历史 1.2 线程的...

    Java 并发编程实战

    1.1 并发简史 1.2 线程的优势 1.2.1 发挥多处理器的强大能力 1.2.2 建模的简单性 1.2.3 异步事件的简化处理 1.2.4 响应更灵敏的用户界面 1.3 线程带来的风险 1.3.1 安全性问题 1.3.2 活跃性问题 1.3.3 ...

    Java并发编程原理与实战

    Executor框架详解.mp4 实战:简易web服务器(一).mp4 实战:简易web服务器(二).mp4 JDK8的新增原子操作类LongAddr原理与使用.mp4 JDK8新增锁StampedLock详解.mp4 重排序问题.mp4 happens-before简单概述.mp4 锁的...

    Java并发编程的艺术_非扫描

    Java并发编程的艺术_非扫描本书特色本书结合JDK的源码介绍了Java并发框架、线程池的实现原理,帮助读者做到知其所以然。本书对原理的剖析不仅仅局限于Java层面,而是深入到JVM,甚至CPU层面来进行讲解,帮助读者从更...

    龙果 java并发编程原理实战

    龙果 java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四...

    JAVA并发编程实践_中文版(1-16章全)_1/4

    6.2 executor 框架 6.3 寻找可强化的并行性 第7章 取消和关闭 7.1 任务取消 7.2 停止基于线程的服务 7.3 处理反常的线程终止 7.4 jvm关闭 第8章 应用线程池 8.1 任务与执行策略问的隐性耦合 8.2 定制线程池的大小 ...

    Java 7并发编程实战手册

    java7在并发编程方面,带来了很多令人激动的新功能,这将使你的应用程序具备更好的并行任务性能。 《Java 7并发编程实战手册》是Java 7并发编程的实战指南,介绍了Java 7并发API中大部分重要而有用的机制。全书分为9...

    并发编程实践,全面介绍基础知识、JVM同步原语、线程安全、低级并发工具、线程安全容器、高级线程协作工具、Executor部分等

    详细介绍java并发编程相关知识: 基础知识   并发与并行   Java并发演进历史   Java并发模型   线程模型   存储模型 JVM同步原语 volatile CAS 线程安全   保护“共享数据” 低级并发工具   原子变量   锁...

    java 并发编程 多线程

    java 并发 编程 多线程 concurrent lock condition executorserice executor java.util.curcurrent.

    汪文君高并发编程实战视频资源下载.txt

    │ Java并发编程.png │ ppt+源码.rar │ 高并发编程第二阶段01讲、课程大纲及主要内容介绍.wmv │ 高并发编程第二阶段02讲、介绍四种Singleton方式的优缺点在多线程情况下.wmv │ 高并发编程第二阶段03讲、...

Global site tag (gtag.js) - Google Analytics