多线程之线程池

"jdk并发包-线程池"

Posted by 青乡 on January 1, 2016

线程池分类

单线程

既然是单线程 为什么还要使用线程池?因为1.单线程2.数据集合。使用数据集合作一个缓冲。

多线程

1.不显式添加数据集合 使用默认数据集合

2.显式添加数据集合
例如 有界队列 配置数据的数量

定时线程池


一般情况下,系统业务较多,会包含多个线程池,分别处理不同的业务类型。

线程池的数据流

数据是被新创建的线程消费,还是被复用线程消费,还是被暂时放到缓存里,还是阻塞添加到有界队列。根据线程数量,分4种情况:
1.线程数量0——》最小线程数量
1)一开始:创建新的线程,线程不复用
2)后期:线程复用

2.最小数量——》有界队列
数据线放到有界队列

3.有界队列满——》最大线程数量
创建新的线程

4.有界队列满,并且最大数量满
阻塞添加数据到有界队列,一旦有界队列数据被消费,立马写数据到有界队列。

注:根据需求不同,可以改变2和3的顺序。jdk线程池默认是1234。
注:最大线程数量如果没配置,默认和最小线程数量一样。

代码示例


//支付-实名认证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/** 10个线程检查异步订单,当队列满了之后,等待 */

	protected static Executor executor = new ThreadPoolExecutor(10//最小线程数量, 10//最大线程数量, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1000 //有界队列),

			new RejectedExecutionHandler() { //有界队列满,阻塞写数据到有界队列

				@Override

				public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

					if (!executor.isShutdown()) {

						try {

							executor.getQueue().put(r);

						} catch (InterruptedException e) {

							log.error(e.getMessage(), e);

						}

					}

				}

			});


//支付-处理订单

