diff --git a/hsweb-message/hsweb-message-redis/pom.xml b/hsweb-message/hsweb-message-redis/pom.xml new file mode 100644 index 000000000..c3577e3ac --- /dev/null +++ b/hsweb-message/hsweb-message-redis/pom.xml @@ -0,0 +1,26 @@ + + + + hsweb-message + org.hswebframework.web + 3.0-SNAPSHOT + + 4.0.0 + + hsweb-message-redis + + + + + org.hswebframework.web + hsweb-message-api + ${project.version} + + + org.redisson + redisson + + + \ No newline at end of file diff --git a/hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissionMessageSubscribe.java b/hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissionMessageSubscribe.java new file mode 100644 index 000000000..d26e37b35 --- /dev/null +++ b/hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissionMessageSubscribe.java @@ -0,0 +1,82 @@ +package org.hswebframework.web.message.redis; + +import org.hswebframework.web.message.Message; +import org.hswebframework.web.message.MessageSubject; +import org.hswebframework.web.message.MessageSubscribe; +import org.hswebframework.web.message.support.TopicMessageSubject; +import org.hswebframework.web.message.support.UserMessageSubject; +import org.redisson.api.*; +import org.redisson.codec.SerializationCodec; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +/** + * @author zhouhao + */ +public class RedissionMessageSubscribe implements MessageSubscribe { + private MessageSubject iam; + private RedissonClient redisson; + + private boolean running = false; + + private List> consumers = new ArrayList<>(); + + public RedissionMessageSubscribe(MessageSubject iam, RedissonClient redisson) { + this.iam = iam; + this.redisson = redisson; + } + + public RedissionMessageSubscribe(RedissonClient redisson) { + this.redisson = redisson; + } + + @Override + public MessageSubscribe iam(MessageSubject iam) { + this.iam = iam; + return this; + } + + @Override + public MessageSubscribe onMessage(Consumer consumer) { + consumers.add(consumer); + if (!running) { + doRun(); + } + return this; + } + + private static SerializationCodec codec = new SerializationCodec(); + + private void doRun() { + if (iam instanceof UserMessageSubject) { + RQueue queue = redisson + .getQueue("queue_user_" + ((UserMessageSubject) iam).getUserId(), codec); + RCountDownLatch countDownLatch = redisson.getCountDownLatch("cdl_user_" + ((UserMessageSubject) iam).getUserId()); + Thread thread = new Thread(() -> { + while (running) { + try { + countDownLatch.trySetCount(1); + countDownLatch.await(); + consumers.forEach(cons -> cons.accept(queue.peek())); + } catch (InterruptedException e) { + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } + }); + running = true; + thread.start(); + return; + } + if (iam instanceof TopicMessageSubject) { + RTopic topic = redisson.getTopic("topic_" + ((TopicMessageSubject) iam).getTopic(), codec); + topic.addListener((channel, msg) -> consumers.forEach(cons -> cons.accept(msg))); + } + running = true; + } +} diff --git a/hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissonMessagePublish.java b/hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissonMessagePublish.java new file mode 100644 index 000000000..4aeb59a1b --- /dev/null +++ b/hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissonMessagePublish.java @@ -0,0 +1,83 @@ +package org.hswebframework.web.message.redis; + +import org.hswebframework.web.message.Message; +import org.hswebframework.web.message.MessagePublish; +import org.hswebframework.web.message.MessageSubject; +import org.hswebframework.web.message.support.MultipleUserMessageSubject; +import org.hswebframework.web.message.support.TopicMessageSubject; +import org.hswebframework.web.message.support.UserMessageSubject; +import org.redisson.api.RCountDownLatch; +import org.redisson.api.RQueue; +import org.redisson.api.RTopic; +import org.redisson.api.RedissonClient; +import org.redisson.codec.SerializationCodec; + +import java.util.function.Consumer; + +/** + * TODO 完成注释 + * + * @author zhouhao + */ +public class RedissonMessagePublish implements MessagePublish { + private MessageSubject from; + private MessageSubject to; + private RedissonClient redissonClient; + private Message message; + + public RedissonMessagePublish(RedissonClient redissonClient, Message message) { + this.redissonClient = redissonClient; + this.message = message; + } + + @Override + public MessagePublish from(MessageSubject subject) { + this.from = subject; + return this; + } + + @Override + public MessagePublish to(MessageSubject subject) { + this.to = subject; + return this; + } + + @Override + public MessagePublish deleteOnTimeout(long timeOutSecond) { + return null; + } + + private boolean useQueue() { + return to instanceof UserMessageSubject || to instanceof MultipleUserMessageSubject; + } + + private SerializationCodec codec = new SerializationCodec(); + + private Consumer queueConsumer = id -> { + RQueue queue = redissonClient.getQueue("queue_user_" + id, codec); + RCountDownLatch downLatch = redissonClient.getCountDownLatch("cdl_user_" + id); + queue.add(message); + downLatch.countDown(); + }; + + @Override + public T send() { + if (to instanceof UserMessageSubject) { + queueConsumer.accept(((UserMessageSubject) to).getUserId()); + } + if (to instanceof MultipleUserMessageSubject) { + ((MultipleUserMessageSubject) to).getUserIdList().forEach(queueConsumer); + } + if (to instanceof TopicMessageSubject) { + RTopic topic = redissonClient.getTopic("topic_" + ((TopicMessageSubject) to).getTopic(), codec); + topic.publish(message); + } + throw new UnsupportedOperationException(); + } + + @Override + public void send(Consumer responseConsumer) { + responseConsumer.accept(send()); + } + +} diff --git a/hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissonMessager.java b/hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissonMessager.java new file mode 100644 index 000000000..44919cf27 --- /dev/null +++ b/hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissonMessager.java @@ -0,0 +1,30 @@ +package org.hswebframework.web.message.redis; + +import org.hswebframework.web.message.*; +import org.redisson.api.RedissonClient; + +/** + * @author zhouhao + */ +public class RedissonMessager implements Messager { + + private RedissonClient redisson; + + public RedissonMessager(RedissonClient redisson) { + this.redisson = redisson; + } + + public void setRedisson(RedissonClient redisson) { + this.redisson = redisson; + } + + @Override + public MessagePublish publish(Message message) { + return new RedissonMessagePublish(redisson, message); + } + + @Override + public MessageSubscribe subscribe(MessageSubject subscribe) { + return new RedissionMessageSubscribe<>(subscribe, redisson); + } +} diff --git a/hsweb-message/hsweb-message-redis/src/test/java/org/hswebframework/web/message/redis/RedissonMessagerTest.java b/hsweb-message/hsweb-message-redis/src/test/java/org/hswebframework/web/message/redis/RedissonMessagerTest.java new file mode 100644 index 000000000..03f337ae8 --- /dev/null +++ b/hsweb-message/hsweb-message-redis/src/test/java/org/hswebframework/web/message/redis/RedissonMessagerTest.java @@ -0,0 +1,48 @@ +package org.hswebframework.web.message.redis; + +import org.hswebframework.web.message.Messager; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; + +import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder.topic; + +/** + * TODO 完成注释 + * + * @author zhouhao + */ +public class RedissonMessagerTest { + + + public void testSimple() { + + } + + public static void main(String[] args) throws InterruptedException { + Config config = new Config(); + config.useSingleServer().setAddress("127.0.0.1:6379"); + RedissonClient redisson = Redisson.create(config); + Messager messager = new RedissonMessager(redisson); + + byte[] stat = new byte[1]; + +// new Thread(() -> { +// for (int i = 0; i < 1000; i++) { +// try { +// Thread.sleep(1000); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// messager.publish(text("hello2")) +// .to(topic("test")) +// .from(user("admin")) +// .send(); +// } +// }).start(); + messager.subscribe(topic("test")) + .onMessage(System.out::println) + .onMessage(msg -> stat[0] = 1); + //redisson.shutdown(); + } +} \ No newline at end of file