我是靠谱客的博主 名字长了才好记,最近开发中收集的这篇文章主要介绍golang使用数据库的锁实现和解决并发问题,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

使用数据库的锁记录确实可能面临并发竞争问题,尤其是在高并发环境下。在您的场景中,如果两个导入请求在非常接近的时间点同时执行,并且都查询到了当前记录的 is_importingfalse,那么它们都可能继续进行后续的处理,从而造成并发操作问题

为了防止这种并发问题,可以通过数据库的原子操作事务机制来保证锁的正确性。以下是几种可以考虑的解决方案:

1. 使用数据库的事务和悲观锁

可以使用悲观锁(Pessimistic Lock)来确保在修改 is_importing 字段时,其他请求无法访问该行记录。例如,您可以在查询时使用 FOR UPDATE 锁定这行记录,以便只有当前操作持有锁的请求可以继续进行。

假设您的数据库是 MySQL,以下是如何实现悲观锁的示例:

import (
    "context"
    "errors"
    "github.com/gogf/gf/v2/database/gdb"
    "github.com/gogf/gf/v2/frame/g"
)

func (s *sFaceGroup) ImportFaceInfo(ctx context.Context, in *v1.FaceInfoImportReq) (res *v1.FaceInfoImportRes, err error) {
    err = dao.FaceGroup.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
        // 使用 FOR UPDATE 锁定当前分组记录
        group, err := tx.Model(dao.FaceGroup.Table()).
            Where(dao.FaceGroup.Columns().Id, in.GroupId).
            ForUpdate().
            One()
        if err != nil {
            return cerrors.Wrap(err, consts.InternalServer)
        }
        if group.IsEmpty() {
            return errors.New("当前人脸分组不存在")
        }

        // 检查是否已经在导入中
        if group[dao.FaceGroup.Columns().IsImporting].Bool() {
            return errors.New("当前分组正在导入中,请稍后再试")
        }

        // 设置为导入中状态
        _, err = tx.Model(dao.FaceGroup.Table()).
            Where(dao.FaceGroup.Columns().Id, in.GroupId).
            Data(g.Map{dao.FaceGroup.Columns().IsImporting: true}).
            Update()
        if err != nil {
            return cerrors.Wrap(err, consts.InternalServer)
        }

        // 确保导入完成后将其状态恢复
        defer func() {
            _, _ = tx.Model(dao.FaceGroup.Table()).
                Where(dao.FaceGroup.Columns().Id, in.GroupId).
                Data(g.Map{dao.FaceGroup.Columns().IsImporting: false}).
                Update()
        }()

        // 继续处理导入逻辑
        tempDir := gfile.Temp(uuid.New().String())
        defer gfile.Remove(tempDir)
        // 其他处理代码...

        return nil
    })

    if err != nil {
        return nil, err
    }

    // 返回结果
    res = &v1.FaceInfoImportRes{
        Success: 0, // 根据实际导入结果设置
        Fail:    0, // 根据实际导入结果设置
    }
    return
}

说明

  • 使用 Transaction 包装整个导入过程,确保数据库操作的一致性。

  • 使用 ForUpdate() 方法为 GROUP_ID 的记录加上排它锁(即悲观锁)。这样其他事务在这条记录上也会被阻塞,直到当前事务完成。

  • 在事务内将 is_importing 设置为 true,并在导入结束后设置为 false,无论导入成功还是失败,均需要保证状态的还原。

2. 使用数据库的乐观锁

乐观锁通过版本号(version)时间戳(timestamp)来确保同一时间内只有一个请求可以修改数据。这种方式适合高并发但冲突概率较低的场景,适用于需要尽量减少锁时间的业务场景。

具体做法是增加一个 version 字段,在更新时同时检查 version 的值是否符合预期:

  1. FaceGroup 表中添加一个 version 字段。

  2. 在更新记录时,带上版本号进行检查。

代码示例

import (
    "context"
    "errors"
    "github.com/gogf/gf/v2/frame/g"
    "github.com/gogf/gf/v2/database/gdb"
)

