如何将优先级线程池为支持的?
摘要:运维在升级,无聊写博客 最近在实现消息通知平台上面,对于针对不同的通知需要设置优先级,实现当通知队列堵塞的时候可以有限推送高优先级的消息。为了保证通知队列的高效并发,通知队列的消费端是采用多线程并发处理的,因此需要实现一个可以实现优先级的多
运维在升级,无聊写博客
最近在实现消息通知平台上面,对于针对不同的通知需要设置优先级,实现当通知队列堵塞的时候可以有限推送高优先级的消息。为了保证通知队列的高效并发,通知队列的消费端是采用多线程并发处理的,因此需要实现一个可以实现优先级的多线程处理逻辑:
对于ThreadPollExecutor来说,
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
如果实现优先级线程池需要注意一下三点
1.线程池中新加入的线程会放到workQueue中,如果是优先级队列,那么该参数必须要是PriorityBlockingQueue。
2.PriorityBlockingQueue容器中最终存储的是FutureTask对象,改对象是newTaskFor实例化的,因此需要实现继承自Comparable的FutureTask实现【例如:ComparableFutureTask】
3.ComparableFutureTask中实现比较线程的优先级,需要将实例化具有优先级的线程对象【例如:PriorityTask】
如上根据上面的点,可参考的代码如下
【PriorityTask】
public abstract class PriorityTask implements Runnable, Comparable<PriorityTask> {
private Integer prority;
public PriorityTask(Integer prority) {
this.prority = prority;
}
@Override
public abstract void run();
@Override
public int compareTo(PriorityTask o) {
return prority.compareTo(o.prority);
}
}
View Code
【带有ComparableFutureTask的PriorityThreadPoolExecutor】
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new ComparableFutureTask<T>(runnable, value);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new ComparableFutureTask<T>(callable);
}
protected class ComparableFutureTask<V>
extends FutureTask<V> implements Comparable<ComparableFutureTask<V>> {
private Object object;
public ComparableFutureTask(Callable<V> callable) {
super(callable);
object = callable;
}
public ComparableFutureTask(Runnable runnable, V result) {
super(runnable, result);
object = runnable;
}
@Override
@SuppressWarnings("unchecked")
public int compareTo(ComparableFutureTask<V> o) {
if (this == o) {
return 0;
}
if (o == null) {
return -1; // high priority
}
if (object != null && o.object != null) {
if (object.getClass().equals(o.object.getClass())) {
if (object instanceof Comparable) {
return ((Comparable) object).compareTo(o.object);
}
}
}
return 0;
}
}
}
View Code
【使用代码如下】
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.wjs.common.config.ConfigUtil;
import com.wjs.message.bo.NotifyMessage;
import com.wjs.message.service.notify.NotifyService;
import com.wjs.message.service.queue.QueueService;
/**
* 消息发送队列,优先级队列实现
*
* @author Silver
* @date 2016年12月20日 下午8:20:45
*
*
*/
@Service("queueService")
public class QueueServicePriorityImpl implements QueueService {
private static final Logger LOGGER = LoggerFactory.getLogger(QueueServicePriorityImpl.class);
private volatile static Queue<NotifyMessage> queue = new PriorityBlockingQueue<NotifyMessage>(100000);
@Autowired
NotifyService notifyService;
/**
* 服务初始化,启动队列消费
*
* @author Silver
* @date 2016年12月21日 上午9:07:20
*/
@PostConstruct
public void init() {
new Thread(new Runnable() {
@Override
public void run() {
Integer execSize = ConfigUtil.getInteger("message.queue.poll.execsize");
if (null == execSize || execSize == 0) {
// 由于任务后续是发送邮件/短信/调用APP推送/调用dubbo的,是非CPU密集型的计算,因此线程数控制在核数 * 3的值
execSize = Double.valueOf(Runtime.getRuntime().availableProcessors() * 3).intValue();
}
LOGGER.info("Queue_Consume_thread_size:{}",execSize);
ExecutorService es = new PriorityThreadPoolExecutor(execSize, execSize,
0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>());
while (true) {
//** poll 移除并返问队列头部的元素 如果队列为空,则返回null
final NotifyMessage message = queue.poll();
if (null != message) {
es.submit(new PriorityTask(message.getPriority()) {
@Override
public void run() {
try {
System.out.println(message);
} catch (Exception e) {
LOGGER.error("MessageQueue-ERROR->:{},", message, e);
}
}
});
}
}
}
}).start();
}
@Override
public boolean push(NotifyMessage message) {
// offer 添加一个元素并返回true 如果队列已满,或者异常情况,则返回false
return queue.offer(message);
}
@Override
public Integer size() {
return queue.size();
}
}
View Code
【单测代码如下】
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import com.wjs.message.bo.NotifyMessage;
import com.wjs.message.bo.NotifyMessageEmail;
import com.wjs.message.bo.NotifyMessagePush;
import com.wjs.message.bo.NotifyMessageSms;
import com.wjs.message.bo.NotifyMessageSys;
import com.wjs.message.service.BaseServiceTest;
public class QueueTestService extends BaseServiceTest{
@Autowired
QueueService queueService;
@Test
public void testPull(){
for (int i = 10000; i > 1; i--) {
NotifyMessage message = new NotifyMessage();
switch (i % 3) {
case 0:
message = new NotifyMessageEmail();
message.setContent("Email"+i);
break;
case 1:
message = new NotifyMessageSys();
message.setContent("Sys"+i);
break;
case 2:
message = new NotifyMessageSms();
message.setContent("Sms"+i);
break;
case 3:
message = new NotifyMessagePush();
message.setContent("Push"+i);
break;
default:
break;
}
message.setContent(i+"");
message.setPriority(i);
queueService.push(message);
}
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
View Code