1
2
3
4
5
// 订单查询普通银行线程

	public static ExecutorService checkExeService = Executors.newFixedThreadPool(6 //最小线程数量);



//支付-结算

1
2
3
private static ExecutorService executorService = Executors.newFixedThreadPool(3 //最小线程数量); 



//支付-清除缓存数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);



public void init(){

		//每3分钟执行一次 

		executor.scheduleWithFixedDelay(new CheckListener(), SCHEDULE_INIT,SCHEDULE_TIME, TimeUnit.MINUTES);

	}







即时通讯

1
2
3
4
5
int minSize = properties.getPropertyAsIntWithDefault("threadPool.minSize", 50);
			int maxSize = properties.getPropertyAsIntWithDefault("threadPool.maxSize", 200);
			int queueSize = properties.getPropertyAsIntWithDefault("threadPool.queueSize", 100000);
			int keepAlive = properties.getPropertyAsIntWithDefault("threadPool.keepAlive", 60);
			XXThreadPool.init(minSize, maxSize, queueSize, keepAlive);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
import java.util.concurrent.BlockingQueue;

import java.util.concurrent.Callable;

import java.util.concurrent.Future;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.RejectedExecutionHandler;

import java.util.concurrent.ThreadFactory;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;



import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import com.google.common.util.concurrent.ThreadFactoryBuilder;



/**

 * JDK默认线程池:CoreSize-->Core用完了,进队列-->队列满了,开新线程直到MaxSize-->MaxSize也到了,执行Reject策略

 * 我们希望的线程池:CoreSize-->Core用完了,开新线程直到MaxSize-->MaxSize也到了,进队列-->队列满了,执行Reject策略

 * 

 * 参照tomcat的线程池实现思路,实现了XX线程池

 * 

 * https://segmentfault.com/a/1190000008052008

 *

 */

public class XXThreadPool {

	private final static Logger logger = LoggerFactory.getLogger(XXThreadPool.class);



	private static final TimeUnit unit = TimeUnit.SECONDS;// keepAliveTime 的单位

	private static ThreadPoolExecutor onlinePoolExecutor = null;

	private static ThreadPoolExecutor offlinePoolExecutor = null;

	private static XXThreadPool XXThreadPool=null;

	

    private XXThreadPool(){

    }

	

	/**根据application.xml中的配置初始化线程池大小

	 * 初始化线程池

	 * @param minSize:最小线程数,空闲时,多余的线程自动销毁,收缩成最小值

	 * @param maxSize:最大线程数

	 * @param queueSize:最大队列长度

	 * @param keepAlive:最小线程数之外的线程的最大空闲时间,单位为秒

	 */

	public static void init(int minSize, int maxSize, int queueSize //有界队列大小10万个数据,大型企业员工规模10万人以上, int keepAlive //空闲时间1分钟,如果1分钟之内空闲,那么被销毁) {



		if (XXThreadPool == null) {

			synchronized (XXThreadPool.class) {

				if (XXThreadPool == null) {



					XXThreadPool = new XXThreadPool();

					// TaskQueue workQueue = new TaskQueue(queueSize);

					// 创建一个有界队列

					BlockingQueue<Runnable> onWorkQueue = new LinkedBlockingQueue<Runnable>(queueSize);

					

					BlockingQueue<Runnable> offWorkQueue = new LinkedBlockingQueue<Runnable>(queueSize);



					// 丢弃任务并抛出RejectedExecutionException异常

					RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();

					// 创建线程工厂,自定义线程池名称

					ThreadFactory onlineFactory = new ThreadFactoryBuilder().setNameFormat("XXOnlinePool#%d-").build();



					ThreadFactory offlineFactory = new ThreadFactoryBuilder().setNameFormat("XXOfflinePool#%d-").build();



					// 构造线程池

					onlinePoolExecutor = new ThreadPoolExecutor(minSize, maxSize, keepAlive, unit, onWorkQueue, onlineFactory, handler);



					offlinePoolExecutor = new ThreadPoolExecutor(minSize, maxSize, keepAlive, unit, offWorkQueue, offlineFactory, handler);



				}

			}

		}

	}

	/**

	 * 任务执行的方法

	 * @param task

	 */

	public static void execute(Runnable task) {

		int queSize = onlinePoolExecutor.getQueue().size();

		if(queSize >= 5000 && queSize < 20000){

			logger.info("onlinePool Queue Size is {}",queSize);

		}else if(queSize >= 20000){

			logger.warn("onlinePool Queue Size is {}",queSize);

		}

		onlinePoolExecutor.execute(task);

	}

	

	/**

	 * 任务执行的方法

	 * @param task

	 */

	public static void exeOfflineTask(Runnable task) {

		int queSize = offlinePoolExecutor.getQueue().size();

		if(queSize >= 5000 && queSize < 20000){

			logger.info("offlinePool Queue Size is {}",queSize);

		}else if(queSize >= 20000){

			logger.warn("offlinePool Queue Size is {}",queSize);

		}

		offlinePoolExecutor.execute(task);

	}

	

	/**

	 * 任务执行的方法

	 * @param <T>

	 * @param task

	 */

	public static <T> Future<T> execute(Callable<T> task) {

		int queSize = onlinePoolExecutor.getQueue().size();

		if(queSize >= 5000 && queSize < 20000){

			logger.info("onlinePool Queue Size is {}",queSize);

		}else if(queSize >= 20000){

			logger.warn("onlinePool Queue Size is {}",queSize);

		}

		return onlinePoolExecutor.submit(task);

	}

	

	/**

	 * 返回线程池中线程的总数

	 * @return

	 */

	public static int getPoolSize(){

		

		return onlinePoolExecutor.getPoolSize();

		

	}



	/**

	 * 返回当前处于工作的线程数

	 * @return

	 */

	public static int getActiveThreadCount(){

		

		return onlinePoolExecutor.getActiveCount();

	}

	/**

	 * 返回当前任务队列中未处理的任务数

	 * @return

	 */

	public static int getQueueSize(){

		return onlinePoolExecutor.getQueue().size();

	}

	

/*	public static void main(String[] argv) throws InterruptedException{

		//init(5,10,50,60);

		

		

	}*/

	

}




线程池的线程数量

1.数量

cpu + 1 //cpu密集型

2*cpu + 1 //io密集型

2.依据

1)cpu密集型

因为cpu已经很繁忙,基本上每个cpu都被一个线程占用,所以线程数量宜小,线程数量只需要比cpu数量多1就好,避免某个线程出问题导致有一个cpu闲置。

2)io密集型

因为io很繁忙,很多cpu可能在等待io完成,所以线程数量宜大。

线程数量一般都是个位数,目标是使得每个cpu转起来,不让cpu处于闲置。但是,线程数量不能太多,因为会导致线程上下文切换,cpu切换线程上下文开销很大。


cpu数量可以动态获取

public class ThreadPool {

1
2
3
4
5
6
7
public static void main(String[] args) {

	Runtime runtime = Runtime.getRuntime();

	System.out.println(runtime.availableProcessors()); //6核,输出12

}

}


参考

https://www.cnblogs.com/dennyzhangdd/p/6909771.html

监控

如何判断线程是否闲置

线程分类
1.线程池的线程
2.工作线程集合Worker
3.非工作线程集合的线程就是闲置线程
如果配置了线程池闲置时间,那么闲置线程会被销毁。
线程池会对每个非工作线程进行时间标记,超时就销毁。

如何获取活跃线程数量

有方法可以调用。


方法具体如何实现?
从Worker里取。 Worker里的线程就是正在执行任务的线程,线程执行完毕,就从Worker删除,但是没有从线程池删除。

参考

https://juejin.im/entry/58fada5d570c350058d3aaad