我是靠谱客的博主 着急汉堡,最近开发中收集的这篇文章主要介绍mysql并发更新丢失解决方案实战 go语言1 更新丢失2 解决方案3 实战4 对比分析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

mysql并发更新丢失解决方案实战

1 更新丢失

对于可重复读、读已提交隔离级别都会有更新丢失的问题。事务A、事务B开始读取到order_id=1的累计金额为0,事务A加一后执行更新并提交,此时数据库中累计金额为1。事务B也加一并执行更新且提交,覆盖了事务A的结果。两个事务完成后,数据库中累计金额为1。而我们的预期是2。

在这里插入图片描述

2 解决方案

2.1悲观锁

对数据加锁,同一时间内只能有一个事务进行更新。

2.2更改update语句

因为update语句是原子性的,对于update a = a + 1类型,即使在高并发下,依然是一条一条执行,执行多少次update就增加几次1。

原来:

accumulativeTotal=1

accumulativeTotal= accumulativeTotal +1

update accumulative_total = accumulativeTotal

改为:

accumulativeTotal=1

update accumulative_total = accumulative_total + 1

2.3乐观锁

乐观锁没有加锁,而是由应用程序判断是否可以更新。一般会通过引入版本号的方式来实现,在更新之前读取到版本号,update语句指定where条件版本号等于之前读到的,并让版本加一,也是依赖update语句的原子性。版本号等于之前读到的说明数据没有被更改,可以更新,否则放弃更新,重新读取更新。由于可能在事务中多次读取版本号,且版本号要最新的,因此隔离级别应为读已提交。

乐观锁适合于并发较低的情况,当并发高时,应用会不断地读取版本、放弃更新,不停空转,消耗cpu、数据库资源。

2.4分布式锁

分布式锁,比如redis实现的分布式锁,多了跟redis交互的时间,效率并不高。

3 实战

3.1说明

3.1.1 源码

https://gitee.com/tong-exists/concurrent-update-mysql

3.1.2 机器配置

数据库机器

虚拟机2GB内存,1核cpu

redis机器配置

虚拟机2GB内存,1核cpu

宿主机配置

16GB内存,8核cpu 2.4Ghz。go程序、jmeter都在宿主机运行。

3.1.3 表的初始状态

也是测试前的状态
在这里插入图片描述

3.1.4 测试

测试软件为jmeter

3.1.4.1 高并发测试配置

样本为5000个,100的并发

在这里插入图片描述

3.1.4.2 低并发测试配置

样本为5000个,10的并发
在这里插入图片描述

3.2更新丢失

3.2.1 代码

package main
 
 import (
  "github.com/gin-gonic/gin"
  "gorm.io/driver/mysql"
  "gorm.io/gorm"
  "log"
  "net/http"
 )
 
 type Order struct {
  OrderId      int64 `gorm:"primaryKey"`
  AccumulativeTotal float64
 }
 
 func (Order) TableName() string {
  return "t_order"
 }
 
 // 更新丢失 代码
 func main() {
  //建立mysql连接
  dsn := "root:mWwdsqw1s2x3x*@tcp(mysql3:3306)/mytest?charset=utf8mb4&parseTime=True&loc=Local"
  db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  if err != nil {
    log.Println("mysql数据库连接失败,", err.Error())
    return
  }
  //创建gin路由
  r := gin.Default()
  r.GET("/update", func(c *gin.Context) {
    //开启事务
    tx := db.Begin()
    var order Order
    tx.Raw("select order_id, accumulative_total from t_order where order_id = ?", 1).Scan(&order)
    //内存中加一后更新
    tx.Exec("update t_order set accumulative_total = ? where order_id = ?", order.AccumulativeTotal+1.0, 1)
    //提交
    tx.Commit()
    c.JSON(http.StatusOK, gin.H{
     "message": "ok",
    })
  })
  r.Run(":8080") // 监听并在 0.0.0.0:8080 上启动服务
 }

3.2.2 高并发测试

结果不是5000,错误

在这里插入图片描述
在这里插入图片描述

3.2.3 低并发测试

比高并发好点,但结果不是5000,错误

在这里插入图片描述
在这里插入图片描述

3.3悲观锁

3.3.1 代码

