概述
1 更新丢失
对于可重复读、读已提交隔离级别都会有更新丢失的问题。事务A、事务B开始读取到order_id=1的累计金额为0,事务A加一后执行更新并提交,此时数据库中累计金额为1。事务B也加一并执行更新且提交,覆盖了事务A的结果。两个事务完成后,数据库中累计金额为1。而我们的预期是2。
2 解决方案
2.1悲观锁
对数据加锁,同一时间内只能有一个事务进行更新。
2.2更改update语句
因为update语句是原子性的,对于update a = a + 1类型,即使在高并发下,依然是一条一条执行,执行多少次update就增加几次1。
原来:
accumulativeTotal=1
accumulativeTotal= accumulativeTotal +1
update accumulative_total = accumulativeTotal
改为:
accumulativeTotal=1
update accumulative_total = accumulative_total + 1
2.3乐观锁
乐观锁没有加锁,而是由应用程序判断是否可以更新。一般会通过引入版本号的方式来实现,在更新之前读取到版本号,update语句指定where条件版本号等于之前读到的,并让版本加一,也是依赖update语句的原子性。版本号等于之前读到的说明数据没有被更改,可以更新,否则放弃更新,重新读取更新。由于可能在事务中多次读取版本号,且版本号要最新的,因此隔离级别应为读已提交。
乐观锁适合于并发较低的情况,当并发高时,应用会不断地读取版本、放弃更新,不停空转,消耗cpu、数据库资源。
2.4分布式锁
分布式锁,比如redis实现的分布式锁,多了跟redis交互的时间,效率并不高。
3 实战
3.1说明
3.1.1 源码
https://gitee.com/tong-exists/concurrent-update-mysql
3.1.2 机器配置
数据库机器
虚拟机2GB内存,1核cpu
redis机器配置
虚拟机2GB内存,1核cpu
宿主机配置
16GB内存,8核cpu 2.4Ghz。go程序、jmeter都在宿主机运行。
3.1.3 表的初始状态
也是测试前的状态
3.1.4 测试
测试软件为jmeter
3.1.4.1 高并发测试配置
样本为5000个,100的并发
3.1.4.2 低并发测试配置
样本为5000个,10的并发
3.2更新丢失
3.2.1 代码
package main
import (
"github.com/gin-gonic/gin"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"log"
"net/http"
)
type Order struct {
OrderId int64 `gorm:"primaryKey"`
AccumulativeTotal float64
}
func (Order) TableName() string {
return "t_order"
}
// 更新丢失 代码
func main() {
//建立mysql连接
dsn := "root:mWwdsqw1s2x3x*@tcp(mysql3:3306)/mytest?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
log.Println("mysql数据库连接失败,", err.Error())
return
}
//创建gin路由
r := gin.Default()
r.GET("/update", func(c *gin.Context) {
//开启事务
tx := db.Begin()
var order Order
tx.Raw("select order_id, accumulative_total from t_order where order_id = ?", 1).Scan(&order)
//内存中加一后更新
tx.Exec("update t_order set accumulative_total = ? where order_id = ?", order.AccumulativeTotal+1.0, 1)
//提交
tx.Commit()
c.JSON(http.StatusOK, gin.H{
"message": "ok",
})
})
r.Run(":8080") // 监听并在 0.0.0.0:8080 上启动服务
}
3.2.2 高并发测试
结果不是5000,错误
3.2.3 低并发测试
比高并发好点,但结果不是5000,错误
3.3悲观锁
3.3.1 代码
package main
import (
"github.com/gin-gonic/gin"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"log"
"net/http"
)
type Order struct {
OrderId int64 `gorm:"primaryKey"`
AccumulativeTotal float64
}
func (Order) TableName() string {
return "t_order"
}
func main() {
//建立mysql连接
dsn := "root:mWwdsqw1s2x3x*@tcp(mysql3:3306)/mytest?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
log.Println("mysql数据库连接失败,", err.Error())
return
}
//创建gin路由
r := gin.Default()
r.GET("/update", func(c *gin.Context) {
//开启事务
tx := db.Begin()
var order Order
//加悲观锁
tx.Raw("select order_id, accumulative_total from t_order where order_id = ? for update", 1).Scan(&order)
//内存中加一后更新
tx.Exec("update t_order set accumulative_total = ? where order_id = ?", order.AccumulativeTotal+1.0, 1)
//提交
tx.Commit()
c.JSON(http.StatusOK, gin.H{
"message": "ok",
})
})
r.Run(":8080") // 监听并在 0.0.0.0:8080 上启动服务
}
3.3.2 高并发测试
结果正确
3.3.3 低并发测试
结果正确
3.4更改update语句
3.4.1 代码
package main
import (
"github.com/gin-gonic/gin"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"log"
"net/http"
)
type Order struct {
OrderId int64 `gorm:"primaryKey"`
AccumulativeTotal float64
}
func (Order) TableName() string {
return "t_order"
}
func main() {
//建立mysql连接
dsn := "root:mWwdsqw1s2x3x*@tcp(mysql3:3306)/mytest?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
log.Println("mysql数据库连接失败,", err.Error())
return
}
//创建gin路由
r := gin.Default()
r.GET("/update", func(c *gin.Context) {
//开启事务
tx := db.Begin()
//依赖update的原子性加一
tx.Exec("update t_order set accumulative_total = ? where order_id = ?", gorm.Expr("accumulative_total + ?", 1.0), 1)
tx.Commit()
c.JSON(http.StatusOK, gin.H{
"message": "ok",
})
})
r.Run(":8080") // 监听并在 0.0.0.0:8080 上启动服务
}
3.4.2 高并发测试
结果正确
3.4.3 低并发测试
3.5乐观锁
数据库的全局隔离级别是REPEATABLE-READ,当前会话的隔离级别是READ-COMMITTED。乐观锁需要会话处于读已提交级别下。InnoDB在读已提交下需要binlog格式为ROW。
执行以下命令设置binlog格式。
3.5.1 代码
package main
import (
"database/sql"
"github.com/gin-gonic/gin"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"log"
"net/http"
)
type Order struct {
OrderId int64 `gorm:"primaryKey"`
AccumulativeTotal float64
Version int64
}
func (Order) TableName() string {
return "t_order"
}
func main() {
//建立mysql连接
dsn := "root:mWwdsqw1s2x3x*@tcp(mysql3:3306)/mytest?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
log.Println("mysql数据库连接失败,", err.Error())
return
}
//创建gin路由
r := gin.Default()
r.GET("/update", func(c *gin.Context) {
//设置读已提交隔离级别
var opt sql.TxOptions
opt.Isolation = sql.LevelReadCommitted
opt.ReadOnly = false
//开启事务
tx := db.Begin(&opt)
var orderBefore Order
//读取版本号
tx.Raw("select order_id, accumulative_total, version from t_order where order_id = ?", 1).Scan(&orderBefore)
//尝试更新
affected := tx.Exec("update t_order set accumulative_total = ?, version = ? where order_id = ? and version = ?",
orderBefore.AccumulativeTotal+1.0, orderBefore.Version+1,
1, orderBefore.Version).RowsAffected
log.Println("更新结果: ", affected)
for affected != 1 { //更新失败,数据被其他事务更改了
//重新读取版本号
tx.Raw("select order_id, accumulative_total, version from t_order where order_id = ?", 1).Scan(&orderBefore)
//重新尝试更新
affected = tx.Exec("update t_order set accumulative_total = ?, version = ? where order_id = ? and version = ?",
orderBefore.AccumulativeTotal+1.0, orderBefore.Version+1,
1, orderBefore.Version).RowsAffected
log.Println("更新结果: ", affected)
}
//更新成功了,提交
tx.Commit()
c.JSON(http.StatusOK, gin.H{
"message": "ok",
})
})
r.Run(":8080") // 监听并在 0.0.0.0:8080 上启动服务
}
3.5.2 高并发测试
3.5.3 低并发测试
3.6分布式锁
3.6.1 代码
package main
import (
"context"
"github.com/gin-gonic/gin"
"github.com/go-redis/redis/v9"
"github.com/google/uuid"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"log"
"net/http"
)
type Order struct {
OrderId int64 `gorm:"primaryKey"`
AccumulativeTotal float64
Version int64
}
func (Order) TableName() string {
return "t_order"
}
// 加锁lua代码
const LOCK_LUA = "if redis.call("setnx", KEYS[1], KEYS[2]) == 1 thenn return redis.call("pexpire", KEYS[1], KEYS[3])nelsen return 0nend"
// 解锁lua代码
const UNLOCK_LUA = "if redis.call("get",KEYS[1]) == KEYS[2] thenn return redis.call("del",KEYS[1])nelsen return -1nend"
const LOCK_KEY = "lock_key"
func main() {
//建立redis连接
rdb := redis.NewClient(&redis.Options{
Addr: "mysql4:6379",
Password: "", // no password set
DB: 0, // use default DB
})
//建立mysql连接
dsn := "root:mWwdsqw1s2x3x*@tcp(mysql3:3306)/mytest?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
log.Println("mysql数据库连接失败,", err.Error())
return
}
//创建gin路由
r := gin.Default()
r.GET("/update", func(c *gin.Context) {
// 加锁
ctx := context.Background()
value := uuid.NewString()
lockResult, _ := rdb.Eval(ctx, LOCK_LUA, []string{LOCK_KEY, value, "5000"}).Result()
for lockResult != int64(1) { // 加锁失败
// 重新尝试加锁
lockResult, _ = rdb.Eval(ctx, LOCK_LUA, []string{LOCK_KEY, value, "5000"}).Result()
}
//加锁成功
//开启事务
tx := db.Begin()
var orderBefore Order
//读取数据
tx.Raw("select order_id, accumulative_total, version from t_order where order_id = ?", 1).Scan(&orderBefore)
//内存中加一后更新
tx.Exec("update t_order set accumulative_total = ? where order_id = ?",
orderBefore.AccumulativeTotal+1.0, 1)
//解锁
unlockResult, _ := rdb.Eval(ctx, UNLOCK_LUA, []string{LOCK_KEY, value}).Result()
if unlockResult != int64(1) { //解锁失败,可能是方法太耗时,key过期了
//回滚
tx.Rollback()
c.JSON(http.StatusServiceUnavailable, gin.H{
"message": "no",
})
} else { //解锁成功
//提交
tx.Commit()
c.JSON(http.StatusOK, gin.H{
"message": "ok",
})
}
})
r.Run(":8080") // 监听并在 0.0.0.0:8080 上启动服务
}
3.6.2 高并发测试
、
3.6.3 低并发测试
4 对比分析
表格 1高并发
平均响应时间 | 最低响应时间 | 最高响应时间 | |
---|---|---|---|
update语句 | 387 | 16 | 741 |
悲观锁 | 532 | 14 | 786 |
乐观锁 | 770 | 31 | 999 |
分布式锁 | 1681 | 9 | 15013 |
由上图可知,在高并发下,update语句的平均响应时间最少,乐观锁要多于悲观锁。分布式锁因为额外与redis交互时间较多。
表格 2低并发
平均响应时间 | 最低响应时间 | 最高响应时间 | |
---|---|---|---|
update语句 | 30 | 4 | 77 |
悲观锁 | 45 | 5 | 149 |
分布式锁 | 60 | 6 | 375 |
乐观锁 | 69 | 6 | 134 |
由上图可知,在低并发下,还是update效率最高。最高响应时间最长的是分布式锁。这里乐观锁的平均响应时间比悲观锁要多。乐观锁、悲观锁没有优劣之分,适用场景不同。
最后
以上就是着急汉堡为你收集整理的mysql并发更新丢失解决方案实战 go语言1 更新丢失2 解决方案3 实战4 对比分析的全部内容,希望文章能够帮你解决mysql并发更新丢失解决方案实战 go语言1 更新丢失2 解决方案3 实战4 对比分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复