我是靠谱客的博主 名字长了才好记,最近开发中收集的这篇文章主要介绍golang有哪些方式解决并发竞争问题,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

除了数据库锁和分布式锁的方案之外,还有一些其他的方法可以用来防止用户同时多次导入:

1. 基于令牌桶机制的限流

您可以对每个用户的请求进行限流,限制每个用户在某个时间段内只允许执行一次导入请求。这种方法类似于令牌桶(Token Bucket)或漏桶算法,可以用于限制导入操作的频率。

  • 实现方式

    • 使用内存缓存或 Redis 来存储用户导入请求的状态。

    • 当用户请求导入时,检查是否还有“令牌”可以使用,没有则返回提示。

    • 这种方式的优点是简单,适合应用中需要限流的场景。

代码示例(基于 Redis):

import (
    "context"
    "time"
    "errors"

    "github.com/go-redis/redis/v8"
)

var redisClient = redis.NewClient(&redis.Options{
    Addr: "localhost:6379", // Redis 地址
})

// 限制用户每次只能有一个导入任务
func (s *sFaceGroup) ImportFaceInfo(ctx context.Context, in *v1.FaceInfoImportReq) (res *v1.FaceInfoImportRes, err error) {
    // 生成 Redis 键,例如 "import_lock_user_<GroupId>"
    lockKey := "import_lock_user_" + in.GroupId

    // 尝试获取锁,设置 5 分钟的有效期
    ok, err := redisClient.SetNX(ctx, lockKey, "locked", 5*time.Minute).Result()
    if err != nil {
        return nil, err
    }

    if !ok {
        // 如果无法获取锁,表示用户正在进行导入操作
        return nil, errors.New("当前分组正在导入中,请稍后再试")
    }

    // 确保导入操作完成后释放锁
    defer func() {
        // 删除锁
        _, _ = redisClient.Del(ctx, lockKey).Result()
    }()

    // 继续处理导入逻辑...
    tempDir := gfile.Temp(uuid.New().String())
    defer gfile.Remove(tempDir)
    // 其余代码省略...

    return
}

说明

  • 通过 SetNX 方法在 Redis 中创建一个唯一键(锁),并设置一定的有效期。

  • 如果获取锁失败,直接返回错误,提示用户稍后重试。

  • 在导入完成后,通过 Del 方法删除锁。

优点:实现简单,适合用于分布式环境。

2. 基于 GoChannel 的互斥

如果您使用的是单节点部署,可以使用 GoChannel 作为简易的同步机制。利用 Go 中的 chan 可以确保同一时间内某个用户只能有一个导入操作在进行。

  • 实现方式

    • 为每个用户创建一个唯一的 chan,通过它来实现互斥。

    • 当一个用户请求导入时,先检查是否已经有其他操作在进行,通过通道的阻塞特性来确保只有一个操作进行。

代码示例

import (
    "context"
    "sync"
)

var importLocks = struct {
    m sync.Map
}{}

// 获取用户锁的通道
func getUserLockChannel(userId string) chan struct{} {
    lock, _ := importLocks.m.LoadOrStore(userId, make(chan struct{}, 1))
    return lock.(chan struct{})
}

func (s *sFaceGroup) ImportFaceInfo(ctx context.Context, in *v1.FaceInfoImportReq) (res *v1.FaceInfoImportRes, err error) {
    userLockChannel := getUserLockChannel(in.GroupId)

    // 尝试获取锁,如果无法获取则返回错误
    select {
    case userLockChannel <- struct{}{}:
        // 成功获取锁
    default:
        // 如果通道满了,则说明有其他请求在进行中
        return nil, errors.New("当前分组正在导入中,请稍后再试")
    }

    // 确保在函数返回时释放锁
    defer func() {
        <-userLockChannel
    }()

    // 继续处理导入逻辑
    tempDir := gfile.Temp(uuid.New().String())
    defer gfile.Remove(tempDir)
    // 其余代码省略...

    return
}

说明

  • 使用 sync.Map 存储每个用户的锁通道,保证线程安全。

  • 通过向 userLockChannel 发送一个信号来获取锁,利用通道的容量为 1 的特性实现互斥。

  • 如果 default 语句触发,说明锁已被占用。

优点

  • 这种方式简单而高效,适用于单节点的情况,使用 Go 原生的特性来实现同步。

3. 分布式事务管理(基于数据库表锁)