package main
 
 import (
  "github.com/gin-gonic/gin"
  "gorm.io/driver/mysql"
  "gorm.io/gorm"
  "log"
  "net/http"
 )
 
 type Order struct {
  OrderId      int64 `gorm:"primaryKey"`
  AccumulativeTotal float64
 }
 
 func (Order) TableName() string {
  return "t_order"
 }
 
 func main() {
  //建立mysql连接
  dsn := "root:mWwdsqw1s2x3x*@tcp(mysql3:3306)/mytest?charset=utf8mb4&parseTime=True&loc=Local"
  db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  if err != nil {
    log.Println("mysql数据库连接失败,", err.Error())
    return
  }
  //创建gin路由
  r := gin.Default()
  r.GET("/update", func(c *gin.Context) {
    //开启事务
    tx := db.Begin()
    var order Order
    //加悲观锁
    tx.Raw("select order_id, accumulative_total from t_order where order_id = ? for update", 1).Scan(&order)
    //内存中加一后更新
    tx.Exec("update t_order set accumulative_total = ? where order_id = ?", order.AccumulativeTotal+1.0, 1)
    //提交
    tx.Commit()
    c.JSON(http.StatusOK, gin.H{
     "message": "ok",
    })
  })
  r.Run(":8080") // 监听并在 0.0.0.0:8080 上启动服务
 }

3.3.2 高并发测试

结果正确

在这里插入图片描述
在这里插入图片描述

3.3.3 低并发测试

结果正确

在这里插入图片描述
在这里插入图片描述

3.4更改update语句

3.4.1 代码

package main
 
 import (
  "github.com/gin-gonic/gin"
  "gorm.io/driver/mysql"
  "gorm.io/gorm"
  "log"
  "net/http"
 )
 
 type Order struct {
  OrderId      int64 `gorm:"primaryKey"`
  AccumulativeTotal float64
 }
 
 func (Order) TableName() string {
  return "t_order"
 }
 
 func main() {
  //建立mysql连接
  dsn := "root:mWwdsqw1s2x3x*@tcp(mysql3:3306)/mytest?charset=utf8mb4&parseTime=True&loc=Local"
  db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  if err != nil {
    log.Println("mysql数据库连接失败,", err.Error())
    return
  }
  //创建gin路由
  r := gin.Default()
  r.GET("/update", func(c *gin.Context) {
    //开启事务
    tx := db.Begin()
    //依赖update的原子性加一
    tx.Exec("update t_order set accumulative_total = ? where order_id = ?", gorm.Expr("accumulative_total + ?", 1.0), 1)
    tx.Commit()
    c.JSON(http.StatusOK, gin.H{
     "message": "ok",
    })
  })
  r.Run(":8080") // 监听并在 0.0.0.0:8080 上启动服务
 }

3.4.2 高并发测试

结果正确

在这里插入图片描述
在这里插入图片描述

3.4.3 低并发测试

在这里插入图片描述
在这里插入图片描述

3.5乐观锁

数据库的全局隔离级别是REPEATABLE-READ,当前会话的隔离级别是READ-COMMITTED。乐观锁需要会话处于读已提交级别下。InnoDB在读已提交下需要binlog格式为ROW。

执行以下命令设置binlog格式。

在这里插入图片描述

3.5.1 代码

package main
 
 import (
  "database/sql"
  "github.com/gin-gonic/gin"
  "gorm.io/driver/mysql"
  "gorm.io/gorm"
  "log"
  "net/http"
 )
 
 type Order struct {
  OrderId      int64 `gorm:"primaryKey"`
  AccumulativeTotal float64
  Version      int64
 }
 
 func (Order) TableName() string {
  return "t_order"
 }
 
 func main() {
  //建立mysql连接
  dsn := "root:mWwdsqw1s2x3x*@tcp(mysql3:3306)/mytest?charset=utf8mb4&parseTime=True&loc=Local"
  db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  if err != nil {
    log.Println("mysql数据库连接失败,", err.Error())
    return
  }
  //创建gin路由
  r := gin.Default()
  r.GET("/update", func(c *gin.Context) {
    //设置读已提交隔离级别
    var opt sql.TxOptions
    opt.Isolation = sql.LevelReadCommitted
    opt.ReadOnly = false
    //开启事务
    tx := db.Begin(&opt)
    var orderBefore Order
    //读取版本号
    tx.Raw("select order_id, accumulative_total, version from t_order where order_id = ?", 1).Scan(&orderBefore)
    //尝试更新
    affected := tx.Exec("update t_order set accumulative_total = ?, version = ? where order_id = ? and version = ?",
     orderBefore.AccumulativeTotal+1.0, orderBefore.Version+1,
     1, orderBefore.Version).RowsAffected
    log.Println("更新结果: ", affected)
    for affected != 1 { //更新失败,数据被其他事务更改了
     //重新读取版本号
     tx.Raw("select order_id, accumulative_total, version from t_order where order_id = ?", 1).Scan(&orderBefore)
     //重新尝试更新
     affected = tx.Exec("update t_order set accumulative_total = ?, version = ? where order_id = ? and version = ?",
       orderBefore.AccumulativeTotal+1.0, orderBefore.Version+1,
       1, orderBefore.Version).RowsAffected
     log.Println("更新结果: ", affected)
    }
    //更新成功了,提交
    tx.Commit()
    c.JSON(http.StatusOK, gin.H{
     "message": "ok",
    })
  })
  r.Run(":8080") // 监听并在 0.0.0.0:8080 上启动服务
 }

