优化异步任务

This commit is contained in:
zhouhao
2017-09-01 15:40:11 +08:00
parent cd458f7009
commit 45c5299cb1
15 changed files with 470 additions and 89 deletions

View File

@@ -0,0 +1,24 @@
# 异步任务工具,支持多线程事务
```xml
<dependency>
<groupId>org.hswebframework.web</groupId>
<artifactId>hsweb-concurrent-async-job</artifactId>
<version>${project.verion}</version>
</dependency>
```
```java
@Autowired
private AsyncJobService asyncJobService;
public void testJob(){
List<Object> results= asyncJobService.batch()
.submit(()->...) //提交job
.submit(()->...) //提交另外一个job
.submit(()->...,true) //提交支持事务的job
.getResult();
}
```

View File

@@ -11,5 +11,42 @@
<artifactId>hsweb-concurrent-async-job</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.hswebframework.web</groupId>
<artifactId>hsweb-tests</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,41 @@
package org.hswebframework.web.async;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* TODO 完成注释
*
* @author zhouhao
*/
public class AsyncJobException extends RuntimeException {
List<Exception> errors;
public AsyncJobException(List<Exception> errors) {
this.errors = errors;
}
public AsyncJobException(String message, List<Exception> errors) {
super(message);
this.errors = errors;
}
public List<Exception> getErrors() {
return errors;
}
@Override
public StackTraceElement[] getStackTrace() {
if (null != errors) {
List<StackTraceElement> stackTraceElements = errors.stream()
.map(Exception::getStackTrace)
.flatMap(Stream::of)
.collect(Collectors.toList());
stackTraceElements.addAll(Arrays.asList(super.getStackTrace()));
return stackTraceElements.toArray(new StackTraceElement[stackTraceElements.size()]);
}
return super.getStackTrace();
}
}

View File

@@ -0,0 +1,49 @@
package org.hswebframework.web.async;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* TODO 完成注释
*
* @author zhouhao
*/
@Configuration
@ConditionalOnMissingBean(AsyncJobService.class)
public class AsyncJobServiceAutoConfiguration {
@Value("${hsweb.async.job.maxThreadPoolSize:-1}")
private int maxThreadPoolSize = -1;
@Bean
@ConditionalOnMissingBean(ExecutorService.class)
public ExecutorService executorService() {
if (maxThreadPoolSize == -1) {
maxThreadPoolSize = Runtime.getRuntime().availableProcessors() * 50;
}
return Executors.newFixedThreadPool(maxThreadPoolSize);
}
@Bean
@ConditionalOnMissingBean(TransactionSupportJobWrapper.class)
public TransactionSupportJobWrapper transactionSupportJobWrapper() {
return new SpringTransactionSupportJobWrapper();
}
@Bean
public AsyncJobService asyncJobService(ExecutorService executorService, TransactionSupportJobWrapper transactionSupportJobWrapper) {
TransactionSupportAsyncJobService asyncJobService = new TransactionSupportAsyncJobService();
asyncJobService.setTranslationSupportJobWrapper(transactionSupportJobWrapper);
asyncJobService.setExecutorService(executorService);
return asyncJobService;
}
}

View File

@@ -4,13 +4,17 @@ import java.util.List;
import java.util.concurrent.Callable;
/**
* TODO 完成注释
*
* @author zhouhao
*/
public interface BatchAsyncJobContainer {
<V> BatchAsyncJobContainer submit(Callable<V> callable);
default <V> BatchAsyncJobContainer submit(Callable<V> callable) {
submit(callable, false);
return this;
}
<V> BatchAsyncJobContainer submit(Callable<V> callable, boolean enableTransaction);
List<Object> getResult() throws Exception;
}

View File

@@ -0,0 +1,52 @@
package org.hswebframework.web.async;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.concurrent.Callable;
/**
* TODO 完成注释
*
* @author zhouhao
*/
public class SpringTransactionSupportJob<V> implements TransactionSupportJob<V> {
private TransactionStatus transactionStatus;
private TransactionTemplate transactionTemplate;
private boolean rollback = false;
private Callable<V> target;
public SpringTransactionSupportJob(TransactionTemplate transactionTemplate, Callable<V> job) {
this.transactionTemplate = transactionTemplate;
this.target = job;
}
@Override
public void rollBackOnly() {
rollback = true;
}
@Override
public void commit() {
//do noting
if (transactionStatus != null) {
if (rollback) {
transactionTemplate.getTransactionManager().rollback(transactionStatus);
} else {
transactionTemplate.getTransactionManager().commit(transactionStatus);
}
}
}
@Override
public V call() throws Exception {
transactionStatus = transactionTemplate.getTransactionManager().getTransaction(transactionTemplate);
if (rollback) transactionStatus.setRollbackOnly();
return target.call();
}
}