可以通过在数据库中对记录进行表级锁定来确保同一用户的导入操作互斥。例如,可以利用 MySQL 的 表锁行锁,确保对相同 GroupId 的导入请求无法同时进行。

  • 实现方式

    • 利用 MySQL 的 SELECT ... FOR UPDATE,在数据库层面对相应的用户记录加锁,确保只有一个事务可以对该记录进行修改。

代码示例

func (s *sFaceGroup) ImportFaceInfo(ctx context.Context, in *v1.FaceInfoImportReq) (res *v1.FaceInfoImportRes, err error) {
    err = dao.FaceGroup.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
        // 使用 FOR UPDATE 锁定当前分组记录
        group, err := tx.Model(dao.FaceGroup.Table()).
            Where(dao.FaceGroup.Columns().Id, in.GroupId).
            ForUpdate().
            One()
        if err != nil {
            return cerrors.Wrap(err, consts.InternalServer)
        }
        if group.IsEmpty() {
            return errors.New("当前人脸分组不存在")
        }

        // 检查是否已经在导入中
        if group[dao.FaceGroup.Columns().IsImporting].Bool() {
            return errors.New("当前分组正在导入中,请稍后再试")
        }

        // 设置为导入中状态
        _, err = tx.Model(dao.FaceGroup.Table()).
            Where(dao.FaceGroup.Columns().Id, in.GroupId).
            Data(g.Map{dao.FaceGroup.Columns().IsImporting: true}).
            Update()
        if err != nil {
            return cerrors.Wrap(err, consts.InternalServer)
        }

        // 确保导入结束后将其状态恢复
        defer func() {
            _, _ = tx.Model(dao.FaceGroup.Table()).
                Where(dao.FaceGroup.Columns().Id, in.GroupId).
                Data(g.Map{dao.FaceGroup.Columns().IsImporting: false}).
                Update()
        }()

        // 继续处理导入逻辑
        tempDir := gfile.Temp(uuid.New().String())
        defer gfile.Remove(tempDir)
        // 其他处理代码...

        return nil
    })

    if err != nil {
        return nil, err
    }

    // 返回结果
    res = &v1.FaceInfoImportRes{
        Success: 0, // 根据实际导入结果设置
        Fail:    0, // 根据实际导入结果设置
    }
    return
}

说明

  • Transaction 确保对 FaceGroup 的锁定操作在事务内执行,以确保只有一个操作可以获取到该锁。

  • 如果其他请求在该事务结束之前尝试锁定相同记录,它们将被阻塞,直到该事务结束。

4. 消息队列(异步处理)

使用消息队列(如 RabbitMQ、Kafka)将用户的导入请求放入队列,然后由后台的 worker 依次处理每个请求。通过这种方式,可以确保同一时间只有一个导入请求被处理。

  • 实现方式

    • 用户提交导入请求时,将任务放入消息队列。

    • 后端服务从队列中读取任务并处理。

    • 可以在队列中确保相同用户的任务是按顺序执行的(即同一个 GroupId 不会被并发消费)。

优点

  • 将导入处理异步化,不阻塞用户的前端操作。

  • 任务调度完全由消息队列管理,易于扩展。


总结与选择建议

  • 数据库锁(悲观锁或乐观锁):可以有效地防止并发问题,适用于中小规模的系统。悲观锁适合强一致性需求,乐观锁适合对失败重试可以接受的场景。

  • Redis 分布式锁:适合分布式系统,保证跨实例的操作互斥性。简单易用,在分布式场景下非常有效。

  • GoChannel:适合单实例部署,通过 Goroutine 的通道特性来实现互斥,非常简洁高效。

  • 令牌桶限流:适合需要限制用户操作频率的场景,也可以防止并发操作。

  • 消息队列:适合需要将导入操作异步处理的场景,能有效避免用户等待,适用于任务量大且导入时间长的场景。

根据您的具体应用场景和部署架构,如果是单节点,可以考虑 GoChannel数据库悲观锁。如果是分布式系统,可以选择 Redis 分布式锁消息队列 来实现异步化和去重。


最后

以上就是名字长了才好记为你收集整理的golang有哪些方式解决并发竞争问题的全部内容,希望文章能够帮你解决golang有哪些方式解决并发竞争问题所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(113)

评论列表共有 0 条评论

立即
投稿
返回
顶部