赞
踩
什么是幂等性?
对于同一笔业务操作,不管调用多少次,得到的结果都是一样的。
3. 悲观锁方式,使用数据库中悲观锁实现。悲观锁类似于方式二中的Lock, 只不过是依靠数据库来实现的。数据库中悲观锁使用for update来实现的
4. 乐观锁方式,使用数据库中乐观锁实现。主要还是通过版本号,类似于java的cas操作,那么问题来了,在多线程的情况下,一个线程取得数据了,其它线程会怎么样呢?会不断重试,不会阻塞,可以理解为自旋,阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长。
在许多场景中,同步资源的锁定时间很短,为了这一小段时间去切换线程,线程挂起和恢复现场的花费可能会让系统得不偿失。如果物理机器有多个处理器,能够让两个或以上的线程同时并行执行,我们就可以让后面那个请求锁的线程不放弃CPU的执行时间,看看持有锁的线程是否很快就会释放锁。
而为了让当前线程“稍等一下”,我们需让当前线程进行自旋,如果在自旋完成后前面锁定同步资源的线程已经释放了锁,那么当前线程就可以不必阻塞而是直接获取同步资源,从而避免切换线程的开销。这就是自旋锁。
5. 唯一约束
常见方式:悲观锁,乐观锁,唯一约束
最优排序:乐观锁 > 唯一约束 > 悲观锁
乐观锁CAS的用法,比方说在couchbase中,key是自带版本号的
- object OriginCasStreamingTransfer {
- implicit val conf = ConfigFactory.load
-
- def main(args: Array[String]): Unit = {
- val cbSingleUerThreshold = conf.getInt("cb_single_user_threshold.size")
- val batchSize = conf.getInt("batch.size") // 消息批次处理
- val zkQuorum = conf.getString("test_kafka.zkQuorum")
- val brokers = conf.getString("test_kafka.brokers")
- val topics = conf.getString("test_kafka.topics")
- val group = conf.getString("test_kafka.group.id")
- val seconds: Long = 120 // spark streaming mini batch time interval 防止pending batch过多造成流任务失败
- val numThreads = 16
- val topicMap = topics.split(",").map((_, numThreads)).toMap
- val kafkaParams = Map(
- ("metadata.broker.list", brokers),
- ("zookeeper.connect", zkQuorum),
- ("auto.offset.reset", "smallest"),
- ("zookeeper.connection.timeout.ms", "30000"),
- ("group.id", group)
- )
-
- val sparkConf = new SparkConf()
- .setIfMissing("spark.master", "local[*]")
- .setAppName("CasStreamingTransfer")
- .set("spark.default.parallelism", "1000")
- .set("spark.executor.memory", "8g")
- val spark = SparkSession
- .builder()
- .appName("CasStreamingTransfer")
- .config(sparkConf)
- .getOrCreate()
- val ssc = new StreamingContext(spark.sparkContext, Seconds(seconds))
-
- val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, topicMap,
- StorageLevel.MEMORY_AND_DISK_SER) // key value key解码格式 value解码格式
-
- stream.foreachRDD{each_rdd =>
- each_rdd.repartition(1000).foreachPartition{ each_partition =>
- while (each_partition.hasNext){
- var batchCount = 0
- val batchRecord = ListBuffer[String]()
- while (batchCount < batchSize && each_partition.hasNext){
- val item = each_partition.next()
- batchRecord += item._1
- batchCount += 1
- }
- val errorReadResult = collection.mutable.Set[String]()
- val readResult = CbManager.getInstance().readCasAsync("test_playback_record_couchbase", batchRecord.asJava, errorReadResult.asJava)
- // println("readResult: " + readResult)
- val writeResult = collection.mutable.Map[CbCasEntity, Array[Byte]]()
- for ((key, value) <- readResult.asScala){
- val uid = key.getId // 取出的用户uid
- // println("uid: " + uid)
- val cas = key.getCas // 取出的用户的couchbase当中的cas值
- val parseResult = PlayRecord.UserPlayRecords.parseFrom(value) // 解析couchbase当中的value字段
-
- // 解析
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。