我是靠谱客的博主 细腻黑猫,最近开发中收集的这篇文章主要介绍redis 队列及ACK代码实现概述,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

redis 队列及ACK代码实现

  • 概述
    • 流程图
    • 代码
      • RedisQueueAbstract
      • AckQueue
      • QueueTrait
    • 使用
      • 消费者
      • ACK

概述

使用PHP+Redis简单实现一下队列以及ACK,确保服务的可靠性。

流程图

流程图

代码

RedisQueueAbstract

<?php

namespace RedisQueue;

use RedisLibsRedisClient;
use RedisQueueQueueTrait;

abstract class RedisQueueAbstract
{
    use QueueTrait;

    // redis
    protected $redis;

    // 队列
    protected $queue;

    // 队列值
    protected $queueVal;

    // 自动 Ack
    protected $autoAck = true;

    // 队列阻塞时间
    protected $queueTimeout = 20;

    // 无数据休眠时间
    protected $sleepTimeout = 5;

    // 处理超时时间,最小10秒
    protected $execTimeout = 60;

    public function __construct()
    {
        $this->redis = (new RedisClient)->connect();
    }

    /**
     * 处理取出的队列数据
     *
     * @param array $data
     * @return mixed
     */
    abstract protected function handle(array $data);

    /**
     * 设置队列
     *
     * @param string $queue
     * @return self
     */
    public function setQueue(string $queue): self
    {
        $this->queue = $queue;
        return $this;
    }

    /**
     * 加入队列
     *
     * @param array $data
     * @return integer
     */
    public function dispatch(array $data): bool
    {
        return (bool) $this->redis->lPush($this->queue, $this->handleData($data));
    }

    /**
     * 启动消费者
     *
     * @return void
     */
    public function run(): void
    {
        $this->start();
    }

    /**
     * ACK操作,删除ack队列的消息
     *
     * @return void
     */
    public function ack(): void
    {
        $this->redis->lRem($this->getACKQueue(), $this->queueVal, 1);
    }

    /**
     * 开始
     *
     * @return void
     */
    private function start(): void
    {
        while (true) {
            try {
                $queueVal = $this->redis->bRPopLPush($this->queue, $this->getACKQueue(), $this->queueTimeout);
                if (! $queueVal) {
                    echo sprintf("sleep %d秒" . PHP_EOL, $this->sleepTimeout);
                    sleep($this->sleepTimeout);
                    continue;
                }
                $this->queueVal = $queueVal;
                $this->alterAckQueueVal();
                $this->handle($this->decodeData($this->queueVal)['data']);
                if ($this->autoAck) {
                    $this->ack();
                }
            } catch (Exception $e) {
                echo $e->getMessage();
                continue;
            }
        }
    }

    /**
     * 处理加入队列的数据
     *
     * @param array $data
     * @return string
     */
    private function handleData(array $data): string
    {
        return $this->encodeData([
            'queue' => $this->queue,
            'data' => $data,
            'join_date' => date('Y-m-d H:i:s'),
            'timeout' => $this->execTimeout
        ]);
    }

    /**
     * 修改刚加入ack队列的值
     *
     * @return void
     */
    private function alterAckQueueVal(): void
    {
        $val = $this->decodeData($this->queueVal);
        $val['pop_date'] = date('Y-m-d H:i:s');
        $val['timeout_date'] = date('Y-m-d H:i:s', (time() + min($this->execTimeout, 10)));
        $val['timeout'] = $this->execTimeout;
        $newQueueVal = $this->encodeData($val);
        $this->redis->multi()
            ->lPush($this->getACKQueue(), $newQueueVal)
            ->lRem($this->getACKQueue(), $this->queueVal, 1)
            ->exec();
        $this->queueVal = $newQueueVal;
    }
}

AckQueue

<?php

namespace RedisQueue;

use RedisLibsRedisClient;
use RedisQueueQueueTrait;

class AckQueue
{

    use QueueTrait;

    protected $redis;

    protected $ackQueue = 'list:ack';

    protected $queueValList = [];

    public function __construct()
    {
        $this->redis = (new RedisClient)->connect();
    }

    public function run(): void
    {
        try{
            if (($len = $this->redis->lLen($this->ackQueue)) == 0) {
                echo '无数据,跳过';
                return;
            }
            for ($i=0; $i < $len; $i++) {
                $queueVal = $this->redis->lIndex($this->getACKQueue(), $i);
                if (! $queueVal) {
                    break;
                }
                $queueValData = $this->decodeData($queueVal);
                $originQueue = $queueValData['queue'];
                if (! array_key_exists('timeout_date', $queueValData)) {
                    // 防止误处理刚加入队列的数据
                    if ((strtotime($queueValData['join_date']) + $queueValData['timeout']) > time()) {
                        echo '保险起见,暂不处理';
                        continue;
                    }
                    if (in_array($queueVal, $this->queueValList)) {
                        echo '第二次处理';
                        $this->makeValToOriginQueue($originQueue, $queueVal);
                    } else {
                        $this->queueValList[] = $queueVal;
                        echo '第一次不处理';
                        sleep(5);
                    }
                    $i--;
                    continue;
                }
                if (strtotime($queueValData['timeout_date']) < time()) {
                    $this->makeValToOriginQueue($originQueue, $queueVal);
                    $i--;
                    continue;
                }
            }
        } catch(Exception $e) {
            echo $e->getMessage();
        }
    }

    /**
     * 还原队列数据到原队列
     *
     * @param string $originQueue
     * @param string $queueVal
     * @return void
     */
    public function makeValToOriginQueue(string $originQueue, string $queueVal): void
    {
        $queueValData = $this->decodeData($queueVal);
        $queueValData['timeout_date'] =  date('Y-m-d H:i:s', (time() + min($queueValData['timeout'], 10)));
        $nowQueueVal = $this->encodeData($queueValData);
        $this->redis->multi()
            ->rPush($originQueue, $nowQueueVal)
            ->lRem($this->getACKQueue(), $queueVal, 1)
            ->exec();
    }
}

QueueTrait

<?php

namespace RedisQueue;

trait QueueTrait
{

    /**
     * 数据编码
     *
     * @param array $data
     * @return string
     */
    private function encodeData(array $data): string
    {
        return json_encode($data);
        return base64_encode(json_encode($data));
    }

    /**
     * 数据解码
     *
     * @return array
     */
    private function decodeData(string $data): array
    {
        return json_decode($data, true);
        return json_decode(base64_decode($data), true);
    }

    /**
     * 获取ACK队列
     *
     * @return string
     */
    private function getACKQueue(): string
    {
        return 'list:ack';
    }
}

使用

消费者

<?php

namespace Example;

use RedisQueueRedisQueueAbstract;;

class ExampleQueue extends RedisQueueAbstract
{

    protected $queue;
    
    protected $autoAck = false;
    
    protected $execTimeout = 30;


    protected function handle(array $data)
    {
        print_r($data);
    }
}
<?php

namespace Example;

require_once __DIR__ . '/../vendor/autoload.php';

use ExampleExampleQueue;

(new ExampleQueue())->run();

ACK

定时任务执行

<?php

namespace Example;

require_once __DIR__ . '/../vendor/autoload.php';

use RedisQueueAckQueue;

(new AckQueue())->run();

最后

以上就是细腻黑猫为你收集整理的redis 队列及ACK代码实现概述的全部内容,希望文章能够帮你解决redis 队列及ACK代码实现概述所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(67)

评论列表共有 0 条评论

立即
投稿
返回
顶部