1.多线程执行任务类
package com.visy.threadpool;
import com.visy.executor.ExecutorFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ThreadPoolConfig {
private TheadPoolProperties theadPoolProperties;
private ThreadPoolExecutor executor;
private ThreadPoolExecutor executorChild;
public ThreadPoolConfig(TheadPoolProperties theadPoolProperties) {
this.theadPoolProperties = theadPoolProperties;
this.executor = ExecutorFactory.getInstance().getThreadPoolExecutor("ThreadPoolConfig-service", theadPoolProperties.getQueueSize(), theadPoolProperties.getCoreThreadNum(), theadPoolProperties.getMaxPoolSize());
this.executorChild = ExecutorFactory.getInstance().getThreadPoolExecutor("ThreadPoolConfig-service-child", theadPoolProperties.getQueueSize(), theadPoolProperties.getCoreThreadNum(), theadPoolProperties.getMaxPoolSize());
}
public List doConcurrentTask(List> taskList, ThreadPoolExecutor... executorChilds) {
if (taskList != null && !taskList.isEmpty()) {
List resultList = new ArrayList();
List futureList = null;
try {
if (this.executor.getQueue().size() >= this.theadPoolProperties.getQueueSize()) {
throw new RuntimeException("queue size bigger than 100, now size is " + this.executor.getQueue().size());
}
if (executorChilds != null && executorChilds.length > 0 && executorChilds[0] != null) {
futureList = executorChilds[0].invokeAll(taskList);
} else {
futureList = this.executor.invokeAll(taskList, (long)this.theadPoolProperties.getTimeOut(), TimeUnit.SECONDS);
}
} catch (InterruptedException var6) {
var6.printStackTrace();
}
this.doFutureList(resultList, futureList);
return resultList;
} else {
return null;
}
}
void doFutureList(List resultList, List> futureList) {
if (futureList != null) {
Iterator var3 = futureList.iterator();
while(var3.hasNext()) {
Future future = (Future)var3.next();
try {
resultList.add(future.get());
} catch (ExecutionException | InterruptedException var6) {
var6.printStackTrace();
}
}
}
}
public void doVoidConcurrentTask(List> taskList) {
if (taskList != null && !taskList.isEmpty()) {
Iterator var2 = taskList.iterator();
while(var2.hasNext()) {
Callable call = (Callable)var2.next();
this.executor.submit(call);
}
}
}
/** @return the pool properties this configuration was built from (or last set) */
public TheadPoolProperties getTheadPoolProperties() {
    return theadPoolProperties;
}
/** @return the main executor used by the task-running methods */
public ThreadPoolExecutor getExecutor() {
    return executor;
}
/** @return the secondary ("child") executor created alongside the main one */
public ThreadPoolExecutor getExecutorChild() {
    return executorChild;
}
/**
 * Replaces the stored pool properties. The executors are not rebuilt by this call.
 *
 * @param theadPoolProperties new properties
 */
public void setTheadPoolProperties(TheadPoolProperties theadPoolProperties) {
    this.theadPoolProperties = theadPoolProperties;
}
/**
 * Replaces the main executor.
 *
 * @param executor executor to use from now on
 */
public void setExecutor(ThreadPoolExecutor executor) {
    this.executor = executor;
}
/**
 * Replaces the child executor.
 *
 * @param executorChild executor to use from now on
 */
public void setExecutorChild(ThreadPoolExecutor executorChild) {
    this.executorChild = executorChild;
}
/**
 * Field-by-field equality over the properties, the main executor and the
 * child executor (in that order), using each field's own {@code equals}.
 *
 * @param o object to compare with
 * @return {@code true} when {@code o} is a {@code ThreadPoolConfig} that
 *         accepts this instance via {@code canEqual} and all three fields match
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (!(o instanceof ThreadPoolConfig)) {
        return false;
    }
    ThreadPoolConfig that = (ThreadPoolConfig) o;
    if (!that.canEqual(this)) {
        return false;
    }

    Object myProps = this.getTheadPoolProperties();
    Object theirProps = that.getTheadPoolProperties();
    if (myProps == null ? theirProps != null : !myProps.equals(theirProps)) {
        return false;
    }

    Object myExecutor = this.getExecutor();
    Object theirExecutor = that.getExecutor();
    if (myExecutor == null ? theirExecutor != null : !myExecutor.equals(theirExecutor)) {
        return false;
    }

    Object myChild = this.getExecutorChild();
    Object theirChild = that.getExecutorChild();
    return myChild == null ? theirChild == null : myChild.equals(theirChild);
}
/**
 * Hook used by {@code equals} so that the other side can veto equality.
 *
 * @param other candidate object
 * @return {@code true} when {@code other} is a {@code ThreadPoolConfig}
 */
protected boolean canEqual(Object other) {
    return ThreadPoolConfig.class.isInstance(other);
}
/**
 * Combines the hash codes of the properties, the main executor and the child
 * executor (in that order) as {@code result * 59 + fieldHash}, starting from 1
 * and using 43 for a null field.
 *
 * @return hash code consistent with {@code equals}
 */
@Override
public int hashCode() {
    // The decompiled original declared "int PRIME = true" and re-declared
    // "result", neither of which compiles; the arithmetic itself is unchanged.
    final int PRIME = 59;
    int result = 1;
    Object props = this.getTheadPoolProperties();
    result = result * PRIME + (props == null ? 43 : props.hashCode());
    Object mainExecutor = this.getExecutor();
    result = result * PRIME + (mainExecutor == null ? 43 : mainExecutor.hashCode());
    Object childExecutor = this.getExecutorChild();
    result = result * PRIME + (childExecutor == null ? 43 : childExecutor.hashCode());
    return result;
}
/** @return a one-line description listing the properties and both executors */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("ThreadPoolConfig(theadPoolProperties=");
    text.append(this.getTheadPoolProperties());
    text.append(", executor=").append(this.getExecutor());
    text.append(", executorChild=").append(this.getExecutorChild());
    text.append(")");
    return text.toString();
}
}
2.执行器工厂类
package com.visy.executor;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ExecutorFactory {
private static final Logger logger = LoggerFactory.getLogger(ExecutorFactory.class);
private static final Map threadPoolExecutorMap = new ConcurrentHashMap();
private static final int DEFAULT_QUEUE_SIZE = 1000;
private static final String DEFAULT_EXECUTOR_NAME = "default-executor";
private static final int MAX_THREAD_NUM = 100;
private static final int CORE_THREAD_NUM = 1;
private static volatile ExecutorFactory instance;
/** Private: instances are only handed out by {@code getInstance()}. */
private ExecutorFactory() {
    // intentionally empty
}
/**
 * Returns the shared factory, creating it on first use. Creation is guarded
 * by a lock on {@code ExecutorFactory.class} with a second null check inside.
 *
 * @return the single {@code ExecutorFactory} instance
 */
public static ExecutorFactory getInstance() {
    if (instance != null) {
        return instance;
    }
    synchronized (ExecutorFactory.class) {
        // Re-check under the lock: another thread may have created it meanwhile.
        if (instance == null) {
            instance = new ExecutorFactory();
        }
    }
    return instance;
}
/**
 * Looks up a previously registered executor.
 *
 * @param name registration name
 * @return the executor stored under {@code name}, or {@code null} if none
 */
public ThreadPoolExecutor getThreadPoolExecutorByName(String name) {
    Object registered = threadPoolExecutorMap.get(name);
    return (ThreadPoolExecutor) registered;
}
/**
 * Exposes the registry itself, not a copy: changes made through the returned
 * map affect this factory.
 *
 * @return the live name-to-executor map
 */
public static Map getThreadPoolExecutorMap() {
    return ExecutorFactory.threadPoolExecutorMap;
}
/**
 * Returns the executor registered under the given name, creating and
 * registering it first when absent. When an executor with that name already
 * exists, the sizing arguments are ignored.
 *
 * @param threadPoolExecutorName registration name; must not be blank
 * @param queueSize              passed to {@code ThreadPool} on creation
 * @param coreThreadNum          passed to {@code ThreadPool} on creation
 * @param maxPoolSize            passed to {@code ThreadPool} on creation
 * @return the executor stored under {@code threadPoolExecutorName}
 * @throws IllegalArgumentException if the name is blank
 */
public ThreadPoolExecutor getThreadPoolExecutor(String threadPoolExecutorName, int queueSize, int coreThreadNum, int maxPoolSize) {
    if (StringUtils.isBlank(threadPoolExecutorName)) {
        throw new IllegalArgumentException("thread name empty");
    }
    if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) {
        synchronized (ExecutorFactory.class) {
            // Second check under the lock so only one thread creates the pool.
            if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) {
                // NOTE(review): 30L is presumably a keep-alive time; confirm against ThreadPool.
                ThreadPool pool = new ThreadPool(coreThreadNum, maxPoolSize, 30L, queueSize, threadPoolExecutorName);
                ThreadPoolExecutor created = pool.getExecutor();
                threadPoolExecutorMap.put(threadPoolExecutorName, created);
                logger.info("thread name: {} executor created", threadPoolExecutorName);
            }
        }
    }
    return (ThreadPoolExecutor) threadPoolExecutorMap.get(threadPoolExecutorName);
}
/**
 * Submits a task to the executor returned by the no-argument
 * {@code getThreadPoolExecutor()} (not part of this listing) and discards
 * the resulting future.
 *
 * <p>NOTE(review): the declaration of type parameter {@code T} was lost in
 * this listing; presumably it is bounded by a task type accepted by
 * {@code ThreadPoolExecutor.submit} - confirm against the original source.
 *
 * @param t task to submit
 */