func (s *sFaceGroup) ImportFaceInfo(ctx context.Context, in *v1.FaceInfoImportReq) (res *v1.FaceInfoImportRes, err error) {
    // 获取人脸分组
    group, err := dao.FaceGroup.Ctx(ctx).
        Where(dao.FaceGroup.Columns().Id, in.GroupId).
        One()
    if err != nil {
        return nil, cerrors.Wrap(err, consts.InternalServer)
    }
    if group.IsEmpty() {
        return nil, errors.New("当前人脸分组不存在")
    }

    // 检查是否已经在导入中
    if group[dao.FaceGroup.Columns().IsImporting].Bool() {
        return nil, errors.New("当前分组正在导入中,请稍后再试")
    }

    // 获取当前版本号
    currentVersion := group[dao.FaceGroup.Columns().Version].Int()

    // 更新记录,设置导入中标志并使用版本号乐观锁
    result, err := dao.FaceGroup.Ctx(ctx).
        Where(dao.FaceGroup.Columns().Id, in.GroupId).
        Where(dao.FaceGroup.Columns().Version, currentVersion).
        Data(g.Map{
            dao.FaceGroup.Columns().IsImporting: true,
            dao.FaceGroup.Columns().Version:     currentVersion + 1,
        }).
        Update()
    if err != nil {
        return nil, cerrors.Wrap(err, consts.InternalServer)
    }

    affected, err := result.RowsAffected()
    if err != nil {
        return nil, cerrors.Wrap(err, consts.InternalServer)
    }

    if affected == 0 {
        return nil, errors.New("当前分组状态已被其他请求修改,请稍后重试")
    }

    // 确保导入结束后将其状态恢复
    defer func() {
        _, _ = dao.FaceGroup.Ctx(ctx).
            Where(dao.FaceGroup.Columns().Id, in.GroupId).
            Data(g.Map{
                dao.FaceGroup.Columns().IsImporting: false,
                dao.FaceGroup.Columns().Version:     currentVersion + 2,
            }).
            Update()
    }()

    // 继续处理导入逻辑
    tempDir := gfile.Temp(uuid.New().String())
    defer gfile.Remove(tempDir)
    // 其他处理代码...

    // 返回结果
    res = &v1.FaceInfoImportRes{
        Success: 0, // 根据实际导入结果设置
        Fail:    0, // 根据实际导入结果设置
    }
    return
}

说明

  • 使用 version 字段来实现乐观锁,每次更新时检查版本号是否匹配。

  • 如果更新时版本号不匹配,说明数据已经被其他请求修改,这种情况下返回错误提示用户稍后再试。

  • 通过这种方式确保只有一个请求可以成功更新 is_importing 状态。

3. 总结和选择建议

  • 悲观锁(FOR UPDATE

    • 适合:并发量较高的场景,需要强一致性,且可以容忍部分请求等待锁释放。

    • 优点:通过数据库直接控制并发,不会出现版本冲突问题。

    • 缺点:会导致持有锁的时间变长,其他请求会被阻塞。

  • 乐观锁(version 字段)

    • 适合:并发量高,但对部分请求失败可以接受的场景。乐观锁可以在冲突较少的情况下提高系统的吞吐量。

    • 优点:不需要长时间持有锁,允许其他请求读取数据。

    • 缺点:在冲突较高时,可能导致大量请求因为版本不匹配而失败。

  • Redis 分布式锁:适合分布式环境下的锁需求,可以保证多个实例对同一资源的访问控制。

对于您描述的用户导入操作,通常推荐使用悲观锁结合事务来保证同一时间只有一个导入请求能够执行,尤其是在需要严格保证操作互斥的情况下。这样可以确保导入操作不会因为并发竞争导致数据不一致。

如果系统对高并发有很高的要求,可以考虑结合使用 Redis 分布式锁进行控制。这样既能保证分布式环境中的一致性,也能适应多实例部署。


最后

以上就是名字长了才好记为你收集整理的golang使用数据库的锁实现和解决并发问题的全部内容,希望文章能够帮你解决golang使用数据库的锁实现和解决并发问题所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(96)

评论列表共有 0 条评论

立即
投稿
返回
顶部