我是靠谱客的博主 靓丽心锁,这篇文章是最近开发中整理的,主要介绍在 ThinkPHP 6.0 中通过 STOMP 协议接入阿里云物联网平台服务端订阅,内容包括前置条件、实现效果、Composer.json 配置和完整代码,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
前置条件
thinkadmin
ThinkAdmin: 基于 ThinkPHP 微信后台管理平台(体验账号和密码都是 admin)https://gitee.com/zoujingli/ThinkAdmin
实现效果:
// 开启
php think xapi:CmdAliyunStomp start
// 守护进程(后台运行)
php think xapi:CmdAliyunStomp start -d
// 关闭
php think xapi:CmdAliyunStomp stop
//状态
php think xapi:CmdAliyunStomp status
//监听
php think xapi:CmdAliyunStomp listen
Composer.json
添加: stomp-php/stomp-php
"require": {
"stomp-php/stomp-php": "5.0.0"
},
代码
<?php
namespace app\api\command;

use Psr\Log\NullLogger;
use Stomp\Client;
use Stomp\Exception\StompException;
use Stomp\Network\Observer\Exception\HeartbeatException;
use Stomp\Network\Observer\ServerAliveObserver;
use Stomp\StatefulStomp;
use think\admin\Command;
use think\admin\Exception;
use think\admin\service\ProcessService;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
/**
 * Console command that listens to Aliyun IoT server-side subscription
 * messages over STOMP.
 *
 * Usage:
 *   php think xapi:CmdAliyunStomp start      spawn the background listener
 *   php think xapi:CmdAliyunStomp start -d   same; -d always selects "start"
 *   php think xapi:CmdAliyunStomp stop       signal running processes to end
 *   php think xapi:CmdAliyunStomp status     report whether the listener runs
 *   php think xapi:CmdAliyunStomp listen     listen in the foreground
 */
class CmdAliyunStomp extends Command
{
    /**
     * Command line (relative to `php think`) of the foreground listener that
     * startAction() spawns and statusAction() looks for.
     */
    const QUEUE_LISTEN = 'xapi:CmdAliyunStomp listen';

    /**
     * Registered command name; also used by stopAction() to find processes.
     * Original note: this must not be created more than once.
     */
    protected $name = 'xapi:CmdAliyunStomp';

    /**
     * Declares the command name, the -d/--daemon option and the arguments.
     *
     * `action` defaults to "listen"; `code` and `spts` are declared but not
     * read anywhere in this class.
     */
    protected function configure()
    {
        $this->setName($this->name);
        $this->addOption('daemon', 'd', Option::VALUE_NONE, 'The queue listen in daemon mode');
        $this->addArgument('action', Argument::OPTIONAL, 'stop|start|status|listen', 'listen');
        $this->addArgument('code', Argument::OPTIONAL, 'Taskcode');
        $this->addArgument('spts', Argument::OPTIONAL, 'Separator');
        $this->setDescription('xapi:CmdTest listen for my');
    }

    /**
     * Dispatches to the `<action>Action` method selected on the command line.
     *
     * @param Input $input
     * @param Output $output
     * @return mixed whatever the selected action returns
     * @throws Exception
     */
    protected function execute(Input $input, Output $output)
    {
        // NOTE(review): this relies on Input::hasOption() being true only when
        // -d was actually passed on the command line - confirm against
        // think\console\Input.
        $action = $input->hasOption('daemon') ? 'start' : $input->getArgument('action');
        if (method_exists($this, $method = "{$action}Action")) {
            return $this->$method();
        }
        $this->output->error("># Wrong operation, Allow stop|start|status|listen");
    }

    /**
     * Sends an end signal to every process matching this command's name.
     *
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\DbException
     * @throws \think\db\exception\ModelNotFoundException
     */
    protected function stopAction()
    {
        $result = $this->process->thinkQuery($this->name);
        if (count($result) < 1) {
            $this->output->writeln("># There are no task processes to stop");
            return;
        }
        foreach ($result as $item) {
            $this->process->close(intval($item['pid']));
            $this->output->writeln("># Successfully sent end signal to process {$item['pid']}");
        }
    }

    /**
     * Starts the background listener unless one is already running.
     *
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\DbException
     * @throws \think\db\exception\ModelNotFoundException
     */
    protected function startAction()
    {
        // Print the command line that is about to be looked up / spawned.
        $this->output->comment(">$ {$this->process->think(static::QUEUE_LISTEN)}");
        if (count($result = $this->process->thinkQuery(static::QUEUE_LISTEN)) > 0) {
            $this->output->writeln("># " . static::QUEUE_LISTEN . " already exist for pid {$result[0]['pid']}");
            return;
        }
        // NOTE(review): the second argument is presumably a wait time in
        // milliseconds before returning - confirm against ProcessService.
        $this->process->thinkCreate(static::QUEUE_LISTEN, 1000);
        // Query again to verify that the process really came up.
        if (count($result = $this->process->thinkQuery(static::QUEUE_LISTEN)) > 0) {
            $this->output->writeln("># " . static::QUEUE_LISTEN . " started successfully for pid {$result[0]['pid']}");
        } else {
            $this->output->writeln("># " . static::QUEUE_LISTEN . " failed to start");
        }
    }