public void submit(T t) {
    this.getThreadPoolExecutor().submit(t);
}
/**
 * Submits a task to the executor registered under {@code poolName} and
 * discards the resulting future.
 *
 * <p>NOTE(review): the declaration of type parameter {@code T} was lost in
 * this listing - confirm its bound against the original source.
 *
 * @param poolName name of an already registered executor
 * @param t        task to submit
 * @throws IllegalArgumentException if no executor is registered under {@code poolName}
 */
public void submit(String poolName, T t) {
    ThreadPoolExecutor target = this.getThreadPoolExecutorByName(poolName);
    if (target != null) {
        target.submit(t);
        return;
    }
    logger.error("thread name: {} executor not exist.", poolName);
    throw new IllegalArgumentException("thread name:" + poolName + " executor not exist.");
}
public > Future
(原文此处被截断,ExecutorFactory 类的其余代码缺失)
3.多线程配置类
package com.visy.threadpool;
import javax.validation.constraints.NotNull;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.annotation.Validated;
@Validated
@Configuration
@ConfigurationProperties(prefix = "visy.threadpool")
public class TheadPoolProperties {
// 执行并行任务时,等待多久时间超时(单位:秒)
@NotNull
private Integer timeOut;
// 队列大小
@NotNull
private Integer queueSize;
// 核心线程数量
@NotNull
private Integer coreThreadNum;
// 线程池最大线程数量
@NotNull
private Integer maxPoolSize;
// 并行执行每组大小
private Integer groupSize = 20;
/** Creates an instance with only {@code groupSize} preset (to 20); the other fields start as null. */
public TheadPoolProperties() {
    // no initialisation beyond the field defaults
}
/** @return how long to wait, in seconds, before a parallel execution times out */
public Integer getTimeOut() {
    return timeOut;
}
/** @return the configured queue size */
public Integer getQueueSize() {
    return queueSize;
}
/** @return the configured number of core threads */
public Integer getCoreThreadNum() {
    return coreThreadNum;
}
/** @return the configured maximum number of threads in the pool */
public Integer getMaxPoolSize() {
    return maxPoolSize;
}
/** @return the size of each group when work is split for parallel execution (20 unless changed) */
public Integer getGroupSize() {
    return groupSize;
}
/**
 * Sets the timeout for parallel execution.
 *
 * @param timeOut timeout in seconds
 */
public void setTimeOut(Integer timeOut) {
    this.timeOut = timeOut;
}
/**
 * Sets the queue size.
 *
 * @param queueSize new queue size
 */
public void setQueueSize(Integer queueSize) {
    this.queueSize = queueSize;
}
/**
 * Sets the number of core threads.
 *
 * @param coreThreadNum new core thread count
 */
public void setCoreThreadNum(Integer coreThreadNum) {
    this.coreThreadNum = coreThreadNum;
}
/**
 * Sets the maximum number of threads in the pool.
 *
 * @param maxPoolSize new maximum pool size
 */
public void setMaxPoolSize(Integer maxPoolSize) {
    this.maxPoolSize = maxPoolSize;
}
/**
 * Sets the size of each group used when splitting work for parallel execution.
 *
 * @param groupSize new group size
 */
public void setGroupSize(Integer groupSize) {
    this.groupSize = groupSize;
}
/**
 * Field-by-field equality over timeOut, queueSize, coreThreadNum,
 * maxPoolSize and groupSize (compared in that order).
 *
 * @param o object to compare with
 * @return {@code true} when {@code o} is a {@code TheadPoolProperties} that
 *         accepts this instance via {@code canEqual} and all five fields match
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (!(o instanceof TheadPoolProperties)) {
        return false;
    }
    TheadPoolProperties that = (TheadPoolProperties) o;
    if (!that.canEqual(this)) {
        return false;
    }

    Object mine = this.getTimeOut();
    Object theirs = that.getTimeOut();
    if (mine == null ? theirs != null : !mine.equals(theirs)) {
        return false;
    }

    mine = this.getQueueSize();
    theirs = that.getQueueSize();
    if (mine == null ? theirs != null : !mine.equals(theirs)) {
        return false;
    }

    mine = this.getCoreThreadNum();
    theirs = that.getCoreThreadNum();
    if (mine == null ? theirs != null : !mine.equals(theirs)) {
        return false;
    }

    mine = this.getMaxPoolSize();
    theirs = that.getMaxPoolSize();
    if (mine == null ? theirs != null : !mine.equals(theirs)) {
        return false;
    }

    mine = this.getGroupSize();
    theirs = that.getGroupSize();
    return mine == null ? theirs == null : mine.equals(theirs);
}
/**
 * Hook used by {@code equals} so that the other side can veto equality.
 *
 * @param other candidate object
 * @return {@code true} when {@code other} is a {@code TheadPoolProperties}
 */
protected boolean canEqual(Object other) {
    return TheadPoolProperties.class.isInstance(other);
}
/**
 * Combines the hash codes of timeOut, queueSize, coreThreadNum, maxPoolSize
 * and groupSize (in that order) as {@code result * 59 + fieldHash}, starting
 * from 1 and using 43 for a null field.
 *
 * @return hash code consistent with {@code equals}
 */
@Override
public int hashCode() {
    // The decompiled original declared "int PRIME = true" and re-declared
    // "result", neither of which compiles; the arithmetic itself is unchanged.
    final int PRIME = 59;
    int result = 1;
    Object timeOutValue = this.getTimeOut();
    result = result * PRIME + (timeOutValue == null ? 43 : timeOutValue.hashCode());
    Object queueSizeValue = this.getQueueSize();
    result = result * PRIME + (queueSizeValue == null ? 43 : queueSizeValue.hashCode());
    Object coreThreadNumValue = this.getCoreThreadNum();
    result = result * PRIME + (coreThreadNumValue == null ? 43 : coreThreadNumValue.hashCode());
    Object maxPoolSizeValue = this.getMaxPoolSize();
    result = result * PRIME + (maxPoolSizeValue == null ? 43 : maxPoolSizeValue.hashCode());
    Object groupSizeValue = this.getGroupSize();
    result = result * PRIME + (groupSizeValue == null ? 43 : groupSizeValue.hashCode());
    return result;
}
/** @return a one-line description listing all five settings */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("TheadPoolProperties(timeOut=");
    text.append(this.getTimeOut());
    text.append(", queueSize=").append(this.getQueueSize());
    text.append(", coreThreadNum=").append(this.getCoreThreadNum());
    text.append(", maxPoolSize=").append(this.getMaxPoolSize());
    text.append(", groupSize=").append(this.getGroupSize());
    text.append(")");
    return text.toString();
}
}
4.列表拆分工具类
package com.visy.utils; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.jar.Attributes; /** * 列表或数组按指定大小分组,用于批量取一部分数据循环处理 * */ public class ArraySplitUtil{ /** * 按指定大小对列表分组 * @param list * @param splitSize * @return */ public List > splistList(List
list, int splitSize) { if (null == list || list.size() == 0) { return null; } int listSize = list.size(); List > newList = new ArrayList<>(); if (listSize < splitSize) { newList.add(list); return newList; } int addLength = splitSize; int times = listSize / splitSize; if (listSize % splitSize != 0) { times += 1; } int start = 0; int end = 0; int last = times - 1; for (int i = 0; i < times; i++) { start = i * splitSize; if (i < last) { end = start + addLength; } else { end = listSize; } newList.add(list.subList(start, end)); } return newList; } /** * 按指定大小对数组分组 * @param array * @param splitSize * @return */ public List
splistArray(T[] array, int splitSize) { if (null == array) { return null; } int listSize = array.length; List newList = new ArrayList<>(); if (listSize < splitSize) { newList.add(array); return newList; } int addLength = splitSize; int times = listSize / splitSize; if (listSize % splitSize != 0) { times += 1; } int start = 0; int end = 0; int last = times - 1; for (int i = 0; i < times; i++) { start = i * splitSize; if (i < last) { end = start + addLength; } else { end = listSize; } newList.add(Arrays.copyOfRange(array, start, end)); } return newList; } public static ArraySplitUtil build(){ return new ArraySplitUtil<>(); } }
5.多任务执行助手类
小邮包-包月订购包年服务网,该程序由好买卖商城开发,程序采用PHP+MYSQL架设,程序商业模式为目前最为火爆的包月订制包年服务模式,这种包年订购在国外网站已经热火很多年了,并且已经发展到一定规模,像英国的男士用品网站BlackSocks,一年的袜子购买量更是达到了1000万双。功能:1、实现多产品上线,2、不用注册也可以直接下单购买,3、集成目前主流支付接口,4、下单发货均有邮件提醒。
立即学习“Java免费学习笔记(深入)”;
package com.visy.helper;
import com.baomidou.mybatisplus.toolkit.CollectionUtils;
import com.google.common.collect.Lists;
import com.visy.utils.ArraySplitUtil;
import com.visy.threadpool.ThreadPoolConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 多任务助手
* @author visy.wang
* @date 2022/5/9 14:38
*/
@Service
public class MultiTaskHelper {
@Autowired
private ThreadPoolConfig threadPoolConfig;
private static final Map> ArraySplitUtilCache = new ConcurrentHashMap<>();
public List> createAndRunListTask(List list, Function handler){
return createAndRunListTask(list, null, handler);
}
public List> createAndRunListTaskV2(List list, Function, List> handler){
return createAndRunListTask(list, handler, null);
}
public void createAndRunListTaskWithoutReturn(List list, Consumer handler){
createAndRunListTaskWithoutReturn(list, null, handler);
}
public void createAndRunListTaskWithoutReturnV2(List list, Consumer> handler){
createAndRunListTaskWithoutReturn(list, handler, null);
}
/**
* 把列表按线程数分组
* @param list 列表
* @return 分组后的列表
*/
@SuppressWarnings("unchecked")
private List> listSplit(List list){
String key = list.get(0).getClass().getName();
int groupSize = threadPoolConfig.getTheadPoolProperties().getGroupSize();
ArraySplitUtil arraySplitUtil = (ArraySplitUtil)ArraySplitUtilCache.get(key);
if(Objects.isNull(arraySplitUtil)){
arraySplitUtil = ArraySplitUtil.build();
ArraySplitUtilCache.put(key, arraySplitUtil);
}
return arraySplitUtil.splistList(list, groupSize);
}
/**
* 创建并运行多任务
* @param list 输入数据列表
* @param handler1 处理器1 (优先级使用)
* @param handler2 处理器2
* @param 输入数据类型
* @param 输出数据类型
* @return 执行结果分组列表
*/
private List> createAndRunListTask(List list, Function, List> handler1, Function handler2){
List> listGroup = listSplit(list);
//设定每个组的任务
List>> taskList = Lists.newArrayListWithExpectedSize(listGroup.size());
listGroup.stream().filter(CollectionUtils::isNotEmpty).forEach(subList -> {
taskList.add(() -> {
if(Objects.nonNull(handler1)){
return handler1.apply(subList);
}else if(Objects.nonNull(handler2)){
return subList.stream().map(handler2).collect(Collectors.toList());
}else{
return null;
}
});
});
return threadPoolConfig.doConcurrentTask(taskList);
}
/**
* 创建并运行多任务(无返回结果)
* @param list 输入数据列表
* @param handler1 处理器1 (优先级更高)
* @param handler2 处理器2
* @param 输入数据类型
*/
private void createAndRunListTaskWithoutReturn(List list, Consumer> handler1, Consumer handler2){
List> listGroup = listSplit(list);
//设定每个组的任务
List>> taskList = Lists.newArrayListWithExpectedSize(listGroup.size());
listGroup.stream().filter(CollectionUtils::isNotEmpty).forEach(subList -> {
taskList.add(() -> {
if(Objects.nonNull(handler1)){
handler1.accept(subList);
}else if(Objects.nonNull(handler2)){
subList.forEach(handler2);
}
return null;
});
});
threadPoolConfig.doConcurrentTask(taskList);
}
}
6.多任务助手使用:
@Autowired
package com.zoom.fleet.schedule.service;
import com.visy.helper.MultiTaskHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Usage example for {@link MultiTaskHelper}, showing its four entry points.
 *
 * @author visy.wang
 * @date 2022/5/13 14:11
 */
