当前位置:   article > 正文

即时通讯:服务端SpringBoot+Netty-Socket.io,客户端Socket.io.js+Java版Socket.io-client

即时通讯:服务端SpringBoot+Netty-Socket.io,客户端Socket.io.js+Java版Socket.io-client

简介:服务端SpringBoot+Netty-Socket.io,客户端Socket.io.js或Java版Socket.io-client

基于socket.io:

引入:写在前面的话

1、当你使用IM通讯技术时,还在束缚于第三方SDK?

2、当你还在处于付费享用IM聊天,提供的服务,不妨咱自己来搭建聊天室,点对点聊天,群组聊天,实时推送等服务?

同时,这里也mark一下部分核心技术生态:

分布式任务调度框架:

分布式文件系统:

链路追踪:

数据同步:

Java生态圈:

 

场景描述:通过集成xxl-job分布式调度平台,设置cron时间表达式,开启job任务轮询方式,定时调度RestFul API完成相关业务逻辑。这里只举一例,比如消息的推送。

当然,若是单体应用的话,不走分布式,可依具体场景而定,结合springboot实现定时任务即可满足定时定点推送

  1. /**
  2. * @Author: X.D.Yang
  3. * @Date: 2018/7/15 15:35
  4. * @Description:
  5. */
  6. @SpringBootApplication
  7. @EnableScheduling
  8. public class Application {
  9. public static void main(String[] args) {
  10. //System.setProperty("es.set.netty.runtime.available.processors", "false");
  11. SpringApplication.run(Application.class, args);
  12. }
  13. }
  1. /**
  2. * @Author: X.D.Yang
  3. * @Date: 2018/7/15 15:40
  4. * @Description:
  5. */
  6. @Component
  7. public class QuartzService {
  8. private static final Logger logger = LoggerFactory.getLogger(QuartzService.class);
  9. //每天晚上十二点执行
  10. @Scheduled(cron = "0 0 0 * * ?")
  11. public void monitor() {
  12. //相关业务逻辑-todo
  13. logger.info("now time: {}", DateUtils.dateToStr(new Date()));
  14. }

引发思考:虽然能够实现定时定点推送,但在一些实时性要求非常高的场景显然是需要改进的,这里也mark一下对消息一些处理的场景。

同步:发出调用时,没有得到结果前则该调用不会返回,即调用者在主动等待这个调用的结果。

RestFul API>调用xxxx中心接口推送>实时性较高,业务逻辑简单,易维护。

异步:调用发出后,调用直接返回了,没有返回结果,被调用者通过状态通知等通知调用者,或者通过回调函数处理回调结果。

xxljob+mq>通知xxxx中心推送>程序写入性能较高,在一定程度上可降低业务强耦合,但可能存在延时问题-无法保证轮询的频率和消费的频率完全一致-MQ的消费可能由于网络或其它原因导致用户写入的数据不一定是可以马上看到的。当然,异步还可以通过多线程去处理,可依具体场景而定,过多的线程调度及上下文切换是需要消耗大量CPU资源的。

需要思考的是若有这样一个场景,A 系统收到一个请求,需要在自己的系统中去操作数据库,同时还需要在 BC 俩个系统中操作数据库,A写库 100ms,BC分别写库要 350ms、450ms。最终请求总延时是 100 + 350 + 450  = 900ms,接近 1s,特别是在To C的产品中,用户对体验是最关心的,还有一些网络的延迟等,用户只会觉得你们做的系统太过lj慢透了,一般来说每个请求在 200 ms 以内完成,对用户几乎是无感知的,其实从感觉上说就像我们点个按钮,几ms以后就直接返回了,哇!那第一感觉这网站做得真好,贼快!

削峰填谷:我们都了解一般的MySQL,每秒 2k 个请求扛得差不多了,当每秒并发请求数量突然会暴增到 4k+ 条。但系统若又是直接基于 MySQL 的,没有Redis缓存,大量的请求涌入到 MySQL(说明:当然就算加了redis缓存也会有缓存穿透-雪崩,没有统一解决一切场景的方案,只有对业务场景更合不合适-就像是一个“升级打怪”的过程),或许就直接把 MySQL 给怼死了,导致服务不可用,在高并发高可用高性能DT-AI时代,这是不能够接受的。但高峰期一过,就成了低峰期,每秒请求数量可能也就几十个请求,对整个系统来说几乎没有任何压力,使用 MQ每秒 4k 个请求写入 MQ,A 系统每秒钟最多处理 2k 个请求,A 系统则从MQ 中每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量,这样就算高峰期,A 系统也绝对不会至于挂掉不可用。

需要思考的是MQ 每秒钟 4k 个请求进来,就 2k 个请求出去,结果就导致在高峰期时间段,可能有几十万甚至几百万的请求积压在 MQ 中,当然这个短暂的高峰期积压是 ok 的,在高峰期过了之后,每秒钟就几十个请求进 MQ,但是 A 系统依然会按照每秒 2k 个请求的速度在处理,只要高峰期一过,A 系统就会快速将积压的消息给解决掉。总的来说上游限速发行,下游限速执行。这里也对mq-kafka生产者跟消费者作下说明:

  1. kafka:
  2. consumer:
  3. auto:
  4. commit:
  5. interval: 100
  6. offset:
  7. #该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
  8. #latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
  9. #earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
  10. reset: latest
  11. #并发数
  12. concurrency: 3
  13. #消息签收机制:手动签收
  14. enable:
  15. auto:
  16. commit: false
  17. #最大拉取数
  18. max:
  19. poll:
  20. records: 100
  21. #消费组
  22. group:
  23. id: consumer-group
  24. servers: ip:9092
  25. session:
  26. timeout: 6000
  27. zookeeper:
  28. connect: ip:2181
  29. producer:
  30. batch:
  31. size: 65536
  32. buffer:
  33. memory: 52428800
  34. max:
  35. request:
  36. size: 31457280
  37. servers: ip:9092

i、生产者>acks=0:生产者在成功写入消息之前不会等待任何来自服务器的响应;acks=1:只要集群的首领节点收到消息,生产者收到一个服务器成功响应;当acks=-1的时候分区leader必须等待消息被成功写入所有ISR副本(同步副本)才认为producer请求成功,提供最高的消息持久性保证,理论上吞吐率最差,生产者也可设置批量发送数据-batch-size 65536;
ii、消费者>批量消费@KafkaListener支持,设置batchListener为true:

  1. @Bean
  2. public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){
  3. ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
  4. new ConcurrentKafkaListenerContainerFactory<>();
  5. factory.setConsumerFactory(consumerFactory);
  6. factory.setConcurrency(10);
  7. factory.getContainerProperties().setPollTimeout(1400);
  8. //设置为批量消费,每个批次数量在Kafka配置参数中设置
  9. factory.setBatchListener(true);
  10. //设置手动提交
  11. ackModefactory.getContainerProperties().setAckMode(
  12. ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  13.   return factory;
  14. }

其中containerFactory = “batchFactory”指定为批量消费,

  1. //批量消费
  2. @KafkaListener(topics = {"yxd179"},containerFactory="batchFactory")
  3. public void consumerBatch(List<ConsumerRecord<?, ?>> records, Acknowledgment ack){
  4. log.info("接收消息数量:{}",record.size());
  5.    //手动提交-业务逻辑成功处理后提交offset>消息重复消费
  6.    ack.acknowledge();
  7. }

当然,批量消费也可以结合springboot屏蔽kafka自动配置,引入我们自定义的配置:

@SpringBootApplication(scanBasePackages ={"com.yxd"},exclude = {KafkaAutoConfiguration.class})

其中新增Kafka配置项等不作具体阐述,多线程并发消费场景-不能保证原始分区消息的顺序,接入模拟大数据量批处理Test:

  1. @Test
  2. public void testSendKafka() throws InterruptedException {
  3. int clientTotal = 10000;
  4. int threadTotal = 200;
  5. ExecutorService executorService = Executors.newCachedThreadPool();
  6. //Semaphore信号量-流控手段-可对特定资源的允许同时访问的操作数量进行控制,例如:池化技术(连接池)中的并发数,有界阻塞容器的容量等
  7. final Semaphore semaphore = new Semaphore(threadTotal);
  8. //主线程等待多个工作线程结束,主线程调用-初始化计数
  9. final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
  10. for (int i = 0; i < clientTotal ; i++) {
  11. executorService.execute(() -> {
  12. try {
  13. //获取到许可,才可继续执行任务,若获取失败,则进入阻塞
  14. semaphore.acquire();
  15. String log = "TEST TEST TEST TEST TEST TEST TEST TEST";
  16. kafkaTemplate.send("yxd179", log);
  17. /*ListenableFuture<SendResult<String, String>> future =
  18. kafkaTemplate.send(TOPIC, JSON.toJSONString(log));
  19. //监听回调
  20. future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
  21. @Override
  22. public void onFailure(Throwable throwable) {
  23. log.info("## Send message fail insert...");
  24. log.error(throwable.getMessage());
  25. }
  26. @Override
  27. public void onSuccess(SendResult<String, String> result) {
  28. log.info("## Send message success ...");
  29. }
  30. });*/
  31. //处理完成之后,release释放许可,当然在一个线程中获得的许可可在另一个线程中释放
  32. semaphore.release();
  33. } catch (Exception e) {
  34. log.error("e >>> ", e);
  35. }
  36. //工作线程调用-计数减一
  37. countDownLatch.countDown();
  38. });
  39. }
  40. //主线程调用-阻塞,直到等待计数为0解除阻塞,各线程之间不再互相影响,可以继续做自己的事情了,不再执行下一个目标工作
  41. countDownLatch.await();
  42. executorService.shutdown();
  43. }

Ok,Now:

我们回到主题-即时通讯-聊天:开启长连接方式,实现即时通讯-在线实时聊天-实时推送>

首先,在pom.xml中引入依赖,

  1. <!-- netty-socketio 其版本:1.7.12 -->
  2. <dependency>
  3. <groupId>com.corundumstudio.socketio</groupId>
  4. <artifactId>netty-socketio</artifactId>
  5. <version>${netty-socketio.version}</version>
  6. </dependency>
  7. <!-- socket.io-client 其版本:1.0.0,也可通过github源码编译更高版本jar -->
  8. <dependency>
  9. <groupId>io.socket</groupId>
  10. <artifactId>socket.io-client</artifactId>
  11. <version>${socket.io-client.version}</version>
  12. </dependency>
  13. &
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/83529
推荐阅读
相关标签
  

闽ICP备14008679号