我是靠谱客的博主 开放羊,最近开发中收集的这篇文章主要介绍解决方案(16) 异步落库方案,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

注意

Redis版本要求4以上。

前言

为了解决db写入pqs过高的问题,我们在db和业务之间,隔离了一层mq。

实际结果,哪怕接入了mq,消息消费也跟不上,依旧负载很高。如果降低mq pull频率,那么业务实时性会降低,这个不太接受。

为了解决这个场景,决定从【业务实时性】【降幂】【方案易用和复制】多个角度,设计了这一套异步落库方案。

方案的主题是,将第一手数据,从db改到redis。延长redis失效时间。确保晚上能够正确执行同步。

目前,线上使用稳定。

分析

● 【业务实时性】从实时上考虑,异步写入方式,使用mq削峰后,仍旧很高qps,继续降低拉取频率,只会降低实时性。所以考虑将可靠数据,从db转移至redis。
● 【降幂】异步存储的场景,大部分可以归结为某一个活动进度。这类模型有一个特性,就是以最新的结果为主,换句话说,不建议将每次更新,以日志的形式写入mq,再对每条update分次处理。一个用户单日更新一万次,实际上更新落盘只在乎最新的一次,而不是也在低峰值下,同步一万次。
● 【方案易用性和复制性】 该方案的实现,必须通用化,不能每一个业务场景,每一个业务模块,都重复实现。因为实现过程容易出错,新人和不同的开发同事,实现风格迥异,难以维护。

设计方案

主流程分析
在这里插入图片描述

● 通过将可靠模型存储,从db转移到redis。意味着模型的缓存失效时间,从原来的5-12分钟,升级为3-7天。保障【实时性】。
● 通过redis set集合特性,执行时,用户的进度key,实际上是去重了的,所以能够做到,只取最新一次。保障了【降幂】
● 将复杂的同步过程,形成框架,在集成进项目模块时,越简单,维护起来越简单。保障【方案易用性】

实现

● 仓库: github.com/fwhezfwhez/syncwd
● 通过开源,由社区反馈意见和bug。并在团队内,使用优化版的。

接入

require (
    github.com/fwhezfwhez/syncwd latest
)

纳入异步同步的表模型,必须实现以下方法

type ModelI interface {
	RedisKey() string            // 某个模型的rediskey
	TableName() string           // 某个模型的表名
	SyncToDB() error             // 执行更新进数据库的方法
}

样例
● 用户活动进度表需要异步,优先更新redis,晚上异步落盘

type UserProcess struct {
	UserName string `gorm:"column:user_name;default:"`
}

func (up UserProcess) TableName() string {
	return "user_process"
}
func (up UserProcess) RedisKey() string {
	return fmt.Sprintf("appsrv:%s", up.UserName)
}

func (up UserProcess) SyncToDB() error {
    // 伪代码,同步过程。实际应该由开发人员自己实现
	fmt.Println("成功同步:", Debug(up))
	return nil
}

func Debug(v interface{}) string {
	rs, _ := json.MarshalIndent(v, "  ", "  ")
	return string(rs)
}

接入同步计划
● 实际测试时,需要先执行prepareData(),再修改本地时间至明天,然后执行

package main

import (
	"fmt"
	"github.com/fwhezfwhez/syncwd"
	"github.com/fwhezfwhez/syncwd/testcase/case1/src"
	"github.com/garyburd/redigo/redis"
	 "gopkg.in/robfig/cron.v2"
	"time"
)

// 同步计划管理 应用内单例。支持并发(cron内的同步操作毋须做任务幂等)。
var sm *syncwd.SyncManager

var p *redis.Pool

func init() {
	p = src.NewPool("localhost:6379", "", 0)

	sm = syncwd.NewSyncManager(p, 5)
	sm.Add(src.UserInfo{})
}

func main() {
	//prepareData()

	c := cron.New()
	c.AddFunc("0 0 3 * * ?", func() {
		sm.Run()
	})

	c.Start()
	select {}
}

// 实际业务中,不需要用到它
func prepareData() {
	// 准备数据源
	var sd = syncwd.NewSyncwd()

	conn := p.Get()
	defer conn.Close()
	for i := 0; i < 10000; i ++ {
		ui := src.UserInfo{
			UserName: fmt.Sprintf("冯%d-%d", time.Now().Unix(), i),
		}

		if e := sd.Update(ui, conn); e != nil {
			panic(e)
		}
	}
}

更新操作变更
● 找到模块对应模块更新的位置,使用syncwd.Update(o, conn) 来替代有逻辑。

最后

以上就是开放羊为你收集整理的解决方案(16) 异步落库方案的全部内容,希望文章能够帮你解决解决方案(16) 异步落库方案所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(46)

评论列表共有 0 条评论

立即
投稿
返回
顶部