From e98c6d50e7babe207fd621b400c998280c126691 Mon Sep 17 00:00:00 2001 From: zhouhao Date: Fri, 14 Apr 2023 15:52:42 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=9B=BF=E6=8D=A2=E5=BC=83=E7=94=A8API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../authorization/AuthenticationTests.java | 4 +- .../basic/web/UserTokenWebFilter.java | 14 ++--- .../web/api/crud/entity/EntityFactory.java | 1 - .../crud/configuration/AutoDDLProcessor.java | 55 ++++++++++++------- .../configuration/EasyormConfiguration.java | 12 +++- .../web/crud/events/EntityEventHelper.java | 7 +-- .../web/context/ContextUtils.java | 11 +--- .../hswebframework/web/i18n/LocaleUtils.java | 3 +- .../web/i18n/WebFluxLocaleFilter.java | 2 +- .../web/logger/ReactiveLogger.java | 21 +++---- .../web/i18n/LocaleUtilsTest.java | 6 +- .../web/logger/ReactiveLoggerTest.java | 4 +- .../switcher/DefaultReactiveSwitcher.java | 5 +- hsweb-datasource/hsweb-datasource-jta/pom.xml | 1 + .../aop/ReactiveAopAccessLoggerSupport.java | 39 +++++++------ .../CustomJackson2jsonEncoderTest.java | 2 +- 16 files changed, 97 insertions(+), 90 deletions(-) diff --git a/hsweb-authorization/hsweb-authorization-api/src/test/java/org/hswebframework/web/authorization/AuthenticationTests.java b/hsweb-authorization/hsweb-authorization-api/src/test/java/org/hswebframework/web/authorization/AuthenticationTests.java index 01c4c4ebf..d624d8473 100644 --- a/hsweb-authorization/hsweb-authorization-api/src/test/java/org/hswebframework/web/authorization/AuthenticationTests.java +++ b/hsweb-authorization/hsweb-authorization-api/src/test/java/org/hswebframework/web/authorization/AuthenticationTests.java @@ -139,8 +139,8 @@ public class AuthenticationTests { .doOnEach(ReactiveLogger.on(SignalType.ON_NEXT,(ctx,signal)->{ System.out.println(ctx); })) - .subscriberContext(Context.of(ParsedToken.class, parsedToken)) - .subscriberContext(ReactiveLogger.start("rid","1")) + .contextWrite(Context.of(ParsedToken.class, parsedToken)) + .contextWrite(ReactiveLogger.start("rid","1")) .as(StepVerifier::create) .expectNext("admin") .verifyComplete(); diff --git a/hsweb-authorization/hsweb-authorization-basic/src/main/java/org/hswebframework/web/authorization/basic/web/UserTokenWebFilter.java b/hsweb-authorization/hsweb-authorization-basic/src/main/java/org/hswebframework/web/authorization/basic/web/UserTokenWebFilter.java index c98edba8c..711b32d73 100644 --- a/hsweb-authorization/hsweb-authorization-basic/src/main/java/org/hswebframework/web/authorization/basic/web/UserTokenWebFilter.java +++ b/hsweb-authorization/hsweb-authorization-basic/src/main/java/org/hswebframework/web/authorization/basic/web/UserTokenWebFilter.java @@ -48,23 +48,17 @@ public class UserTokenWebFilter implements WebFilter, BeanPostProcessor { .next() .map(token -> chain .filter(exchange) - .subscriberContext(Context.of(ParsedToken.class, token))) + .contextWrite(Context.of(ParsedToken.class, token))) .defaultIfEmpty(chain.filter(exchange)) .flatMap(Function.identity()) - .subscriberContext(ReactiveLogger.start("requestId", exchange.getRequest().getId())); + .contextWrite(ReactiveLogger.start("requestId", exchange.getRequest().getId())); -// return chain.filter(exchange) -// .subscriberContext(ContextUtils.acceptContext(ctx -> -// Flux.fromIterable(parsers) -// .flatMap(parser -> parser.parseToken(exchange)) -// .subscribe(token -> ctx.put(ParsedToken.class, token))) -// ) -// .subscriberContext(ReactiveLogger.start("requestId", exchange.getRequest().getId())) } @EventListener public void handleUserSign(AuthorizationSuccessEvent event) { - ReactiveUserTokenGenerator generator = event.getParameter("tokenType") + ReactiveUserTokenGenerator generator = event + .getParameter("tokenType") .map(tokenGeneratorMap::get) .orElseGet(() -> tokenGeneratorMap.get("default")); if (generator != null) { diff --git a/hsweb-commons/hsweb-commons-api/src/main/java/org/hswebframework/web/api/crud/entity/EntityFactory.java b/hsweb-commons/hsweb-commons-api/src/main/java/org/hswebframework/web/api/crud/entity/EntityFactory.java index 5f0947795..32a33f578 100644 --- a/hsweb-commons/hsweb-commons-api/src/main/java/org/hswebframework/web/api/crud/entity/EntityFactory.java +++ b/hsweb-commons/hsweb-commons-api/src/main/java/org/hswebframework/web/api/crud/entity/EntityFactory.java @@ -103,7 +103,6 @@ public interface EntityFactory { return getInstanceType(entityClass, false); } - @Nullable Class getInstanceType(Class entityClass, boolean autoRegister); /** diff --git a/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/configuration/AutoDDLProcessor.java b/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/configuration/AutoDDLProcessor.java index 28cff42f9..626d0b470 100644 --- a/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/configuration/AutoDDLProcessor.java +++ b/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/configuration/AutoDDLProcessor.java @@ -7,16 +7,19 @@ import lombok.extern.slf4j.Slf4j; import org.hswebframework.ezorm.rdb.metadata.RDBTableMetadata; import org.hswebframework.ezorm.rdb.operator.DatabaseOperator; import org.hswebframework.web.api.crud.entity.EntityFactory; +import org.hswebframework.web.crud.annotation.DDL; import org.hswebframework.web.crud.entity.factory.MapperEntityFactory; import org.hswebframework.web.crud.events.EntityDDLEvent; import org.hswebframework.web.event.GenericsPayloadApplicationEvent; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.core.annotation.AnnotatedElementUtils; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; import java.time.Duration; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -50,14 +53,23 @@ public class AutoDDLProcessor implements InitializingBean { @SneakyThrows public void afterPropertiesSet() { - List> entities = this.entities - .stream() - .map(e -> entityFactory.getInstanceType(e.getRealType(), true)) - .collect(Collectors.toList()); - if (properties.isAutoDdl()) { + List> readyToDDL = new ArrayList<>(this.entities.size()); + List> nonDDL = new ArrayList<>(); + + for (EntityInfo entity : this.entities) { + Class type = entityFactory.getInstanceType(entity.getRealType(), true); + DDL ddl = AnnotatedElementUtils.findMergedAnnotation(type, DDL.class); + if (properties.isAutoDdl() && (ddl == null || ddl.value())) { + readyToDDL.add(type); + } else { + nonDDL.add(type); + } + } + + if (!readyToDDL.isEmpty()) { //加载全部表信息 if (reactive) { - Flux.fromIterable(entities) + Flux.fromIterable(readyToDDL) .doOnNext(type -> log.trace("auto ddl for {}", type)) .map(type -> { RDBTableMetadata metadata = resolver.resolve(type); @@ -66,18 +78,18 @@ public class AutoDDLProcessor implements InitializingBean { return metadata; }) .flatMap(meta -> operator - .ddl() - .createOrAlter(meta) - .autoLoad(false) - .commit() - .reactive() - .subscribeOn(Schedulers.elastic()) - ) + .ddl() + .createOrAlter(meta) + .autoLoad(false) + .commit() + .reactive() + .subscribeOn(Schedulers.boundedElastic()), + 8) .doOnError((err) -> log.error(err.getMessage(), err)) .then() .block(Duration.ofMinutes(5)); } else { - for (Class type : entities) { + for (Class type : readyToDDL) { log.trace("auto ddl for {}", type); try { RDBTableMetadata metadata = resolver.resolve(type); @@ -94,13 +106,14 @@ public class AutoDDLProcessor implements InitializingBean { } } } - } else { - for (Class entity : entities) { - RDBTableMetadata metadata = resolver.resolve(entity); - operator.getMetadata() - .getCurrentSchema() - .addTable(metadata); - } + } + + for (Class entity : nonDDL) { + RDBTableMetadata metadata = resolver.resolve(entity); + operator + .getMetadata() + .getCurrentSchema() + .addTable(metadata); } } } diff --git a/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/configuration/EasyormConfiguration.java b/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/configuration/EasyormConfiguration.java index 393ca3cb8..6ecbb9214 100644 --- a/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/configuration/EasyormConfiguration.java +++ b/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/configuration/EasyormConfiguration.java @@ -28,10 +28,13 @@ import org.hswebframework.web.crud.generator.CurrentTimeGenerator; import org.hswebframework.web.crud.generator.DefaultIdGenerator; import org.hswebframework.web.crud.generator.MD5Generator; import org.hswebframework.web.crud.generator.SnowFlakeStringIdGenerator; +import org.hswebframework.web.crud.query.DefaultQueryHelper; +import org.hswebframework.web.crud.query.QueryHelper; import org.springframework.beans.BeansException; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -79,7 +82,7 @@ public class EasyormConfiguration { RDBDatabaseMetadata metadata = properties.createDatabaseMetadata(); syncSqlExecutor.ifPresent(metadata::addFeature); reactiveSqlExecutor.ifPresent(metadata::addFeature); - if (properties.isAutoDdl()) { + if (properties.isAutoDdl() && reactiveSqlExecutor.isPresent()) { for (RDBSchemaMetadata schema : metadata.getSchemas()) { schema.loadAllTableReactive() .block(Duration.ofSeconds(30)); @@ -95,6 +98,11 @@ public class EasyormConfiguration { return DefaultDatabaseOperator.of(metadata); } + @Bean + public QueryHelper queryHelper(DatabaseOperator databaseOperator) { + return new DefaultQueryHelper(databaseOperator); + } + @Bean public BeanPostProcessor autoRegisterFeature(RDBDatabaseMetadata metadata) { CompositeEventListener eventListener = new CompositeEventListener(); @@ -211,7 +219,7 @@ public class EasyormConfiguration { entityType, table.findFeatureNow( MappingFeatureType.columnPropertyMapping.createFeatureId(realType) - ),factory)); + ), factory)); } for (TableMetadataCustomizer customizer : customizers) { customizer.customTable(realType, table); diff --git a/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/events/EntityEventHelper.java b/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/events/EntityEventHelper.java index 3a6ffd8d9..cd7d52ff1 100644 --- a/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/events/EntityEventHelper.java +++ b/hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/events/EntityEventHelper.java @@ -31,8 +31,7 @@ public class EntityEventHelper { */ public static Mono isDoFireEvent(boolean defaultIfEmpty) { return Mono - .subscriberContext() - .flatMap(ctx -> Mono.justOrEmpty(ctx.getOrEmpty(doEventContextKey))) + .deferContextual(ctx -> Mono.justOrEmpty(ctx.getOrEmpty(doEventContextKey))) .defaultIfEmpty(defaultIfEmpty); } @@ -49,7 +48,7 @@ public class EntityEventHelper { * @return 流 */ public static Mono setDoNotFireEvent(Mono stream) { - return stream.subscriberContext(Context.of(doEventContextKey, false)); + return stream.contextWrite(Context.of(doEventContextKey, false)); } /** @@ -64,7 +63,7 @@ public class EntityEventHelper { * @return 流 */ public static Flux setDoNotFireEvent(Flux stream) { - return stream.subscriberContext(Context.of(doEventContextKey, false)); + return stream.contextWrite(Context.of(doEventContextKey, false)); } public static Mono publishSavedEvent(Object source, diff --git a/hsweb-core/src/main/java/org/hswebframework/web/context/ContextUtils.java b/hsweb-core/src/main/java/org/hswebframework/web/context/ContextUtils.java index 96d2d77c6..38eb71a9e 100644 --- a/hsweb-core/src/main/java/org/hswebframework/web/context/ContextUtils.java +++ b/hsweb-core/src/main/java/org/hswebframework/web/context/ContextUtils.java @@ -20,15 +20,8 @@ public class ContextUtils { @Deprecated public static Mono reactiveContext() { return Mono - .subscriberContext() - .handle((context, sink) -> { - if (context.hasKey(Context.class)) { - sink.next(context.get(Context.class)); - } else { - sink.complete(); - } - }) - .subscriberContext(acceptContext(ctx -> { + .deferContextual(context->Mono.justOrEmpty(context.getOrEmpty(Context.class))) + .contextWrite(acceptContext(ctx -> { })); } diff --git a/hsweb-core/src/main/java/org/hswebframework/web/i18n/LocaleUtils.java b/hsweb-core/src/main/java/org/hswebframework/web/i18n/LocaleUtils.java index fc01cef1a..4bfb38a45 100644 --- a/hsweb-core/src/main/java/org/hswebframework/web/i18n/LocaleUtils.java +++ b/hsweb-core/src/main/java/org/hswebframework/web/i18n/LocaleUtils.java @@ -113,8 +113,7 @@ public final class LocaleUtils { @SuppressWarnings("all") public static Mono currentReactive() { return Mono - .subscriberContext() - .map(ctx -> ctx.getOrDefault(Locale.class, DEFAULT_LOCALE)); + .deferContextual(ctx -> Mono.just(ctx.getOrDefault(Locale.class, DEFAULT_LOCALE))); } public static Mono doInReactive(Callable call) { return currentReactive() diff --git a/hsweb-core/src/main/java/org/hswebframework/web/i18n/WebFluxLocaleFilter.java b/hsweb-core/src/main/java/org/hswebframework/web/i18n/WebFluxLocaleFilter.java index 61a34e1d6..fc6a0d0fe 100644 --- a/hsweb-core/src/main/java/org/hswebframework/web/i18n/WebFluxLocaleFilter.java +++ b/hsweb-core/src/main/java/org/hswebframework/web/i18n/WebFluxLocaleFilter.java @@ -16,7 +16,7 @@ public class WebFluxLocaleFilter implements WebFilter { return chain .filter(exchange) .as(LocaleUtils::transform) - .subscriberContext(LocaleUtils.useLocale(getLocaleContext(exchange))); + .contextWrite(LocaleUtils.useLocale(getLocaleContext(exchange))); } public Locale getLocaleContext(ServerWebExchange exchange) { diff --git a/hsweb-core/src/main/java/org/hswebframework/web/logger/ReactiveLogger.java b/hsweb-core/src/main/java/org/hswebframework/web/logger/ReactiveLogger.java index cccc9aced..4e2d4215d 100644 --- a/hsweb-core/src/main/java/org/hswebframework/web/logger/ReactiveLogger.java +++ b/hsweb-core/src/main/java/org/hswebframework/web/logger/ReactiveLogger.java @@ -5,8 +5,10 @@ import org.hswebframework.web.utils.CollectionUtils; import org.slf4j.MDC; import reactor.core.publisher.*; import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -27,13 +29,13 @@ public class ReactiveLogger { public static Mono mdc(String key, String value) { return Mono .empty() - .subscriberContext(start(key, value)); + .contextWrite(start(key, value)); } public static Mono mdc(String... keyAndValue) { return Mono .empty() - .subscriberContext(start(keyAndValue)); + .contextWrite(start(keyAndValue)); } public static Function start(Map context) { @@ -43,13 +45,13 @@ public class ReactiveLogger { maybeContextMap.get().putAll(context); return ctx; } else { - return ctx.put(CONTEXT_KEY, new LinkedHashMap<>(context)); + return ctx.put(CONTEXT_KEY, new ConcurrentHashMap<>(context)); } }; } - public static void log(Context context, Consumer> logger) { + public static void log(ContextView context, Consumer> logger) { Optional> maybeContextMap = context.getOrEmpty(CONTEXT_KEY); if (!maybeContextMap.isPresent()) { logger.accept(new HashMap<>()); @@ -70,7 +72,7 @@ public class ReactiveLogger { return; } Optional> maybeContextMap - = signal.getContext().getOrEmpty(CONTEXT_KEY); + = signal.getContextView().getOrEmpty(CONTEXT_KEY); if (!maybeContextMap.isPresent()) { logger.accept(new HashMap<>(), signal); } else { @@ -87,8 +89,7 @@ public class ReactiveLogger { public static Mono mdc(Consumer> consumer) { return Mono - .subscriberContext() - .doOnNext(ctx -> { + .deferContextual(ctx -> { Optional> maybeContextMap = ctx.getOrEmpty(CONTEXT_KEY); if (maybeContextMap.isPresent()) { consumer.accept(maybeContextMap.get()); @@ -96,13 +97,13 @@ public class ReactiveLogger { consumer.accept(Collections.emptyMap()); log.warn("logger context is empty,please call publisher.subscriberContext(ReactiveLogger.mdc()) first!"); } - }) - .then(); + return Mono.empty(); + }); } public static BiConsumer> handle(BiConsumer> logger) { return (t, rFluxSink) -> { - log(rFluxSink.currentContext(), context -> { + log(rFluxSink.contextView(), context -> { logger.accept(t, rFluxSink); }); }; diff --git a/hsweb-core/src/test/java/org/hswebframework/web/i18n/LocaleUtilsTest.java b/hsweb-core/src/test/java/org/hswebframework/web/i18n/LocaleUtilsTest.java index 0ce7042a2..4bc2be7ff 100644 --- a/hsweb-core/src/test/java/org/hswebframework/web/i18n/LocaleUtilsTest.java +++ b/hsweb-core/src/test/java/org/hswebframework/web/i18n/LocaleUtilsTest.java @@ -20,7 +20,7 @@ public class LocaleUtilsTest { assertEquals(i.intValue(), 1); assertEquals(LocaleUtils.current(), Locale.ENGLISH); }) - .subscriberContext(LocaleUtils.useLocale(Locale.ENGLISH)) + .contextWrite(LocaleUtils.useLocale(Locale.ENGLISH)) .blockLast(); } @@ -32,7 +32,7 @@ public class LocaleUtilsTest { assertEquals(LocaleUtils.current(), Locale.ENGLISH); }) .as(LocaleUtils::transform) - .subscriberContext(LocaleUtils.useLocale(Locale.ENGLISH)) + .contextWrite(LocaleUtils.useLocale(Locale.ENGLISH)) .block(); LocaleUtils @@ -40,7 +40,7 @@ public class LocaleUtilsTest { assertEquals(LocaleUtils.current(), Locale.ENGLISH); return null; }) - .subscriberContext(LocaleUtils.useLocale(Locale.ENGLISH)) + .contextWrite(LocaleUtils.useLocale(Locale.ENGLISH)) .block(); } diff --git a/hsweb-core/src/test/java/org/hswebframework/web/logger/ReactiveLoggerTest.java b/hsweb-core/src/test/java/org/hswebframework/web/logger/ReactiveLoggerTest.java index 135a7ea9b..ada52e21f 100644 --- a/hsweb-core/src/test/java/org/hswebframework/web/logger/ReactiveLoggerTest.java +++ b/hsweb-core/src/test/java/org/hswebframework/web/logger/ReactiveLoggerTest.java @@ -23,7 +23,7 @@ public class ReactiveLoggerTest { log.info("test:{} {}", v, MDC.getCopyOfContextMap()); })) - .subscriberContext(ReactiveLogger.start("r", "1","t","1")) + .contextWrite(ReactiveLogger.start("r", "1","t","1")) .as(StepVerifier::create) .expectNextCount(5) .verifyComplete(); @@ -39,7 +39,7 @@ public class ReactiveLoggerTest { .handle(ReactiveLogger.handle((o, fluxSink) -> { log.info("test:{}", fluxSink.currentContext()); fluxSink.next(o); - })).subscriberContext(ReactiveLogger.start("r", "1")) + })).contextWrite(ReactiveLogger.start("r", "1")) .as(StepVerifier::create) .expectNextCount(5) .verifyComplete(); diff --git a/hsweb-datasource/hsweb-datasource-api/src/main/java/org/hswebframework/web/datasource/switcher/DefaultReactiveSwitcher.java b/hsweb-datasource/hsweb-datasource-api/src/main/java/org/hswebframework/web/datasource/switcher/DefaultReactiveSwitcher.java index b35912800..36b68daae 100644 --- a/hsweb-datasource/hsweb-datasource-api/src/main/java/org/hswebframework/web/datasource/switcher/DefaultReactiveSwitcher.java +++ b/hsweb-datasource/hsweb-datasource-api/src/main/java/org/hswebframework/web/datasource/switcher/DefaultReactiveSwitcher.java @@ -28,6 +28,7 @@ public class DefaultReactiveSwitcher implements ReactiveSwitcher { this.type=type; } + @Deprecated private Mono doInContext(Function, Mono> function) { return ContextUtils.reactiveContext() .map(ctx -> ctx.getOrDefault(ContextKey.>of(this.name), LinkedList::new)) @@ -38,12 +39,12 @@ public class DefaultReactiveSwitcher implements ReactiveSwitcher { private > R doInContext(R publisher, Consumer> consumer) { if (publisher instanceof Mono) { return (R)((Mono) publisher) - .subscriberContext(ContextUtils.acceptContext(ctx -> { + .contextWrite(ContextUtils.acceptContext(ctx -> { consumer.accept(ctx.getOrDefault(ContextKey.>of(this.name), LinkedList::new)); })); } else if (publisher instanceof Flux) { return (R)((Flux) publisher) - .subscriberContext(ContextUtils.acceptContext(ctx -> { + .contextWrite(ContextUtils.acceptContext(ctx -> { consumer.accept(ctx.getOrDefault(ContextKey.>of(this.name), LinkedList::new)); })); } diff --git a/hsweb-datasource/hsweb-datasource-jta/pom.xml b/hsweb-datasource/hsweb-datasource-jta/pom.xml index a7a8f48ac..0f1d07836 100644 --- a/hsweb-datasource/hsweb-datasource-jta/pom.xml +++ b/hsweb-datasource/hsweb-datasource-jta/pom.xml @@ -55,6 +55,7 @@ mysql mysql-connector-java + 8.0.29 test diff --git a/hsweb-logging/hsweb-access-logging-aop/src/main/java/org/hswebframework/web/logging/aop/ReactiveAopAccessLoggerSupport.java b/hsweb-logging/hsweb-access-logging-aop/src/main/java/org/hswebframework/web/logging/aop/ReactiveAopAccessLoggerSupport.java index f7130b11d..17ddec8f2 100644 --- a/hsweb-logging/hsweb-access-logging-aop/src/main/java/org/hswebframework/web/logging/aop/ReactiveAopAccessLoggerSupport.java +++ b/hsweb-logging/hsweb-access-logging-aop/src/main/java/org/hswebframework/web/logging/aop/ReactiveAopAccessLoggerSupport.java @@ -1,6 +1,7 @@ package org.hswebframework.web.logging.aop; -import lombok.*; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; import org.aopalliance.intercept.MethodInterceptor; import org.hswebframework.web.aop.MethodInterceptorHolder; import org.hswebframework.web.authorization.Authentication; @@ -9,7 +10,6 @@ import org.hswebframework.web.logger.ReactiveLogger; import org.hswebframework.web.logging.*; import org.hswebframework.web.logging.events.AccessLoggerAfterEvent; import org.hswebframework.web.logging.events.AccessLoggerBeforeEvent; -import org.hswebframework.web.utils.FluxCache; import org.hswebframework.web.utils.ReactiveWebUtils; import org.springframework.aop.support.StaticMethodMatcherPointcutAdvisor; import org.springframework.beans.factory.annotation.Autowired; @@ -22,16 +22,15 @@ import org.springframework.util.ConcurrentReferenceHashMap; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilter; import org.springframework.web.server.WebFilterChain; -import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.context.Context; +import reactor.util.context.ContextView; +import javax.annotation.Nonnull; import java.lang.reflect.Method; -import java.time.Duration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; /** * 使用AOP记录访问日志,并触发{@link AccessLoggerListener#onLogger(AccessLoggerInfo)} @@ -39,7 +38,7 @@ import java.util.concurrent.atomic.AtomicReference; * @author zhouhao * @since 3.0 */ -public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutAdvisor implements WebFilter{ +public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutAdvisor implements WebFilter { @Autowired(required = false) private final List loggerParsers = new ArrayList<>(); @@ -59,10 +58,10 @@ public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutA Object response = methodInvocation.proceed(); if (response instanceof Mono) { return wrapMonoResponse(((Mono) response), info) - .subscriberContext(Context.of(AccessLoggerInfo.class, info)); + .contextWrite(Context.of(AccessLoggerInfo.class, info)); } else if (response instanceof Flux) { return wrapFluxResponse(((Flux) response), info) - .subscriberContext(Context.of(AccessLoggerInfo.class, info)); + .contextWrite(Context.of(AccessLoggerInfo.class, info)); } return response; }); @@ -70,29 +69,28 @@ public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutA private Mono currentRequestInfo() { return Mono - .subscriberContext() - .handle((context, sink) -> { + .deferContextual(context -> { if (context.hasKey(RequestInfo.class)) { RequestInfo info = context.get(RequestInfo.class); ReactiveLogger.log(context, ctx -> info.setContext(new HashMap<>(ctx))); - sink.next(info); + return Mono.just(info); } + return Mono.empty(); }); } protected Flux wrapFluxResponse(Flux flux, AccessLoggerInfo loggerInfo) { - return Flux.deferWithContext(ctx -> this + return Flux.deferContextual(ctx -> this .currentRequestInfo() .doOnNext(loggerInfo::putAccessInfo) .then(beforeRequest(loggerInfo)) .thenMany(flux) .doOnError(loggerInfo::setException) .doFinally(signal -> completeRequest(loggerInfo, ctx))) - .subscriberContext(ReactiveLogger.start("accessLogId", loggerInfo.getId())); + .contextWrite(ReactiveLogger.start("accessLogId", loggerInfo.getId())); } private Mono beforeRequest(AccessLoggerInfo loggerInfo) { - AccessLoggerBeforeEvent event = new AccessLoggerBeforeEvent(loggerInfo); return Authentication .currentReactive() @@ -106,14 +104,14 @@ public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutA "userName", auth.getUser().getName()) .thenReturn(auth); }) - .then(event.publish(eventPublisher)); + .then(Mono.defer(() -> event.publish(eventPublisher))); } - private void completeRequest(AccessLoggerInfo loggerInfo, Context ctx) { + private void completeRequest(AccessLoggerInfo loggerInfo, ContextView ctx) { loggerInfo.setResponseTime(System.currentTimeMillis()); new AccessLoggerAfterEvent(loggerInfo) .publish(eventPublisher) - .subscriberContext(ctx) + .contextWrite(ctx) .subscribe(); } @@ -189,7 +187,7 @@ public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutA } @Override - public boolean matches(Method method, Class aClass) { + public boolean matches(@Nonnull Method method,@Nonnull Class aClass) { AccessLogger ann = AnnotationUtils.findAnnotation(method, AccessLogger.class); if (ann != null && ann.ignore()) { return false; @@ -200,10 +198,11 @@ public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutA } @Override - public Mono filter(ServerWebExchange exchange, WebFilterChain chain) { + @Nonnull + public Mono filter(@Nonnull ServerWebExchange exchange,@Nonnull WebFilterChain chain) { return chain .filter(exchange) - .subscriberContext(Context.of(RequestInfo.class, createAccessInfo(exchange))); + .contextWrite(Context.of(RequestInfo.class, createAccessInfo(exchange))); } private RequestInfo createAccessInfo(ServerWebExchange exchange) { diff --git a/hsweb-starter/src/test/java/org/hswebframework/web/starter/jackson/CustomJackson2jsonEncoderTest.java b/hsweb-starter/src/test/java/org/hswebframework/web/starter/jackson/CustomJackson2jsonEncoderTest.java index c9a6c79b4..206950454 100644 --- a/hsweb-starter/src/test/java/org/hswebframework/web/starter/jackson/CustomJackson2jsonEncoderTest.java +++ b/hsweb-starter/src/test/java/org/hswebframework/web/starter/jackson/CustomJackson2jsonEncoderTest.java @@ -56,7 +56,7 @@ public class CustomJackson2jsonEncoderTest { .as(DataBufferUtils::join) .map(buf -> buf.toString(StandardCharsets.UTF_8)) .doOnNext(System.out::println) - .subscriberContext(LocaleUtils.useLocale(locale)) + .contextWrite(LocaleUtils.useLocale(locale)) .as(StepVerifier::create) .expectNextMatches(verify) .verifyComplete();