diff --git a/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/sql/DefaultJdbcReactiveExecutor.java b/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/sql/DefaultJdbcReactiveExecutor.java index 571a92714..7e05edfdd 100644 --- a/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/sql/DefaultJdbcReactiveExecutor.java +++ b/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/sql/DefaultJdbcReactiveExecutor.java @@ -17,6 +17,7 @@ import reactor.util.function.Tuples; import javax.sql.DataSource; import java.sql.Connection; +import java.util.function.Function; @Slf4j public class DefaultJdbcReactiveExecutor extends JdbcReactiveSqlExecutor { @@ -52,6 +53,15 @@ public class DefaultJdbcReactiveExecutor extends JdbcReactiveSqlExecutor { ); } + @Override + protected Flux doInConnection(Function> handler) { + return Flux + .using(this::getDataSourceAndConnection, + tp2 -> handler.apply(tp2.getT2()), + tp2 -> DataSourceUtils.releaseConnection(tp2.getT2(), tp2.getT1()) + ); + } + @Override @Transactional(transactionManager = TransactionManagers.reactiveTransactionManager,readOnly = true) public Flux select(String sql, ResultWrapper wrapper) { diff --git a/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/sql/DefaultR2dbcExecutor.java b/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/sql/DefaultR2dbcExecutor.java index f1247026b..888e0be3e 100644 --- a/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/sql/DefaultR2dbcExecutor.java +++ b/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/sql/DefaultR2dbcExecutor.java @@ -14,6 +14,7 @@ import org.hswebframework.web.exception.I18nSupportException; import org.reactivestreams.Publisher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.r2dbc.connection.ConnectionFactoryUtils; +import org.springframework.r2dbc.core.ConnectionAccessor; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import reactor.core.publisher.Flux; @@ -24,6 +25,7 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Date; import java.util.Map; +import java.util.function.Function; public class DefaultR2dbcExecutor extends R2dbcReactiveSqlExecutor { @@ -88,13 +90,22 @@ public class DefaultR2dbcExecutor extends R2dbcReactiveSqlExecutor { @Override protected Mono getConnection() { - if (DataSourceHolder.isDynamicDataSourceReady()) { - return DataSourceHolder.currentR2dbc() - .flatMap(R2dbcDataSource::getNative) - .flatMap(ConnectionFactoryUtils::getConnection); - } else { - return ConnectionFactoryUtils.getConnection(defaultFactory); - } + return ConnectionFactoryUtils + .getConnection(defaultFactory); + } + + @Override + protected Flux doInConnection(Function> handler) { + return Flux.usingWhen( + ConnectionFactoryUtils.getConnection(defaultFactory), + handler, + source -> ConnectionFactoryUtils + .currentConnectionFactory(defaultFactory) + .then() + .onErrorResume(Exception.class, ex -> Mono.from(source.close())) + ); + + // return super.doWith(handler); } @Override diff --git a/pom.xml b/pom.xml index e426afd76..7bb083ccd 100644 --- a/pom.xml +++ b/pom.xml @@ -90,7 +90,7 @@ 3.2.2 1.6.12 - 4.1.3-SNAPSHOT + 4.2.0-SNAPSHOT 3.0.4 3.0.2 2.7.0 @@ -401,13 +401,30 @@ - io.r2dbc - r2dbc-bom - ${r2dbc.version} - pom - import + io.asyncer + r2dbc-mysql + 1.4.1 + + org.postgresql + r2dbc-postgresql + 1.0.7.RELEASE + + + + io.r2dbc + r2dbc-h2 + 1.0.0.RELEASE + + + + io.r2dbc + r2dbc-spi + 1.0.0.RELEASE + + + org.hibernate.javax.persistence hibernate-jpa-2.1-api