我是靠谱客的博主 靓丽心锁,最近开发中收集了这篇文章,主要介绍在 ThinkPHP 6.0 中通过 STOMP 接入阿里云(包括前置条件、实现效果、Composer.json 和代码),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

前置条件

thinkadmin

ThinkAdmin: 基于 ThinkPHP 的微信后台管理平台(体验账号和密码都是 admin):https://gitee.com/zoujingli/ThinkAdmin

实现效果:

// 开启
php think xapi:CmdAliyunStomp start

// 守护进程
php think xapi:CmdAliyunStomp start -d

// 关闭
php think xapi:CmdAliyunStomp stop

//状态
php think xapi:CmdAliyunStomp status

//监听
php think xapi:CmdAliyunStomp listen

Composer.json

添加: stomp-php/stomp-php

 "require": {  
    
    "stomp-php/stomp-php": "5.0.0"

  },

代码

<?php


namespace app\api\command;

use Psr\Log\NullLogger;
use Stomp\Client;
use Stomp\Exception\StompException;
use Stomp\Network\Observer\Exception\HeartbeatException;
use Stomp\Network\Observer\ServerAliveObserver;
use Stomp\StatefulStomp;
use think\admin\Command;
use think\admin\Exception;
use think\admin\service\ProcessService;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;


/**
 * 阿里云  CmdAliyunStomp
 */
class CmdAliyunStomp extends Command
{
    /**
     * Command line (relative to `php think`) of the long-running listener.
     * Used both to spawn the background process and to look it up again.
     */
    const QUEUE_LISTEN = 'xapi:CmdAliyunStomp listen';

    /**
     * Console command name, e.g. `php think xapi:CmdAliyunStomp start -d`.
     */
    protected $name = 'xapi:CmdAliyunStomp';

    /**
     * Declare the command name, the `-d/--daemon` flag and the positional arguments.
     *
     * `action` defaults to `listen`; `code` and `spts` are declared but are not
     * read anywhere in this class.
     */
    protected function configure()
    {
        $this->setName($this->name);
        $this->addOption('daemon', 'd', Option::VALUE_NONE, 'The queue listen in daemon mode');
        $this->addArgument('action', Argument::OPTIONAL, 'stop|start|status|listen', 'listen');
        $this->addArgument('code', Argument::OPTIONAL, 'Taskcode');
        $this->addArgument('spts', Argument::OPTIONAL, 'Separator');
        $this->setDescription('xapi:CmdTest listen for my');
    }

    /**
     * Dispatch to `<action>Action()`.
     *
     * When the daemon option is reported as present the action is forced to
     * `start`; otherwise the `action` argument is used.
     *
     * @param Input $input
     * @param Output $output
     * @return mixed Whatever the dispatched action method returns.
     */
    protected function execute(Input $input, Output $output)
    {
        $action = $input->hasOption('daemon') ? 'start' : $input->getArgument('action');
        $method = "{$action}Action";

        if (method_exists($this, $method)) {
            return $this->$method();
        }

        $this->output->error("># Wrong operation, Allow stop|start|status|listen");
    }

    /**
     * Send a close request to every process whose command matches this command's name.
     */
    protected function stopAction()
    {
        $processes = $this->process->thinkQuery($this->name);

        if (count($processes) < 1) {
            $this->output->writeln("># There are no task processes to stop");
            return;
        }

        foreach ($processes as $item) {
            $this->process->close(intval($item['pid']));
            $this->output->writeln("># Successfully sent end signal to process {$item['pid']}");
        }
    }

    /**
     * Start the listener as a background process unless one is already running.
     */
    protected function startAction()
    {
        // Show the full command line that is about to be looked up / spawned.
        $this->output->comment(">$ {$this->process->think(static::QUEUE_LISTEN)}");

        $running = $this->process->thinkQuery(static::QUEUE_LISTEN);
        if (count($running) > 0) {
            $this->output->writeln("># " . static::QUEUE_LISTEN . " already exist for pid {$running[0]['pid']}");
            return;
        }

        // NOTE(review): the second argument (1000) is passed through to ProcessService::thinkCreate();
        // its meaning is not visible in this file — presumably a wait/delay, confirm.
        $this->process->thinkCreate(static::QUEUE_LISTEN, 1000);

        // Query again to find out whether the spawn actually produced a process.
        $running = $this->process->thinkQuery(static::QUEUE_LISTEN);
        if (count($running) > 0) {
            $this->output->writeln("># " . static::QUEUE_LISTEN . " started successfully for pid {$running[0]['pid']}");
        } else {
            $this->output->writeln("># " . static::QUEUE_LISTEN . " failed to start");
        }
    }

    /**
     * Report whether a listener process is currently running.
     */
    protected function statusAction()
    {
        $running = $this->process->thinkQuery(static::QUEUE_LISTEN);

        if (count($running) > 0) {
            $this->output->writeln("Listening for main process {$running[0]['pid']} running");
        } else {
            $this->output->writeln("The Listening main process is not running");
        }
    }