View File

@@ -0,0 +1,24 @@
package org.hswebframework.web.async;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.concurrent.Callable;
/**
* @author zhouhao
*/
public class SpringTransactionSupportJobWrapper implements TransactionSupportJobWrapper {
private TransactionTemplate transactionTemplate;
@Autowired
public void setTransactionTemplate(TransactionTemplate transactionTemplate) {
this.transactionTemplate = transactionTemplate;
}
@Override
public <V> TransactionSupportJob<V> wrapper(Callable<V> callable) {
return new SpringTransactionSupportJob<>(transactionTemplate, callable);
}
}

View File

@@ -0,0 +1,114 @@
package org.hswebframework.web.async;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* @author zhouhao
*/
public class TransactionBatchAsyncJobContainer implements BatchAsyncJobContainer {
private ExecutorService executorService;
private TransactionSupportJobWrapper translationSupportJobWrapper;
private static final Logger logger = LoggerFactory.getLogger(TransactionBatchAsyncJobContainer.class);
public TransactionBatchAsyncJobContainer(ExecutorService executorService, TransactionSupportJobWrapper translationSupportJobWrapper) {
this.executorService = executorService;
this.translationSupportJobWrapper = translationSupportJobWrapper;
}
private List<Exception> exceptions = new ArrayList<>();
private AtomicInteger failCounter = new AtomicInteger();
private AtomicInteger transactionJobOverCounter = new AtomicInteger();
private CountDownLatch countDownLatch = new CountDownLatch(1);
private List<Future> futures = new ArrayList<>();
private int transactionJobNumber = 0;
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
@Override
public <V> BatchAsyncJobContainer submit(Callable<V> callable, boolean enableTransaction) {
if (!enableTransaction) {
if (logger.isDebugEnabled()) {
logger.debug("submit not transaction support job {}", transactionJobNumber);
}
futures.add(executorService.submit(callable));
return this;
}
transactionJobNumber++;
if (logger.isDebugEnabled()) {
logger.debug("submit transaction support job {}", transactionJobNumber);
}
int tmpJobFlag = transactionJobNumber;
TransactionSupportJob<V> translationJob = translationSupportJobWrapper.wrapper(callable);
Callable<V> proxy = () -> {
V value = null;
try {
if (failCounter.get() > 0) {
return null;
}
value = translationJob.call();
transactionJobOverCounter.incrementAndGet();
if (logger.isDebugEnabled()) {
logger.debug("transaction support job {} success,wait...", tmpJobFlag);
}
//等待其他任务完成
countDownLatch.await();
if (failCounter.get() > 0) {
if (logger.isDebugEnabled()) {
logger.debug("transaction support job {} success,but other job failed, do rollback only!", tmpJobFlag);
}
translationJob.rollBackOnly();
} else {
if (logger.isDebugEnabled()) {
logger.debug("transaction support job {} success,commit.", tmpJobFlag);
}
}
translationJob.commit();
} catch (Exception e) {
exceptions.add(e);
failCounter.incrementAndGet();
transactionJobOverCounter.incrementAndGet();
logger.warn("transaction support job {} fail.", tmpJobFlag, e);
}
return value;
};
futures.add(executorService.submit(proxy));
return this;
}
@Override
public List<Object> getResult() throws Exception {
while (transactionJobOverCounter.get() != transactionJobNumber) {
Thread.sleep(50);
}
countDownLatch.countDown();
if (!exceptions.isEmpty()) {
throw new AsyncJobException(exceptions);
}
return futures.stream().map(this::getValue).collect(Collectors.toList());
}
private Object getValue(Future future) {
try {
return future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,30 @@
package org.hswebframework.web.async;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.ExecutorService;
/**
* @author zhouhao
*/
public class TransactionSupportAsyncJobService implements AsyncJobService {
private ExecutorService executorService;
private TransactionSupportJobWrapper translationSupportJobWrapper;
@Autowired(required = false)
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
@Autowired
public void setTranslationSupportJobWrapper(TransactionSupportJobWrapper translationSupportJobWrapper) {
this.translationSupportJobWrapper = translationSupportJobWrapper;
}
@Override
public BatchAsyncJobContainer batch() {
return new TransactionBatchAsyncJobContainer(executorService, translationSupportJobWrapper);
}
}

View File

@@ -7,8 +7,9 @@ import java.util.concurrent.Callable;
*
* @author zhouhao
*/
public interface TranslationSupportJob<V> extends Callable<V> {
public interface TransactionSupportJob<V> extends Callable<V> {
void rollBack();
void rollBackOnly();
void commit();
}

View File

@@ -7,6 +7,6 @@ import java.util.concurrent.Callable;
*
* @author zhouhao
*/
public interface TranslationSupportJobWrapper {
<V> TranslationSupportJob<V> wrapper(Callable<V> callable);
public interface TransactionSupportJobWrapper {
<V> TransactionSupportJob<V> wrapper(Callable<V> callable);
}

View File

@@ -1,82 +0,0 @@
package org.hswebframework.web.async;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* @author zhouhao
*/
public class TranslationBatchAsyncJobContainer implements BatchAsyncJobContainer {
private ExecutorService executorService;
private List<Exception> exceptions = new ArrayList<>();
private Supplier<TranslationSupportJobWrapper> supportJobSupplierBuilder;
private CountDownLatch downLatch = new CountDownLatch(1);
private AtomicInteger failCounter = new AtomicInteger();
private AtomicInteger overCounter = new AtomicInteger();
private List<Future> futures = new ArrayList<>();
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
@Override
public <V> BatchAsyncJobContainer submit(Callable<V> callable) {
TranslationSupportJob<V> translationJob = supportJobSupplierBuilder
.get()
.wrapper(callable);
Callable<V> proxy = () -> {
try {
if (failCounter.get() > 0) {
return null;
}
V val = translationJob.call();
overCounter.incrementAndGet();
downLatch.await();
if (failCounter.get() > 0) {
translationJob.rollBack();
return null;
}
return val;
} catch (Exception e) {
exceptions.add(e);
failCounter.incrementAndGet();
overCounter.incrementAndGet();
}
return null;
};
futures.add(executorService.submit(proxy));
return this;
}
@Override
public List<Object> getResult() throws Exception {
while (overCounter.get() != futures.size()) {
Thread.sleep(10);
}
downLatch.countDown();
if (!exceptions.isEmpty()) {
throw new RuntimeException();
}
return futures.stream().map(this::getValue).collect(Collectors.toList());
}
private Object getValue(Future future) {
try {
return future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,3 @@
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.hswebframework.web.async.AsyncJobServiceAutoConfiguration

View File

@@ -0,0 +1,66 @@
package org.hswebframework.web.async;
import org.hsweb.ezorm.rdb.executor.SqlExecutor;
import org.hswebframework.web.tests.SimpleWebApplicationTests;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.support.TransactionTemplate;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.Executors;
import static org.junit.Assert.*;
/**
* TODO 完成注释
*
* @author zhouhao
*/
public class TransactionSupportAsyncJobServiceTest extends SimpleWebApplicationTests {
@Autowired
private SqlExecutor sqlExecutor;
@Autowired
private AsyncJobService asyncJobService;
@Configuration
@EnableConfigurationProperties(DataSourceProperties.class)
public static class DataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource dataSource(DataSourceProperties properties) {
return properties.initializeDataSourceBuilder().build();
}
}
@Test
public void test() throws Exception {
sqlExecutor.exec("create table test(id varchar(32))");
try {
BatchAsyncJobContainer jobContainer = asyncJobService.batch();
for (int i = 0; i < 100; i++) {
jobContainer.submit(() -> sqlExecutor.insert("insert into test values('test')", null), true);
}
jobContainer.submit(() -> {
Thread.sleep(200);
throw new RuntimeException();
}, true);
System.out.println(jobContainer.getResult().size());
} catch (Exception ignore) {
}
Assert.assertTrue(sqlExecutor.list("select * from test").isEmpty());
}
}

View File

@@ -0,0 +1,18 @@
spring:
aop:
auto: true
datasource:
url : jdbc:h2:mem:async_test_mem
username : sa
password :
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name : org.h2.Driver
initial-size: 100
max-active: 2000
hsweb:
app:
name: 异步任务
version: 3.0.0
logging:
level:
org.hswebframework: debug