    /**
     * Reports whether the listener process is running.
     *
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\DbException
     * @throws \think\db\exception\ModelNotFoundException
     */
    protected function statusAction()
    {
        if (count($result = $this->process->thinkQuery(static::QUEUE_LISTEN)) > 0) {
            $this->output->writeln("Listening for main process {$result[0]['pid']} running");
        } else {
            $this->output->writeln("The Listening main process is not running");
        }
    }

    /**
     * Listens in the foreground until the process is terminated.
     */
    protected function listenAction()
    {
        // Long-running worker: no execution time limit, keep running if the
        // invoking client goes away.
        set_time_limit(0);
        ignore_user_abort(true);
        // Replace the database logger with a no-op logger.
        $this->app->db->setLog(new NullLogger());
        $this->output->writeln("\tYou can exit with <info>`CTRL-C`</info>");
        $this->output->writeln('=============== LISTENING ===============');
        $this->begin();
    }

    /**
     * Connects to the Aliyun IoT STOMP endpoint, subscribes to `/topic/#`
     * and hands every received message to MyService::insertLog().
     *
     * Returns only when the initial connection fails; otherwise loops
     * forever, reconnecting 10 seconds after any error.
     */
    protected function begin()
    {
        // For the meaning of these parameters see Aliyun's AMQP client
        // access documentation.
        $accessKey = config('aliyun.IotAppid');
        $accessSecret = config('aliyun.IotAppkey');
        $consumerGroupId = "DEFAULT_GROUP";
        // Instance ID: the same configured value is used for the login name
        // and for the endpoint host, so the two cannot disagree.
        $iotInstanceId = sysconf('device_instance_id');
        // Free-form client identifier.
        $clientId = "test1233434dfdfdf32";
        // Current time in milliseconds.
        $timeStamp = round(microtime(true) * 1000);
        // Signature method: hmacmd5, hmacsha1 and hmacsha256 are supported.
        $signMethod = "hmacsha1";
        // userName layout is defined by the AMQP client access documentation.
        // For binary transport, add encode=base64 to userName; the server then
        // base64-encodes the message body before pushing it.
        $userName = $clientId . "|authMode=aksign"
            . ",signMethod=" . $signMethod
            . ",timestamp=" . $timeStamp
            . ",authId=" . $accessKey
            . ",iotInstanceId=" . $iotInstanceId
            . ",consumerGroupId=" . $consumerGroupId
            . "|";
        // Content to sign: authId and timestamp joined with '&'.
        $signContent = "authId=" . $accessKey . "&timestamp=" . $timeStamp;
        // password = base64 of the raw (binary) HMAC of the sign content.
        $password = base64_encode(hash_hmac("sha1", $signContent, $accessSecret, true));

        $client = new Client('ssl://' . $iotInstanceId . '.amqp.iothub.aliyuncs.com:61614');
        // NOTE(review): verify_peer_name is disabled, so the certificate's
        // host name is not checked - confirm this is intended.
        $sslContext = ['ssl' => ['verify_peer' => true, 'verify_peer_name' => false],];
        $client->getConnection()->setContext($sslContext);
        // Observe server heartbeats.
        $observer = new ServerAliveObserver();
        $client->getConnection()->getObservers()->addObserver($observer);
        // Heartbeat: send none, request one from the server every 5000 ms.
        $client->setHeartbeat(0, 5000);
        $client->setLogin($userName, $password);
        try {
            $client->connect();
        } catch (StompException $e) {
            echo "failed to connect to server, msg:" . $e->getMessage(), PHP_EOL;
            // Continue only when the connection succeeded; subscribing on a
            // client that failed to connect would just throw again.
            return;
        }

        $stomp = new StatefulStomp($client);
        $stomp->subscribe('/topic/#');
        $this->output->writeln('connect success .');
        while (true) {
            $this->output->writeln('ok');
            try {
                // Reconnect and resubscribe if the connection was dropped.
                if (!$client->isConnected()) {
                    echo "connection not exists, will reconnect after 10s.", PHP_EOL;
                    sleep(10);
                    $client->connect();
                    $stomp->subscribe('/topic/#');
                    echo "connect success.", PHP_EOL;
                }
                $msg = $stomp->read();
                echo date('Y-m-d H:i:s');
                echo $msg;
                if (!empty($msg)) {
                    // Business logic for a received message.
                    // NOTE(review): MyService is not imported in this file, so
                    // it resolves inside the current namespace - confirm.
                    MyService::instance()->insertLog($msg->getHeaders(), $msg->getBody());
                }
            } catch (HeartbeatException $e) {
                echo 'The server failed to send us heartbeats within the defined interval.', PHP_EOL;
                $stomp->getClient()->disconnect();
            } catch (\Exception $e) {
                // Fully qualified on purpose: the unqualified name Exception is
                // imported as the framework's own exception class, which would
                // let connection and business errors escape and end the loop.
                echo 'process message occurs error: ' . $e->getMessage(), PHP_EOL;
                $stomp->getClient()->disconnect();
            }
        }
    }
}
最后
以上就是靓丽心锁为你收集整理的thinkphp6.0 阿里云 stomp前置条件实现效果:Composer.json代码的全部内容,希望文章能够帮你解决thinkphp6.0 阿里云 stomp前置条件实现效果:Composer.json代码所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复