    /**
     * Run the listener in the foreground (this is what the background process executes).
     */
    protected function listenAction()
    {
        // Long-running process: no execution time limit, keep going if the caller goes away.
        set_time_limit(0);
        ignore_user_abort(true);

        // Replace the database logger with a no-op logger for the lifetime of the listener.
        $this->app->db->setLog(new NullLogger());

        $this->output->writeln("\tYou can exit with <info>`CTRL-C`</info>");
        $this->output->writeln('=============== LISTENING ===============');

        $this->begin();
    }

    /**
     * Connect to the Aliyun IoT STOMP endpoint, subscribe to `/topic/#` and
     * process incoming messages forever.
     *
     * Returns only if the very first connection attempt fails; after a
     * successful start, read errors lead to a disconnect and a reconnect
     * attempt on the next loop iteration.
     */
    protected function begin()
    {
        // Credentials and instance ID come from configuration.
        $accessKey = config('aliyun.IotAppid');
        $accessSecret = config('aliyun.IotAppkey');
        $consumerGroupId = "DEFAULT_GROUP";
        // Use the same instance ID for the login string and for the endpoint host,
        // instead of a placeholder literal in one and the configured value in the other.
        $iotInstanceId = sysconf('device_instance_id');
        // Arbitrary client identifier.
        $clientId = "test1233434dfdfdf32";
        // Current time in milliseconds; it is part of both the user name and the signed content.
        $timeStamp = round(microtime(true) * 1000);
        // Signature method announced to the server; must match the hash used for $password below.
        $signMethod = "hmacsha1";

        // User name layout as required by the Aliyun access documentation.
        $userName = $clientId . "|authMode=aksign"
            . ",signMethod=" . $signMethod
            . ",timestamp=" . $timeStamp
            . ",authId=" . $accessKey
            . ",iotInstanceId=" . $iotInstanceId
            . ",consumerGroupId=" . $consumerGroupId
            . "|";

        // Password = base64 of the raw HMAC-SHA1 over "authId=...&timestamp=...".
        $signContent = "authId=" . $accessKey . "&timestamp=" . $timeStamp;
        $password = base64_encode(hash_hmac("sha1", $signContent, $accessSecret, true));

        $client = new Client('ssl://' . $iotInstanceId . '.amqp.iothub.aliyuncs.com:61614');
        // NOTE(review): peer certificate is verified but the peer name check is disabled.
        $sslContext = ['ssl' => ['verify_peer' => true, 'verify_peer_name' => false]];
        $client->getConnection()->setContext($sslContext);

        // Observer that watches for missing server heartbeats.
        $observer = new ServerAliveObserver();
        $client->getConnection()->getObservers()->addObserver($observer);

        // Ask the server for a heartbeat every 5000 ms; send none from the client side.
        $client->setHeartbeat(0, 5000);
        $client->setLogin($userName, $password);

        try {
            $client->connect();
        } catch (StompException $e) {
            echo "failed to connect to server, msg:" . $e->getMessage(), PHP_EOL;
            // Do not try to subscribe on a connection that was never established.
            return;
        }

        $stomp = new StatefulStomp($client);
        $stomp->subscribe('/topic/#');
        $this->output->writeln('connect success .');

        while (true) {
            $this->output->writeln('ok');
            try {
                // Re-establish the connection if a previous iteration dropped it.
                // NOTE(review): the login still carries the timestamp/signature computed
                // once above — confirm the server accepts it on late reconnects.
                if (!$client->isConnected()) {
                    echo "connection not exists, will reconnect after 10s.", PHP_EOL;
                    sleep(10);
                    $client->connect();
                    $stomp->subscribe('/topic/#');
                    echo "connect success.", PHP_EOL;
                }

                $msg = $stomp->read();
                echo date('Y-m-d H:i:s');
                echo $msg;

                if (!empty($msg)) {
                    // Hand headers and body to the business layer.
                    // NOTE(review): MyService has no `use` import in this file, so it resolves
                    // inside the current namespace — confirm that class exists there.
                    MyService::instance()->insertLog($msg->getHeaders(), $msg->getBody());
                }
            } catch (HeartbeatException $e) {
                echo 'The server failed to send us heartbeats within the defined interval.', PHP_EOL;
                $stomp->getClient()->disconnect();
            } catch (\Exception $e) {
                // Fully qualified on purpose: the unqualified name `Exception` is imported
                // from think\admin in this file and would not match Stomp or runtime errors.
                echo 'process message occurs error: ' . $e->getMessage(), PHP_EOL;
                $stomp->getClient()->disconnect();
            }
        }
    }
}

最后

以上就是靓丽心锁为你收集整理的thinkphp6.0 阿里云 stomp前置条件实现效果:Composer.json代码的全部内容,希望文章能够帮你解决thinkphp6.0 阿里云 stomp前置条件实现效果:Composer.json代码所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(62)

评论列表共有 0 条评论

立即
投稿
返回
顶部