@Service
public class MultiTaskTest {
    @Autowired
    private MultiTaskHelper multiTaskHelper;

    private void test() {
        // Data to be processed by multiple tasks.
        // NOTE(review): the element type was lost in this listing; String is
        // assumed because each id is passed to Long.valueOf - confirm.
        List<String> idList = new ArrayList<>();

        // 1. With results, variant one: define the logic for a single element.
        List<List<Long>> resultList = multiTaskHelper.createAndRunListTask(idList, id -> {
            // business code for one element
            return Long.valueOf(id);
        });

        // 2. With results, variant two: define the logic for one group (one task).
        resultList = multiTaskHelper.createAndRunListTaskV2(idList, subIdList -> {
            // business code operating on the group's list
            return subIdList.stream().map(id -> {
                // business code for one element
                return Long.valueOf(id);
            }).collect(Collectors.toList());
        });

        // 3. Without results, variant one: define the logic for a single element.
        multiTaskHelper.createAndRunListTaskWithoutReturn(idList, id -> {
            // business code for one element...
        });

        // 4. Without results, variant two: define the logic for one group (one task).
        multiTaskHelper.createAndRunListTaskWithoutReturnV2(idList, subIdList -> {
            subIdList.forEach(id -> {
                // business code for one element...
            });
            // continue working with subIdList...
        });
    }
}










