当前位置:   article > 正文

netty-socketio 集群随记

netty-socketio 集群

实现netty-socketio集群的方式

代码实例

  1. @PostConstruct
  2. public void subscribe() {
  3. pubSubStore.subscribe(PubSubType.DISPATCH, new PubSubListener<DispatchMessage>() {
  4. @Override
  5. public void onMessage(DispatchMessage message) {
  6. log.debug("subscribe: {}" ,message);
  7. Collection<SocketIOClient> clients = null;
  8. String room = message.getRoom();
  9. String namespace = message.getNamespace();
  10. Packet packet = message.getPacket();
  11. if (!ObjectUtils.isEmpty(namespace)&&!ObjectUtils.isEmpty(room)){
  12. SocketIONamespace socketIONamespace = socketIOServer.getNamespace(namespace);
  13. if (socketIONamespace != null){
  14. clients = socketIONamespace.getRoomOperations(room).getClients();
  15. }
  16. }else{
  17. clients = socketIOServer.getBroadcastOperations().getClients();
  18. }
  19. if(!CollectionUtils.isEmpty(clients)){
  20. clients.parallelStream().forEach(ioClient -> {
  21. ioClient.sendEvent(Event.Event,packet.getData());
  22. });
  23. }
  24. }
  25. }, DispatchMessage.class);
  26. }
  27. }
  1. @Component
  2. @Slf4j
  3. public class ClientUtil {
  4. @Resource
  5. private SocketIOServer socketIOServer;
  6. public void sendEvent(SocketIOClient client, MessageModel model) {
  7. log.debug("sendEvent--------------------------:{}", model);
  8. client.sendEvent(Event.Event, model);
  9. }
  10. public void sendNamespaceRoomEventExcluded(SocketIOClient client, MessageModel model) {
  11. log.debug("sendNamespaceRoomEventExcluded:{}", model);
  12. socketIOServer.getNamespace(model.getNamespace()).getRoomOperations(model.getRoomId()).sendEvent(Event.Event, client, model);
  13. }
  14. public void sendNamespaceRoomEvent(MessageModel model) {
  15. log.debug("sendNamespaceRoomEvent:{}", model);
  16. socketIOServer.getNamespace(model.getNamespace()).getRoomOperations(model.getRoomId()).sendEvent(Event.Event, model);
  17. }
  18. public void sendNamespaceEvent(MessageModel model) {
  19. log.debug("sendNamespaceRoomEvent:{}", model);
  20. socketIOServer.getNamespace(model.getNamespace()).getAllClients().parallelStream().forEach(client -> {
  21. client.sendEvent(Event.Event, model);
  22. });
  23. }
  24. public void sendRoomEvent(MessageModel model) {
  25. log.debug("sendNamespaceRoomEvent:{}", model);
  26. socketIOServer.getRoomOperations(model.getRoomId()).sendEvent(Event.Event, model);
  27. }
  28. }

源码

查看源码可发现sendEvent方法最终还是调用了dispatch方法,由此可见我们只需要实现订阅就行。

  1. /**
  2. * broadcast interface
  3. *
  4. */
  5. public interface BroadcastOperations extends ClientOperations {
  6. Collection<SocketIOClient> getClients();
  7. <T> void send(Packet packet, BroadcastAckCallback<T> ackCallback);
  8. void sendEvent(String name, SocketIOClient excludedClient, Object... data);
  9. void sendEvent(String name, Predicate<SocketIOClient> excludePredicate, Object... data);
  10. <T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback);
  11. <T> void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback<T> ackCallback);
  12. <T> void sendEvent(String name, Object data, Predicate<SocketIOClient> excludePredicate, BroadcastAckCallback<T> ackCallback);
  13. }
  1. @Override
  2. public void sendEvent(String name, Predicate<SocketIOClient> excludePredicate, Object... data) {
  3. Packet packet = new Packet(PacketType.MESSAGE, EngineIOVersion.UNKNOWN);
  4. packet.setSubType(PacketType.EVENT);
  5. packet.setName(name);
  6. packet.setData(Arrays.asList(data));
  7. for (SocketIOClient client : clients) {
  8. packet.setEngineIOVersion(client.getEngineIOVersion());
  9. if (excludePredicate.test(client)) {
  10. continue;
  11. }
  12. client.send(packet);
  13. }
  14. dispatch(packet);
  15. }
  1. private void dispatch(Packet packet) {
  2. this.storeFactory.pubSubStore().publish(
  3. PubSubType.DISPATCH,
  4. new DispatchMessage(this.room, packet, this.namespace));
  5. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/973949
推荐阅读
相关标签
  

闽ICP备14008679号