From 07e0fc41dc9adefa3ee17cada0acb8feb0653256 Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Fri, 10 Apr 2020 18:43:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BC=93=E5=AD=98=E9=94=99?= =?UTF-8?q?=E8=AF=AF=E5=A4=84=E7=90=86=EF=BC=8Credis=E5=87=BA=E9=94=99?= =?UTF-8?q?=E6=97=B6=E5=BF=BD=E7=95=A5=E7=BC=93=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cache/supports/RedisReactiveCache.java | 64 +++++++++++++------ 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/RedisReactiveCache.java b/hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/RedisReactiveCache.java index b995a7f57..1d3295082 100644 --- a/hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/RedisReactiveCache.java +++ b/hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/RedisReactiveCache.java @@ -1,5 +1,6 @@ package org.hswebframework.web.cache.supports; +import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.cache.ReactiveCache; import org.reactivestreams.Publisher; import org.springframework.data.redis.connection.ReactiveSubscription; @@ -14,13 +15,14 @@ import java.util.function.Function; import java.util.stream.StreamSupport; @SuppressWarnings("all") +@Slf4j public class RedisReactiveCache implements ReactiveCache { private ReactiveRedisOperations operations; private String redisKey; - private ReactiveCache localCache; + private ReactiveCache localCache; private String topicName; @@ -45,16 +47,26 @@ public class RedisReactiveCache implements ReactiveCache { public Flux getFlux(Object key) { return localCache .getFlux(key) - .switchIfEmpty(operations - .opsForHash() - .get(redisKey, key) - .flatMapIterable(r -> { - if (r instanceof Iterable) { - return ((Iterable) r); - } - return Collections.singletonList(r); - }) - .map(Function.identity())); + .switchIfEmpty(Flux.defer(() -> { + return operations + .opsForHash() + .get(redisKey, key) + .flatMapIterable(r -> { + if (r instanceof Iterable) { + return ((Iterable) r); + } + return Collections.singletonList(r); + }) + .map(r -> (E) r); + })) + .onErrorResume(err -> this.handleError((Throwable) err)); + + } + + protected Mono handleError(Throwable error) { + return Mono.fromRunnable(() -> { + log.error(error.getMessage(), error); + }); } @Override @@ -62,24 +74,29 @@ public class RedisReactiveCache implements ReactiveCache { return localCache.getMono(key) .switchIfEmpty(operations.opsForHash() .get(redisKey, key) + .map(v -> (E) v) .flatMap(r -> localCache.put(key, Mono.just(r)) - .thenReturn(r))); + .thenReturn(r))) + .onErrorResume(err -> this.handleError(err)); } @Override public Mono put(Object key, Publisher data) { if (data instanceof Mono) { - return ((Mono) data) + return ((Mono) data) .flatMap(r -> { return operations.opsForHash() .put(redisKey, key, r) .then(localCache.put(key, data)) .then(operations.convertAndSend(topicName, key)); - }).then(); + }) + .then() + .onErrorResume(err -> this.handleError(err)) + ; } if (data instanceof Flux) { - return ((Flux) data) + return ((Flux) data) .collectList() .flatMap(r -> { return operations.opsForHash() @@ -87,7 +104,10 @@ public class RedisReactiveCache implements ReactiveCache { .then(localCache.put(key, data)) .then(operations.convertAndSend(topicName, key)); - }).then(); + }) + .then() + .onErrorResume(err -> this.handleError(err)) + ; } return Mono.error(new UnsupportedOperationException("unsupport publisher:" + data)); } @@ -97,8 +117,11 @@ public class RedisReactiveCache implements ReactiveCache { return operations.opsForHash() .remove(redisKey, StreamSupport.stream(key.spliterator(), false).toArray()) .then(localCache.evictAll(key)) - .flatMap(nil -> Flux.fromIterable(key).flatMap(k -> operations.convertAndSend(topicName, key))) - .then(); + .flatMap(nil -> Flux.fromIterable(key) + .flatMap(k -> operations.convertAndSend(topicName, key)) + .then() + ) + .onErrorResume(err -> this.handleError(err)); } @Override @@ -112,7 +135,8 @@ public class RedisReactiveCache implements ReactiveCache { return operations.opsForHash() .multiGet(redisKey, Arrays.asList(keys)) .flatMapIterable(Function.identity()) - .map(r -> (E) r); + .map(r -> (E) r) + .onErrorResume(err -> this.handleError(err)); } @@ -123,6 +147,7 @@ public class RedisReactiveCache implements ReactiveCache { .remove(redisKey, key) .then(localCache.evict(key)) .then(operations.convertAndSend(topicName, key)) + .onErrorResume(err -> this.handleError(err)) .then(); } @@ -133,6 +158,7 @@ public class RedisReactiveCache implements ReactiveCache { .delete(redisKey) .then(localCache.clear()) .then(operations.convertAndSend(topicName, "___all")) + .onErrorResume(err -> this.handleError(err)) .then(); } }