refactor: 优化数据库连接获取逻辑

This commit is contained in:
zhouhao
2025-05-22 15:09:41 +08:00
parent 5cfcbe679a
commit 64257a2a94
3 changed files with 51 additions and 13 deletions

View File

@@ -17,6 +17,7 @@ import reactor.util.function.Tuples;
import javax.sql.DataSource;
import java.sql.Connection;
import java.util.function.Function;
@Slf4j
public class DefaultJdbcReactiveExecutor extends JdbcReactiveSqlExecutor {
@@ -52,6 +53,15 @@ public class DefaultJdbcReactiveExecutor extends JdbcReactiveSqlExecutor {
);
}
@Override
protected <T> Flux<T> doInConnection(Function<Connection, Publisher<T>> handler) {
return Flux
.using(this::getDataSourceAndConnection,
tp2 -> handler.apply(tp2.getT2()),
tp2 -> DataSourceUtils.releaseConnection(tp2.getT2(), tp2.getT1())
);
}
@Override
@Transactional(transactionManager = TransactionManagers.reactiveTransactionManager,readOnly = true)
public <E> Flux<E> select(String sql, ResultWrapper<E, ?> wrapper) {

View File

@@ -14,6 +14,7 @@ import org.hswebframework.web.exception.I18nSupportException;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.r2dbc.connection.ConnectionFactoryUtils;
import org.springframework.r2dbc.core.ConnectionAccessor;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
@@ -24,6 +25,7 @@ import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.Map;
import java.util.function.Function;
public class DefaultR2dbcExecutor extends R2dbcReactiveSqlExecutor {
@@ -88,13 +90,22 @@ public class DefaultR2dbcExecutor extends R2dbcReactiveSqlExecutor {
@Override
protected Mono<Connection> getConnection() {
if (DataSourceHolder.isDynamicDataSourceReady()) {
return DataSourceHolder.currentR2dbc()
.flatMap(R2dbcDataSource::getNative)
.flatMap(ConnectionFactoryUtils::getConnection);
} else {
return ConnectionFactoryUtils.getConnection(defaultFactory);
}
return ConnectionFactoryUtils
.getConnection(defaultFactory);
}
@Override
protected <T> Flux<T> doInConnection(Function<Connection, Publisher<T>> handler) {
return Flux.usingWhen(
ConnectionFactoryUtils.getConnection(defaultFactory),
handler,
source -> ConnectionFactoryUtils
.currentConnectionFactory(defaultFactory)
.then()
.onErrorResume(Exception.class, ex -> Mono.from(source.close()))
);
// return super.doWith(handler);
}
@Override

29
pom.xml
View File

@@ -90,7 +90,7 @@
<cglib.version>3.2.2</cglib.version>
<aspectj.version>1.6.12</aspectj.version>
<hsweb.ezorm.version>4.1.3-SNAPSHOT</hsweb.ezorm.version>
<hsweb.ezorm.version>4.2.0-SNAPSHOT</hsweb.ezorm.version>
<hsweb.utils.version>3.0.4</hsweb.utils.version>
<hsweb.expands.version>3.0.2</hsweb.expands.version>
<swagger.version>2.7.0</swagger.version>
@@ -401,13 +401,30 @@
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-bom</artifactId>
<version>${r2dbc.version}</version>
<type>pom</type>
<scope>import</scope>
<groupId>io.asyncer</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<version>1.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.hibernate.javax.persistence</groupId>
<artifactId>hibernate-jpa-2.1-api</artifactId>