赞
踩
- @PostConstruct
- public void subscribe() {
- pubSubStore.subscribe(PubSubType.DISPATCH, new PubSubListener<DispatchMessage>() {
- @Override
- public void onMessage(DispatchMessage message) {
- log.debug("subscribe: {}" ,message);
- Collection<SocketIOClient> clients = null;
- String room = message.getRoom();
- String namespace = message.getNamespace();
- Packet packet = message.getPacket();
- if (!ObjectUtils.isEmpty(namespace)&&!ObjectUtils.isEmpty(room)){
- SocketIONamespace socketIONamespace = socketIOServer.getNamespace(namespace);
- if (socketIONamespace != null){
- clients = socketIONamespace.getRoomOperations(room).getClients();
- }
- }else{
- clients = socketIOServer.getBroadcastOperations().getClients();
- }
- if(!CollectionUtils.isEmpty(clients)){
- clients.parallelStream().forEach(ioClient -> {
- ioClient.sendEvent(Event.Event,packet.getData());
- });
- }
- }
- }, DispatchMessage.class);
- }
- }
- @Component
- @Slf4j
- public class ClientUtil {
-
- @Resource
- private SocketIOServer socketIOServer;
-
- public void sendEvent(SocketIOClient client, MessageModel model) {
- log.debug("sendEvent--------------------------:{}", model);
- client.sendEvent(Event.Event, model);
- }
-
- public void sendNamespaceRoomEventExcluded(SocketIOClient client, MessageModel model) {
- log.debug("sendNamespaceRoomEventExcluded:{}", model);
- socketIOServer.getNamespace(model.getNamespace()).getRoomOperations(model.getRoomId()).sendEvent(Event.Event, client, model);
- }
-
- public void sendNamespaceRoomEvent(MessageModel model) {
- log.debug("sendNamespaceRoomEvent:{}", model);
- socketIOServer.getNamespace(model.getNamespace()).getRoomOperations(model.getRoomId()).sendEvent(Event.Event, model);
- }
-
- public void sendNamespaceEvent(MessageModel model) {
- log.debug("sendNamespaceRoomEvent:{}", model);
- socketIOServer.getNamespace(model.getNamespace()).getAllClients().parallelStream().forEach(client -> {
- client.sendEvent(Event.Event, model);
- });
- }
-
- public void sendRoomEvent(MessageModel model) {
- log.debug("sendNamespaceRoomEvent:{}", model);
- socketIOServer.getRoomOperations(model.getRoomId()).sendEvent(Event.Event, model);
- }
- }
查看源码可以发现,`sendEvent` 方法最终都会调用 `dispatch` 方法,把消息发布到 `PubSubType.DISPATCH`;由此可见,我们只需要实现对该消息的订阅即可。
- /**
- * broadcast interface
- *
- */
- public interface BroadcastOperations extends ClientOperations {
-
- Collection<SocketIOClient> getClients();
-
- <T> void send(Packet packet, BroadcastAckCallback<T> ackCallback);
-
- void sendEvent(String name, SocketIOClient excludedClient, Object... data);
-
- void sendEvent(String name, Predicate<SocketIOClient> excludePredicate, Object... data);
-
- <T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback);
-
- <T> void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback<T> ackCallback);
-
- <T> void sendEvent(String name, Object data, Predicate<SocketIOClient> excludePredicate, BroadcastAckCallback<T> ackCallback);
-
- }
- @Override
- public void sendEvent(String name, Predicate<SocketIOClient> excludePredicate, Object... data) {
- Packet packet = new Packet(PacketType.MESSAGE, EngineIOVersion.UNKNOWN);
- packet.setSubType(PacketType.EVENT);
- packet.setName(name);
- packet.setData(Arrays.asList(data));
-
- for (SocketIOClient client : clients) {
- packet.setEngineIOVersion(client.getEngineIOVersion());
- if (excludePredicate.test(client)) {
- continue;
- }
- client.send(packet);
- }
- dispatch(packet);
- }
- private void dispatch(Packet packet) {
- this.storeFactory.pubSubStore().publish(
- PubSubType.DISPATCH,
- new DispatchMessage(this.room, packet, this.namespace));
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。