From 4312f03be8e9dedf83daebc327cf89c12fb793e5 Mon Sep 17 00:00:00 2001 From: zhouhao Date: Mon, 15 May 2017 12:02:49 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96message?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hsweb-message/hsweb-message-websocket/pom.xml | 24 ++++-- .../message/DefaultWebSocketMessager.java | 79 +++++++++++++++---- .../web/socket/message/WebSocketMessager.java | 36 ++++++++- .../CommandWebSocketAutoConfiguration.java | 6 +- .../web/socket/TestProcessor.java | 47 ++--------- .../web/socket/WebSocketClientTests.java | 6 +- .../web/socket/WebSocketServerTests.java | 13 ++- .../src/test/resources/application.yml | 7 +- 8 files changed, 144 insertions(+), 74 deletions(-) diff --git a/hsweb-message/hsweb-message-websocket/pom.xml b/hsweb-message/hsweb-message-websocket/pom.xml index ba5df0108..f8b6d52cd 100644 --- a/hsweb-message/hsweb-message-websocket/pom.xml +++ b/hsweb-message/hsweb-message-websocket/pom.xml @@ -19,12 +19,12 @@ ${project.version} test - - org.hswebframework.web - hsweb-message-memory - ${project.version} - test - + + + + + + org.hswebframework.web @@ -32,6 +32,18 @@ ${project.version} test + + org.hswebframework.web + hsweb-concurrent-counter-redis + ${project.version} + test + + + + org.hswebframework.web + hsweb-concurrent-counter-api + ${project.version} + org.hswebframework.web diff --git a/hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/DefaultWebSocketMessager.java b/hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/DefaultWebSocketMessager.java index 7ccafc487..d3af9d60b 100644 --- a/hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/DefaultWebSocketMessager.java +++ b/hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/DefaultWebSocketMessager.java @@ -1,5 +1,8 @@ package org.hswebframework.web.socket.message; +import org.hswebframework.web.concurrent.counter.Counter; +import org.hswebframework.web.concurrent.counter.CounterManager; +import org.hswebframework.web.concurrent.counter.SimpleCounterManager; import org.hswebframework.web.message.MessageSubscribe; import org.hswebframework.web.message.Messager; import org.hswebframework.web.message.support.ObjectMessage; @@ -7,8 +10,11 @@ import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import java.io.IOException; +import java.util.Collection; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import static org.hswebframework.web.message.builder.StaticMessageBuilder.object; import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder.*; @@ -23,61 +29,96 @@ public class DefaultWebSocketMessager implements WebSocketMessager { private Messager messager; public DefaultWebSocketMessager(Messager messager) { - this.messager = messager; + this(messager, new SimpleCounterManager()); } - // command, userId, sessionId + public DefaultWebSocketMessager(Messager messager, CounterManager counterManager) { + this.messager = messager; + this.counterManager = counterManager == null ? new SimpleCounterManager() : counterManager; + } + + // command, type, sessionId private final Map>> store = new ConcurrentHashMap<>(32); + private CounterManager counterManager = new SimpleCounterManager(); + + @Override public void onSessionConnect(WebSocketSession session) { } - @Override - public void onSessionClose(WebSocketSession session) { - + private String getSubTotalKey(String command, String type) { + return "sub_".concat(command) + .concat("_") + .concat(type) + .concat("_total"); } @Override - public void publish(String toUser, WebSocketMessage message) { + public int getSubscribeTotal(String command, String type) { + return (int) counterManager.getCounter(getSubTotalKey(command, type)).get(); + } + + @Override + public void onSessionClose(WebSocketSession session) { + store.values() //command + .stream().map(Map::values).flatMap(Collection::stream) + .map(sessionStore -> sessionStore.get(session.getId())) + .filter(Objects::nonNull) + .forEach(MessageSubscribeSession::cancel); + } + + @Override + public void publish(String command, String type, WebSocketMessage message) { messager.publish(object(message)) - .to(user(toUser)) + .to(TYPE_QUEUE.equals(type) ? queue("queue_" + command) : topic("topic_" + command)) .send(); } - private Map getSubSession(String command, String userId) { + private Map getSubSession(String command, String type) { return store.computeIfAbsent(command, cmd -> new ConcurrentHashMap<>(128)) - .computeIfAbsent(userId, uid -> new ConcurrentHashMap<>()); + .computeIfAbsent(type, t -> new ConcurrentHashMap<>()); } @Override - public boolean subscribe(String command, String userId, WebSocketSession socketSession) { - Map subscribeSessionStore = getSubSession(command, userId); + public boolean subscribe(String command, String type, WebSocketSession socketSession) { + Map subscribeSessionStore = getSubSession(command, type); subscribeSessionStore.computeIfAbsent(socketSession.getId(), sessionId -> { - MessageSubscribe> subscribe = messager.subscribe(user(userId)); + MessageSubscribe> subscribe = messager + .subscribe(TYPE_QUEUE.equals(type) ? queue("queue_" + command) : topic("topic_" + command)); subscribe.onMessage(message -> { try { if (!socketSession.isOpen()) { - deSubscribe(command, userId, socketSession); + deSubscribe(command, type, socketSession); + return; } socketSession.sendMessage(new TextMessage(((ObjectMessage) message).getObject().toString())); } catch (IOException e) { e.printStackTrace(); } }); - return new MessageSubscribeSession(subscribe, socketSession); + return new MessageSubscribeSession(subscribe, socketSession) { + @Override + public void cancel() { + super.cancel(); + counterManager.getCounter(getSubTotalKey(command, type)).decrement(); + } + }; }); - return false; + counterManager.getCounter(getSubTotalKey(command, type)).increment(); + return true; } @Override - public boolean deSubscribe(String command, String userId, WebSocketSession socketSession) { - Map subscribeSessionStore = getSubSession(command, userId); + public boolean deSubscribe(String command, String type, WebSocketSession socketSession) { + Map subscribeSessionStore = getSubSession(command, type); MessageSubscribeSession subscribeSession = subscribeSessionStore.get(socketSession.getId()); if (null != subscribeSession) { subscribeSession.getSubscribe().cancel(); subscribeSessionStore.remove(socketSession.getId()); + counterManager.getCounter(getSubTotalKey(command, type)).decrement(); + return true; } return false; } @@ -107,5 +148,9 @@ public class DefaultWebSocketMessager implements WebSocketMessager { public void setSession(WebSocketSession session) { this.session = session; } + + public void cancel() { + subscribe.cancel(); + } } } diff --git a/hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/WebSocketMessager.java b/hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/WebSocketMessager.java index ec08e7cac..e8390d469 100644 --- a/hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/WebSocketMessager.java +++ b/hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/WebSocketMessager.java @@ -9,10 +9,40 @@ import org.springframework.web.socket.WebSocketSession; * @author zhouhao */ public interface WebSocketMessager extends WebSocketSessionListener { - void publish(String toUser, WebSocketMessage message); - boolean subscribe(String command, String userId, WebSocketSession socketSession); + String TYPE_QUEUE = "queue"; - boolean deSubscribe(String command, String userId, WebSocketSession socketSession); + String TYPE_TOPIC = "topic"; + default void publishQueue(String command, WebSocketMessage message) { + publish(command, TYPE_QUEUE, message); + } + + default void publishTopic(String command, WebSocketMessage message) { + publish(command, TYPE_TOPIC, message); + } + + void publish(String command, String type, WebSocketMessage message); + + int getSubscribeTotal(String command, String type); + + boolean subscribe(String command, String type, WebSocketSession socketSession); + + default boolean subscribeQueue(String command, WebSocketSession socketSession) { + return subscribe(command, TYPE_QUEUE, socketSession); + } + + default boolean subscribeTopic(String command, WebSocketSession socketSession) { + return subscribe(command, TYPE_TOPIC, socketSession); + } + + boolean deSubscribe(String command, String type, WebSocketSession socketSession); + + default boolean deSubscribeQueue(String command, WebSocketSession socketSession) { + return deSubscribe(command, TYPE_QUEUE, socketSession); + } + + default boolean deSubscribeTopic(String command, WebSocketSession socketSession) { + return deSubscribe(command, TYPE_TOPIC, socketSession); + } } diff --git a/hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/starter/CommandWebSocketAutoConfiguration.java b/hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/starter/CommandWebSocketAutoConfiguration.java index 9ccb33986..7aeb24709 100644 --- a/hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/starter/CommandWebSocketAutoConfiguration.java +++ b/hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/starter/CommandWebSocketAutoConfiguration.java @@ -1,6 +1,7 @@ package org.hswebframework.web.socket.starter; import org.hswebframework.web.authorization.container.AuthenticationContainer; +import org.hswebframework.web.concurrent.counter.CounterManager; import org.hswebframework.web.message.Messager; import org.hswebframework.web.socket.WebSocketSessionListener; import org.hswebframework.web.socket.handler.CommandWebSocketMessageDispatcher; @@ -47,9 +48,12 @@ public class CommandWebSocketAutoConfiguration { @ConditionalOnBean(Messager.class) @ConditionalOnMissingBean(WebSocketMessager.class) public static class WebSocketMessagerConfiguration { + @Autowired(required = false) + private CounterManager counterManager; + @Bean public WebSocketMessager webSocketMessager(Messager messager) { - return new DefaultWebSocketMessager(messager); + return new DefaultWebSocketMessager(messager,counterManager); } } diff --git a/hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/TestProcessor.java b/hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/TestProcessor.java index 26bb8ca81..f8c1bb591 100644 --- a/hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/TestProcessor.java +++ b/hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/TestProcessor.java @@ -1,21 +1,11 @@ package org.hswebframework.web.socket; -import org.hswebframework.web.message.MessageSubscribe; -import org.hswebframework.web.message.Messager; -import org.hswebframework.web.message.support.ObjectMessage; import org.hswebframework.web.socket.message.WebSocketMessage; +import org.hswebframework.web.socket.message.WebSocketMessager; import org.hswebframework.web.socket.processor.CommandProcessor; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import static org.hswebframework.web.message.builder.StaticMessageBuilder.object; -import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder.queue; - /** * TODO 完成注释 * @@ -24,9 +14,7 @@ import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder public class TestProcessor implements CommandProcessor, WebSocketSessionListener { @Autowired - private Messager messager; - - private final Map>> store = new ConcurrentHashMap<>(); + private WebSocketMessager messager; @Override public String getName() { @@ -34,30 +22,11 @@ public class TestProcessor implements CommandProcessor, WebSocketSessionListener } private void sub(WebSocketSession socketSession) { - MessageSubscribe> subscribe = - store.get(socketSession.getId()); - if (subscribe != null) return; - store.put(socketSession.getId(), messager - .>subscribe(queue("test")) //订阅 queue队列 - .onMessage(message -> { - try { - if (!socketSession.isOpen()) { - deSub(socketSession); - return; - } - socketSession.sendMessage(new TextMessage(message.getObject().toString())); - } catch (IOException e) { - e.printStackTrace(); - } - })); + messager.subscribeQueue(getName(), socketSession); } private void deSub(WebSocketSession socketSession) { - MessageSubscribe> subscribe = - store.get(socketSession.getId()); - if (subscribe == null) return; - subscribe.cancel(); - store.remove(socketSession.getId()); + messager.deSubscribeQueue(getName(), socketSession); } @Override @@ -83,10 +52,8 @@ public class TestProcessor implements CommandProcessor, WebSocketSessionListener } catch (InterruptedException e) { e.printStackTrace(); } - if (store.size() > 0) { - messager.publish(object(new WebSocketMessage(200, "hello" + total++))) - .to(queue("test")) //向队列发送消息 - .send(); + if (messager.getSubscribeTotal(getName(), WebSocketMessager.TYPE_QUEUE) > 0) { + messager.publishQueue(getName(), new WebSocketMessage(200, "hello" + total++)); System.out.println(total); } } @@ -95,8 +62,6 @@ public class TestProcessor implements CommandProcessor, WebSocketSessionListener @Override public void destroy() { - store.values().forEach(MessageSubscribe::cancel); - store.clear(); } @Override diff --git a/hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketClientTests.java b/hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketClientTests.java index ba972bce0..32d210390 100644 --- a/hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketClientTests.java +++ b/hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketClientTests.java @@ -10,9 +10,9 @@ import org.springframework.web.socket.handler.AbstractWebSocketHandler; public class WebSocketClientTests { public static void main(String[] args) throws Exception { - for (int i = 0; i < 10; i++) { +// for (int i = 0; i < 10; i++) { WebSocketClient client = new StandardWebSocketClient(); - String url = "ws://localhost:8080/socket"; + String url = "ws://localhost:8081/socket"; ListenableFuture future = client.doHandshake(new AbstractWebSocketHandler() { @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { @@ -22,7 +22,7 @@ public class WebSocketClientTests { WebSocketSession socketSession = future.get(); socketSession.sendMessage(new TextMessage("{\"command\":\"test\",\"parameters\":{\"type\":\"conn\"}}")); - } +// } System.in.read(); } } diff --git a/hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketServerTests.java b/hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketServerTests.java index f1ca8bcfa..7da32784b 100644 --- a/hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketServerTests.java +++ b/hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketServerTests.java @@ -1,5 +1,8 @@ package org.hswebframework.web.socket; +import org.hswebframework.web.concurrent.counter.Counter; +import org.hswebframework.web.concurrent.counter.CounterManager; +import org.hswebframework.web.counter.redis.RedissonCounterManager; import org.hswebframework.web.message.Messager; import org.hswebframework.web.message.jms.JmsMessager; import org.redisson.Redisson; @@ -36,7 +39,15 @@ public class WebSocketServerTests { return new TestProcessor(); } -// 使用redis + @Bean + public CounterManager counterManager() { + Config config = new Config(); + config.useSingleServer().setAddress("127.0.0.1:6379"); + RedissonClient client = Redisson.create(config); + return new RedissonCounterManager(client); + } + +// // 使用redis // @Bean(destroyMethod = "shutdown") // public RedissonClient redissonClient() { // Config config = new Config(); diff --git a/hsweb-message/hsweb-message-websocket/src/test/resources/application.yml b/hsweb-message/hsweb-message-websocket/src/test/resources/application.yml index c829ac18c..c98a2f690 100644 --- a/hsweb-message/hsweb-message-websocket/src/test/resources/application.yml +++ b/hsweb-message/hsweb-message-websocket/src/test/resources/application.yml @@ -1,10 +1,13 @@ spring: activemq: - in-memory: true + broker-url: tcp://localhost:61616 + in-memory: false jms: pub-sub-domain: true hsweb: app: name: websocket测试 - version: 3.0.0 \ No newline at end of file + version: 3.0.0 +server: + port: 8081 \ No newline at end of file