当前位置:   article > 正文

乐观锁实现接口幂等性_什么是幂等性,如何实现,以及乐观锁在项目中的实际用法...

接口幂等和乐观锁

3849ea044de551d81b921a68bc79756a.png

什么是幂等性

对于同一笔业务操作,不管调用多少次,得到的结果都是一样的。

  1. 普通方式 只适合单机
  2. jvm加锁方式
  • Lock只能在一个jvm中起效,如果多个请求都被同一套系统处理,上面这种使用Lock的方式是没有问题的,不过互联网系统中,多数是采用集群方式部署系统,同一套代码后面会部署多套,如果支付宝同时发来多个通知经过负载均衡转发到不同的机器,上面的锁就不起效了。此时对于多个请求相当于无锁处理了

3. 悲观锁方式,使用数据库中悲观锁实现。悲观锁类似于方式二中的Lock, 只不过是依靠数据库来实现的。数据库中悲观锁使用for update来实现的

  • 方式3可以保证接口的幂等性,不过存在缺点,如果业务耗时,并发情况下,后面线程会长期处于等待状态,占用很多线程,让这些线程处于无效等待状态

4. 乐观锁方式,使用数据库中乐观锁实现。主要还是通过版本号,类似于java的cas操作,那么问题来了,在多线程的情况下,一个线程取得数据了,其它线程会怎么样呢?会不断重试,不会阻塞,可以理解为自旋,阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长。

在许多场景中,同步资源的锁定时间很短,为了这一小段时间去切换线程,线程挂起和恢复现场的花费可能会让系统得不偿失。如果物理机器有多个处理器,能够让两个或以上的线程同时并行执行,我们就可以让后面那个请求锁的线程不放弃CPU的执行时间,看看持有锁的线程是否很快就会释放锁。

而为了让当前线程“稍等一下”,我们需让当前线程进行自旋,如果在自旋完成后前面锁定同步资源的线程已经释放了锁,那么当前线程就可以不必阻塞而是直接获取同步资源,从而避免切换线程的开销。这就是自旋锁。

5. 唯一约束

常见方式:悲观锁,乐观锁,唯一约束

最优排序:乐观锁 > 唯一约束 > 悲观锁

乐观锁CAS的用法,比方说在couchbase中,key是自带版本号的

  1. object OriginCasStreamingTransfer {
  2. implicit val conf = ConfigFactory.load
  3. def main(args: Array[String]): Unit = {
  4. val cbSingleUerThreshold = conf.getInt("cb_single_user_threshold.size")
  5. val batchSize = conf.getInt("batch.size") // 消息批次处理
  6. val zkQuorum = conf.getString("test_kafka.zkQuorum")
  7. val brokers = conf.getString("test_kafka.brokers")
  8. val topics = conf.getString("test_kafka.topics")
  9. val group = conf.getString("test_kafka.group.id")
  10. val seconds: Long = 120 // spark streaming mini batch time interval 防止pending batch过多造成流任务失败
  11. val numThreads = 16
  12. val topicMap = topics.split(",").map((_, numThreads)).toMap
  13. val kafkaParams = Map(
  14. ("metadata.broker.list", brokers),
  15. ("zookeeper.connect", zkQuorum),
  16. ("auto.offset.reset", "smallest"),
  17. ("zookeeper.connection.timeout.ms", "30000"),
  18. ("group.id", group)
  19. )
  20. val sparkConf = new SparkConf()
  21. .setIfMissing("spark.master", "local[*]")
  22. .setAppName("CasStreamingTransfer")
  23. .set("spark.default.parallelism", "1000")
  24. .set("spark.executor.memory", "8g")
  25. val spark = SparkSession
  26. .builder()
  27. .appName("CasStreamingTransfer")
  28. .config(sparkConf)
  29. .getOrCreate()
  30. val ssc = new StreamingContext(spark.sparkContext, Seconds(seconds))
  31. val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  32. ssc, kafkaParams, topicMap,
  33. StorageLevel.MEMORY_AND_DISK_SER) // key value key解码格式 value解码格式
  34. stream.foreachRDD{each_rdd =>
  35. each_rdd.repartition(1000).foreachPartition{ each_partition =>
  36. while (each_partition.hasNext){
  37. var batchCount = 0
  38. val batchRecord = ListBuffer[String]()
  39. while (batchCount < batchSize && each_partition.hasNext){
  40. val item = each_partition.next()
  41. batchRecord += item._1
  42. batchCount += 1
  43. }
  44. val errorReadResult = collection.mutable.Set[String]()
  45. val readResult = CbManager.getInstance().readCasAsync("test_playback_record_couchbase", batchRecord.asJava, errorReadResult.asJava)
  46. // println("readResult: " + readResult)
  47. val writeResult = collection.mutable.Map[CbCasEntity, Array[Byte]]()
  48. for ((key, value) <- readResult.asScala){
  49. val uid = key.getId // 取出的用户uid
  50. // println("uid: " + uid)
  51. val cas = key.getCas // 取出的用户的couchbase当中的cas值
  52. val parseResult = PlayRecord.UserPlayRecords.parseFrom(value) // 解析couchbase当中的value字段
  53. // 解析
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/698181
推荐阅读
相关标签
  

闽ICP备14008679号