3.5.2 高并发测试

在这里插入图片描述
在这里插入图片描述

3.5.3 低并发测试

在这里插入图片描述
在这里插入图片描述

3.6分布式锁

3.6.1 代码

package main
 
 import (
  "context"
  "github.com/gin-gonic/gin"
  "github.com/go-redis/redis/v9"
  "github.com/google/uuid"
  "gorm.io/driver/mysql"
  "gorm.io/gorm"
  "log"
  "net/http"
 )
 
 type Order struct {
  OrderId      int64 `gorm:"primaryKey"`
  AccumulativeTotal float64
  Version      int64
 }
 
 func (Order) TableName() string {
  return "t_order"
 }
 
 // 加锁lua代码
 const LOCK_LUA = "if redis.call("setnx", KEYS[1], KEYS[2]) == 1 thenn  return redis.call("pexpire", KEYS[1], KEYS[3])nelsen  return 0nend"
 
 // 解锁lua代码
 const UNLOCK_LUA = "if redis.call("get",KEYS[1]) == KEYS[2] thenn  return redis.call("del",KEYS[1])nelsen  return -1nend"
 const LOCK_KEY = "lock_key"
 
 func main() {
  //建立redis连接
  rdb := redis.NewClient(&redis.Options{
    Addr:   "mysql4:6379",
    Password: "", // no password set
    DB:    0, // use default DB
  })
  //建立mysql连接
  dsn := "root:mWwdsqw1s2x3x*@tcp(mysql3:3306)/mytest?charset=utf8mb4&parseTime=True&loc=Local"
  db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  if err != nil {
    log.Println("mysql数据库连接失败,", err.Error())
    return
  }
  //创建gin路由
  r := gin.Default()
  r.GET("/update", func(c *gin.Context) {
    // 加锁
    ctx := context.Background()
    value := uuid.NewString()
    lockResult, _ := rdb.Eval(ctx, LOCK_LUA, []string{LOCK_KEY, value, "5000"}).Result()
    for lockResult != int64(1) { // 加锁失败
     // 重新尝试加锁
     lockResult, _ = rdb.Eval(ctx, LOCK_LUA, []string{LOCK_KEY, value, "5000"}).Result()
    }
    //加锁成功
    //开启事务
    tx := db.Begin()
    var orderBefore Order
    //读取数据
    tx.Raw("select order_id, accumulative_total, version from t_order where order_id = ?", 1).Scan(&orderBefore)
    //内存中加一后更新
    tx.Exec("update t_order set accumulative_total = ? where order_id = ?",
     orderBefore.AccumulativeTotal+1.0, 1)
    //解锁
    unlockResult, _ := rdb.Eval(ctx, UNLOCK_LUA, []string{LOCK_KEY, value}).Result()
    if unlockResult != int64(1) { //解锁失败,可能是方法太耗时,key过期了
     //回滚
     tx.Rollback()
     c.JSON(http.StatusServiceUnavailable, gin.H{
       "message": "no",
     })
    } else { //解锁成功
      //提交
     tx.Commit()
     c.JSON(http.StatusOK, gin.H{
       "message": "ok",
     })
    }
  })
  r.Run(":8080") // 监听并在 0.0.0.0:8080 上启动服务
 }

3.6.2 高并发测试

在这里插入图片描述
在这里插入图片描述

3.6.3 低并发测试

在这里插入图片描述
在这里插入图片描述

4 对比分析

表格 1高并发

平均响应时间最低响应时间最高响应时间
update语句38716741
悲观锁53214786
乐观锁77031999
分布式锁1681915013

由上图可知,在高并发下,update语句的平均响应时间最少,乐观锁要多于悲观锁。分布式锁因为额外与redis交互时间较多。

表格 2低并发

平均响应时间最低响应时间最高响应时间
update语句30477
悲观锁455149
分布式锁606375
乐观锁696134

由上图可知,在低并发下,还是update效率最高。最高响应时间最长的是分布式锁。这里乐观锁的平均响应时间比悲观锁要多。乐观锁、悲观锁没有优劣之分,适用场景不同。

最后

以上就是着急汉堡为你收集整理的mysql并发更新丢失解决方案实战 go语言1 更新丢失2 解决方案3 实战4 对比分析的全部内容,希望文章能够帮你解决mysql并发更新丢失解决方案实战 go语言1 更新丢失2 解决方案3 实战4 对比分析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(108)

评论列表共有 0 条评论

立即
投稿
返回
顶部