优化动态数据源

This commit is contained in:
zhouhao
2017-05-19 10:28:17 +08:00
parent 8afa483829
commit 1ca7b9114d
11 changed files with 269 additions and 52 deletions

View File

@@ -22,9 +22,18 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.hswebframework.web</groupId>
<artifactId>hsweb-boost-aop</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -1,37 +1,93 @@
package org.hswebframework.web.datasource;
import org.hswebframework.web.datasource.exception.DataSourceNotFoundException;
import org.hswebframework.web.datasource.switcher.DataSourceSwitcher;
/**
* TODO 完成注释
* 用于操作动态数据源,如获取当前使用的数据源,使用switcher切换数据源等
*
* @author zhouhao
* @see 3.0
*/
public final class DataSourceHolder {
/**
* 动态数据源切换器
*/
static DataSourceSwitcher dataSourceSwitcher;
/**
* 动态数据源服务
*/
static DynamicDataSourceService dynamicDataSourceService;
/**
* @return 动态数据源切换器
*/
public static DataSourceSwitcher switcher() {
return dataSourceSwitcher;
}
public static DynamicDataSource getDefaultDataSource() {
/**
* @return 默认数据源
*/
public static DynamicDataSource defaultDataSource() {
return dynamicDataSourceService.getDefaultDataSource();
}
public static DynamicDataSource getActiveDataSource() {
/**
* @return 当前使用的数据源
*/
public static DynamicDataSource currentDataSource() {
String id = dataSourceSwitcher.currentDataSourceId();
if (id == null) return getDefaultDataSource();
if (id == null) return defaultDataSource();
return dynamicDataSourceService.getDataSource(id);
}
public static DatabaseType getActiveDatabaseType() {
return getActiveDataSource().getType();
/**
* @return 当前使用的数据源是否为默认数据源
*/
public static boolean currentIsDefault() {
return dataSourceSwitcher.currentDataSourceId() == null;
}
public static DatabaseType getDefaultDatabaseType() {
return getDefaultDataSource().getType();
/**
* 判断指定id的数据源是否存在
*
* @param id 数据源id {@link DynamicDataSource#getId()}
* @return 数据源是否存在
*/
public static boolean existing(String id) {
try {
return dynamicDataSourceService.getDataSource(id) != null;
} catch (DataSourceNotFoundException e) {
return false;
}
}
/**
* @return 当前使用的数据源是否存在
*/
public static boolean currentExisting() {
if (currentIsDefault()) return true;
try {
return currentDataSource() != null;
} catch (DataSourceNotFoundException e) {
return false;
}
}
/**
* @return 当前数据库类型
*/
public static DatabaseType currentDatabaseType() {
return currentDataSource().getType();
}
/**
* @return 默认的数据库类型
*/
public static DatabaseType defaultDatabaseType() {
return defaultDataSource().getType();
}
}

View File

@@ -1,5 +1,8 @@
package org.hswebframework.web.datasource.annotation;
import org.hswebframework.web.datasource.DataSourceHolder;
import org.hswebframework.web.datasource.DynamicDataSource;
import java.lang.annotation.*;
/**
@@ -10,5 +13,17 @@ import java.lang.annotation.*;
@Documented
@Inherited
public @interface UseDataSource {
/**
* @return 数据源ID ,支持表达式如 : ${#param.id}
* @see DynamicDataSource#getId()
*/
String value();
/**
* @return 数据源不存在时, 是否使用默认数据源.
* 如果为{@code false},当数据源不存在的时候,
* 将抛出 {@link org.hswebframework.web.datasource.exception.DataSourceNotFoundException}
* @see DataSourceHolder#currentExisting()
*/
boolean fallbackDefault() default true;
}

View File

@@ -4,6 +4,7 @@ import org.hswebframework.web.datasource.DynamicDataSource;
import org.hswebframework.web.datasource.DynamicDataSourceProxy;
import org.hswebframework.web.datasource.DynamicDataSourceService;
import javax.annotation.PreDestroy;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Map;
@@ -25,6 +26,11 @@ public abstract class AbstractDynamicDataSourceService implements DynamicDataSou
this(dataSource instanceof DynamicDataSource ? (DynamicDataSource) dataSource : new DynamicDataSourceProxy(null, dataSource));
}
@PreDestroy
public void destroy() {
dataSourceStore.values().forEach(DataSourceCache::closeDataSource);
}
@Override
public DynamicDataSource getDataSource(String dataSourceId) {
DataSourceCache cache = dataSourceStore.get(dataSourceId);

View File

@@ -0,0 +1,78 @@
package org.hswebframework.web.datasource.starter;
import org.aopalliance.intercept.MethodInterceptor;
import org.hswebframework.web.AopUtils;
import org.hswebframework.web.ExpressionUtils;
import org.hswebframework.web.boost.aop.context.MethodInterceptorHolder;
import org.hswebframework.web.datasource.DataSourceHolder;
import org.hswebframework.web.datasource.annotation.UseDataSource;
import org.hswebframework.web.datasource.annotation.UseDefaultDataSource;
import org.hswebframework.web.datasource.exception.DataSourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.StaticMethodMatcherPointcutAdvisor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.lang.reflect.Method;
/**
* 通过aop方式进行对注解方式切换数据源提供支持
*
* @author zhouhao
* @since 3.0
*/
@Configuration
public class AopDataSourceSwitcherAutoConfiguration {
@Bean
public SwitcherMethodMatcherPointcutAdvisor switcherMethodMatcherPointcutAdvisor() {
return new SwitcherMethodMatcherPointcutAdvisor();
}
public static class SwitcherMethodMatcherPointcutAdvisor extends StaticMethodMatcherPointcutAdvisor {
private static final Logger logger = LoggerFactory.getLogger(SwitcherMethodMatcherPointcutAdvisor.class);
public SwitcherMethodMatcherPointcutAdvisor() {
setAdvice((MethodInterceptor) methodInvocation -> {
logger.debug("switch datasource...");
UseDataSource useDataSource = AopUtils.findAnnotation(methodInvocation.getThis().getClass(),
methodInvocation.getMethod(), UseDataSource.class);
if (useDataSource != null) {
String id = useDataSource.value();
if (id.contains("${")) {
MethodInterceptorHolder holder = MethodInterceptorHolder.create(methodInvocation);
id = ExpressionUtils.analytical(id, holder.getArgs(), "spel");
}
if (!DataSourceHolder.existing(id)) {
if (useDataSource.fallbackDefault()) {
DataSourceHolder.switcher().useDefault();
} else {
throw new DataSourceNotFoundException(id);
}
} else {
DataSourceHolder.switcher().use(id);
}
} else {
UseDefaultDataSource useDefaultDataSource = AopUtils.findAnnotation(methodInvocation.getThis().getClass(),
methodInvocation.getMethod(), UseDefaultDataSource.class);
if (useDefaultDataSource == null) {
logger.warn("can't found annotation: UseDefaultDataSource !");
}
DataSourceHolder.switcher().useDefault();
}
try {
return methodInvocation.proceed();
} finally {
DataSourceHolder.switcher().useLast();
}
});
}
@Override
public boolean matches(Method method, Class<?> aClass) {
return AopUtils.findAnnotation(aClass, method, UseDataSource.class) != null ||
AopUtils.findAnnotation(aClass, method, UseDefaultDataSource.class) != null;
}
}
}

View File

@@ -1,9 +1,11 @@
package org.hswebframework.web.datasource.switcher;
/**
* TODO 完成注释
* 动态数据源切换器,用于切换数据源操作
*
* @author zhouhao
* @see 3.0
* @see DefaultDataSourceSwitcher
*/
public interface DataSourceSwitcher {
@@ -25,12 +27,12 @@ public interface DataSourceSwitcher {
void useDefault();
/**
* @return 当前选择的数据源ID
* @return 当前选择的数据源ID, 如果为默认数据源则返回 {@code null}
*/
String currentDataSourceId();
/**
* 重置切换记录
* 重置切换记录,重置后,使用默认数据源
*/
void reset();
}

View File

@@ -8,26 +8,31 @@ import java.util.Deque;
import java.util.LinkedList;
/**
* TODO 完成注释
* 默认的动态数据源切换器,基于ThreadLocal,queue
*
* @author zhouhao
* @since 3.0
*/
public class DefaultDataSourceSwitcher implements DataSourceSwitcher {
//默认数据源标识
private static final String DEFAULT_DATASOURCE_ID = DataSourceSwitcher.class.getName() + "_default_";
private Logger logger = LoggerFactory.getLogger(this.getClass());
private Deque<String> getUsedHistoryQueue() {
// 从ThreadLocal中获取一个使用记录
return ThreadLocalUtils.get(DefaultDataSourceSwitcher.class.getName() + "_queue", LinkedList::new);
}
@Override
public void useLast() {
// 没有上一次了
if (getUsedHistoryQueue().size() == 0) {
return;
}
//移除队尾,则当前的队尾则为上一次的数据源
getUsedHistoryQueue().removeLast();
if (logger.isDebugEnabled()) {
String current = currentDataSourceId();
if (null != current)
@@ -38,6 +43,7 @@ public class DefaultDataSourceSwitcher implements DataSourceSwitcher {
@Override
public void use(String dataSourceId) {
//添加对队尾
getUsedHistoryQueue().addLast(dataSourceId);
if (logger.isDebugEnabled()) {
logger.debug("try use data source : {}", dataSourceId);

View File

@@ -11,9 +11,9 @@ import java.util.Properties;
* @author zhouhao
*/
public class AtomikosDataSourceConfig {
private int minPoolSize = 1;
private int maxPoolSize = 20;
private int borrowConnectionTimeout = 30;
private int minPoolSize = 5;
private int maxPoolSize = 200;
private int borrowConnectionTimeout = 60;
private int reapTimeout = 0;
private int maxIdleTime = 60;
private int maintenanceInterval = 60;
@@ -23,7 +23,8 @@ public class AtomikosDataSourceConfig {
private String testQuery = null;
private int maxLifetime = 0;
private Properties xaProperties = null;
private int initTimeOut = 10 * 1000;
//初始化超时时间
private int initTimeout = 10;
@Override
public int hashCode() {
@@ -50,7 +51,7 @@ public class AtomikosDataSourceConfig {
", testQuery='" + testQuery + '\'' +
", maxLifetime=" + maxLifetime +
", xaProperties=" + xaProperties +
", initTimeOut=" + initTimeOut +
", initTimeout=" + initTimeout +
'}';
}
@@ -150,12 +151,12 @@ public class AtomikosDataSourceConfig {
this.maxLifetime = maxLifetime;
}
public int getInitTimeOut() {
return initTimeOut;
public int getInitTimeout() {
return initTimeout;
}
public void setInitTimeOut(int initTimeOut) {
this.initTimeOut = initTimeOut;
public void setInitTimeout(int initTimeout) {
this.initTimeout = initTimeout;
}
public void putProperties(AtomikosDataSourceBean atomikosDataSourceBean) {
@@ -164,10 +165,12 @@ public class AtomikosDataSourceConfig {
}
atomikosDataSourceBean.setXaDataSourceClassName(getXaDataSourceClassName());
atomikosDataSourceBean.setBorrowConnectionTimeout(getBorrowConnectionTimeout());
try {
atomikosDataSourceBean.setLoginTimeout(getLoginTimeout());
} catch (SQLException e) {
e.printStackTrace();
if (loginTimeout != 0) {
try {
atomikosDataSourceBean.setLoginTimeout(getLoginTimeout());
} catch (SQLException e) {
e.printStackTrace();
}
}
atomikosDataSourceBean.setMaxIdleTime(getMaxIdleTime());
atomikosDataSourceBean.setMaxPoolSize(getMaxPoolSize());

View File

@@ -9,7 +9,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.jdbc.datasource.DataSourceUtils;
import javax.sql.DataSource;
import javax.sql.XADataSource;
@@ -19,7 +18,6 @@ import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -83,7 +81,7 @@ public class JtaDynamicDataSourceService extends AbstractDynamicDataSourceServic
logger.error("close xa datasource error", e);
}
} else {
logger.warn("XADataSource is not instanceof Closeable!", Thread.currentThread().getStackTrace());
logger.warn("XADataSource is not instanceof Closeable!", (Object) Thread.currentThread().getStackTrace());
}
}
};
@@ -91,21 +89,22 @@ public class JtaDynamicDataSourceService extends AbstractDynamicDataSourceServic
executor.execute(() -> {
try {
atomikosDataSourceBean.init();
downLatch.countDown();
successCounter.incrementAndGet();
downLatch.countDown();
} catch (Exception e) {
logger.error("init datasource {} error", id, e);
//atomikosDataSourceBean.close();
}
});
//初始化状态判断
executor.execute(() -> {
try {
Thread.sleep(config.getInitTimeOut());
Thread.sleep(config.getInitTimeout() * 1000);
} catch (InterruptedException ignored) {
} finally {
if (successCounter.get() == 0) {
// 初始化超时,认定为失败
logger.error("init timeout ({}ms)", config.getInitTimeOut());
logger.error("init timeout ({}ms)", config.getInitTimeout());
cache.closeDataSource();
if (downLatch.getCount() > 0)
downLatch.countDown();

View File

@@ -1,32 +1,52 @@
package org.hswebframework.web.datasource.jta;
import com.alibaba.druid.pool.DruidDataSource;
import org.hsweb.ezorm.rdb.RDBDatabase;
import org.hsweb.ezorm.rdb.executor.AbstractJdbcSqlExecutor;
import org.hsweb.ezorm.rdb.executor.SQL;
import org.hsweb.ezorm.rdb.executor.SqlExecutor;
import org.hsweb.ezorm.rdb.meta.RDBDatabaseMetaData;
import org.hsweb.ezorm.rdb.meta.RDBTableMetaData;
import org.hsweb.ezorm.rdb.meta.parser.H2TableMetaParser;
import org.hsweb.ezorm.rdb.meta.parser.MysqlTableMetaParser;
import org.hsweb.ezorm.rdb.meta.parser.OracleTableMetaParser;
import org.hsweb.ezorm.rdb.meta.parser.TableMetaParser;
import org.hsweb.ezorm.rdb.render.SqlRender;
import org.hsweb.ezorm.rdb.render.dialect.Dialect;
import org.hsweb.ezorm.rdb.render.dialect.H2RDBDatabaseMetaData;
import org.hsweb.ezorm.rdb.render.dialect.MysqlRDBDatabaseMetaData;
import org.hsweb.ezorm.rdb.render.dialect.OracleRDBDatabaseMetaData;
import org.hsweb.ezorm.rdb.simple.SimpleDatabase;
import org.hswebframework.expands.script.engine.DynamicScriptEngine;
import org.hswebframework.expands.script.engine.DynamicScriptEngineFactory;
import org.hswebframework.expands.script.engine.SpEL.SpElEngine;
import org.hswebframework.web.datasource.DataSourceHolder;
import org.hswebframework.web.datasource.DatabaseType;
import org.hswebframework.web.datasource.DynamicDataSourceProxy;
import org.hswebframework.web.datasource.service.DataSourceCache;
import org.hswebframework.web.datasource.annotation.UseDataSource;
import org.hswebframework.web.datasource.annotation.UseDefaultDataSource;
import org.hswebframework.web.datasource.starter.AopDataSourceSwitcherAutoConfiguration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanExpressionContext;
import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.context.expression.*;
import org.springframework.core.convert.ConversionService;
import org.springframework.expression.spel.support.StandardTypeConverter;
import org.springframework.expression.spel.support.StandardTypeLocator;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.messaging.simp.SimpSessionScope;
import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Propagation;
@@ -36,7 +56,6 @@ import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;
/**
* TODO 完成注释
@@ -50,6 +69,8 @@ public class SimpleAtomikosTests {
@Configuration
@SpringBootApplication
@EnableJms
@ImportAutoConfiguration(AopDataSourceSwitcherAutoConfiguration.class)
@EnableAspectJAutoProxy
public static class Config {
@Bean
public SqlExecutor sqlExecutor() {
@@ -63,12 +84,12 @@ public class SimpleAtomikosTests {
@Override
public Connection getConnection() {
return DataSourceUtils.getConnection(DataSourceHolder.getActiveDataSource());
return DataSourceUtils.getConnection(DataSourceHolder.currentDataSource());
}
@Override
public void releaseConnection(Connection connection) throws SQLException {
DataSourceUtils.releaseConnection(connection, DataSourceHolder.getActiveDataSource());
DataSourceUtils.releaseConnection(connection, DataSourceHolder.currentDataSource());
}
};
}
@@ -96,28 +117,45 @@ public class SimpleAtomikosTests {
@Bean
public DynDsTest transTest(SqlExecutor sqlExecutor) {
return new DynDsTest(new SimpleDatabase(new DynDatabaseMeta(), sqlExecutor));
SimpleDatabase database = new SimpleDatabase(new DynDatabaseMeta(sqlExecutor), sqlExecutor);
database.setAutoParse(true);
return new DynDsTest(database);
}
public class DynDatabaseMeta extends RDBDatabaseMetaData {
private Map<DatabaseType, Dialect> dialectMap;
private Map<DatabaseType, RDBDatabaseMetaData> metaDataMap;
private Map<DatabaseType, TableMetaParser> parserMap;
public DynDatabaseMeta() {
public DynDatabaseMeta(SqlExecutor sqlExecutor) {
dialectMap = new HashMap<>();
metaDataMap = new HashMap<>();
parserMap = new HashMap<>();
dialectMap.put(DatabaseType.h2, Dialect.H2);
dialectMap.put(DatabaseType.mysql, Dialect.MYSQL);
dialectMap.put(DatabaseType.oracle, Dialect.ORACLE);
metaDataMap.put(DatabaseType.h2, new H2RDBDatabaseMetaData());
metaDataMap.put(DatabaseType.mysql, new MysqlRDBDatabaseMetaData());
metaDataMap.put(DatabaseType.oracle, new OracleRDBDatabaseMetaData());
parserMap.put(DatabaseType.h2, new H2TableMetaParser(sqlExecutor));
parserMap.put(DatabaseType.mysql, new MysqlTableMetaParser(sqlExecutor));
parserMap.put(DatabaseType.oracle, new OracleTableMetaParser(sqlExecutor));
}
@Override
public RDBTableMetaData putTable(RDBTableMetaData tableMetaData) {
return metaDataMap.get(DataSourceHolder.currentDatabaseType()).putTable(tableMetaData);
}
@Override
public TableMetaParser getParser() {
return parserMap.get(DataSourceHolder.currentDatabaseType());
}
@Override
public Dialect getDialect() {
return dialectMap.get(DataSourceHolder.getActiveDatabaseType());
return dialectMap.get(DataSourceHolder.currentDatabaseType());
}
@Override
@@ -127,12 +165,12 @@ public class SimpleAtomikosTests {
@Override
public SqlRender getRenderer(SqlRender.TYPE type) {
return metaDataMap.get(DataSourceHolder.getActiveDatabaseType()).getRenderer(type);
return metaDataMap.get(DataSourceHolder.currentDatabaseType()).getRenderer(type);
}
@Override
public String getName() {
return metaDataMap.get(DataSourceHolder.getActiveDatabaseType()).getName();
return metaDataMap.get(DataSourceHolder.currentDatabaseType()).getName();
}
}
}
@@ -157,6 +195,8 @@ public class SimpleAtomikosTests {
@Rollback(false)
public void test() {
try {
DataSourceHolder.switcher().reset();
dynDsTest.testCreateTable();
dynDsTest.testInsert();
DataSourceHolder.switcher().use("test_ds");
@@ -166,22 +206,20 @@ public class SimpleAtomikosTests {
DataSourceHolder.switcher().use("test_ds2");
dynDsTest.testCreateTable();
System.out.println(DataSourceHolder.getActiveDatabaseType());
System.out.println(DataSourceHolder.switcher().currentDataSourceId());
System.out.println(dynDsTest.testQuery());
DataSourceHolder.switcher().useLast();
System.out.println(DataSourceHolder.getActiveDatabaseType());
System.out.println(DataSourceHolder.switcher().currentDataSourceId());
System.out.println(dynDsTest.testQuery());
DataSourceHolder.switcher().useLast();
System.out.println(DataSourceHolder.getActiveDatabaseType());
System.out.println(DataSourceHolder.switcher().currentDataSourceId());
System.out.println(dynDsTest.testQuery());
jmsTemplate.convertAndSend("test", "hello");
Thread.sleep(1000);
// throw new RuntimeException();
} catch (SQLException e) {
e.printStackTrace();
} catch (InterruptedException e) {
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
}
}
@@ -207,6 +245,7 @@ public class SimpleAtomikosTests {
.exec();
}
public List testQuery() throws SQLException {
return database.getTable("s_user").createQuery().list();
}

View File

@@ -17,7 +17,7 @@ spring:
activemq:
broker-url: tcp://localhost:61616
in-memory: false
in-memory: true
logging:
level:
@@ -41,4 +41,8 @@ hsweb:
username: sa
password:
max-pool-size: 20
borrow-connection-timeout: 1000
borrow-connection-timeout: 1000
init-timeout: 20
config:
test: 123