ThinkAdmin: 基于 ThinkPHP 微信后台管理平台(体验账号和密码都是 admin)https://gitee.com/zoujingli/ThinkAdmin
// 开启
php think xapi:CmdAliyunStomp start
php think xapi:CmdAliyunStomp start -d
// 关闭
php think xapi:CmdAliyunStomp stop
php think xapi:CmdAliyunStomp status
php think xapi:CmdAliyunStomp listen
添加: stomp-php/stomp-php
"require": {
"stomp-php/stomp-php": "5.0.0"
namespace appapicommand;
use PsrLogNullLogger;
use StompClient;
use StompExceptionStompException;
use StompNetworkObserverExceptionHeartbeatException;
use StompNetworkObserverServerAliveObserver;
use StompStatefulStomp;
use thinkadminCommand;
use thinkadminException;
use thinkadminserviceProcessService;
use thinkconsoleInput;
use thinkconsoleinputArgument;
use thinkconsoleinputOption;
use thinkconsoleOutput;
* 阿里云 CmdAliyunStomp
class CmdAliyunStomp extends Command
* php think xapi:CmdAliyunStomp start -d
* Class OrderClean
* @package appdatacommand
// $name=
const QUEUE_LISTEN = 'xapi:CmdAliyunStomp listen';
protected $name = 'xapi:CmdAliyunStomp';
protected function configure()
$this->addOption('daemon', 'd', Option::VALUE_NONE, 'The queue listen in daemon mode');
$this->addArgument('action', Argument::OPTIONAL, 'stop|start|status|listen', 'listen');
$this->addArgument('code', Argument::OPTIONAL, 'Taskcode');
$this->addArgument('spts', Argument::OPTIONAL, 'Separator');
$this->setDescription('xapi:CmdTest listen for my');
* 业务指令执行
* @param Input $input
* @param Output $output
* @return void
* @throws Exception
protected function execute(Input $input, Output $output)
$action = $input->hasOption('daemon') ? 'start' : $input->getArgument('action');
if (method_exists($this, $method = "{$action}Action"))
return $this->$method();
$this->output->error("># Wrong operation, Allow stop|start|status|listen");
* 停止所有任务
* @throws thinkdbexceptionDataNotFoundException
* @throws thinkdbexceptionDbException
* @throws thinkdbexceptionModelNotFoundException
protected function stopAction()
if (count($result = $this->process->thinkQuery($this->name)) < 1) {
$this->output->writeln("># There are no task processes to stop");
} else foreach ($result as $item) {
$this->output->writeln("># Successfully sent end signal to process {$item['pid']}");
* 启动后台任务
* @throws thinkdbexceptionDataNotFoundException
* @throws thinkdbexceptionDbException
* @throws thinkdbexceptionModelNotFoundException
protected function startAction()
// SystemQueue::mk()->count();
$this->output->comment(">$ {$this->process->think(static::QUEUE_LISTEN)}");
if (count($result = $this->process->thinkQuery(static::QUEUE_LISTEN)) > 0) {
$this->output->writeln("># " . static::QUEUE_LISTEN . " already exist for pid {$result[0]['pid']}");
} else {
$this->process->thinkCreate(static::QUEUE_LISTEN, 1000);
if (count($result = $this->process->thinkQuery(static::QUEUE_LISTEN)) > 0) {
$this->output->writeln("># " . static::QUEUE_LISTEN . " started successfully for pid {$result[0]['pid']}");
} else {
$this->output->writeln("># " . static::QUEUE_LISTEN . " failed to start");
* 查询兼听状态
* @throws thinkdbexceptionDataNotFoundException
* @throws thinkdbexceptionDbException
* @throws thinkdbexceptionModelNotFoundException
protected function statusAction()
if (count($result = $this->process->thinkQuery(static::QUEUE_LISTEN)) > 0) {
$this->output->writeln("Listening for main process {$result[0]['pid']} running");
} else {
$this->output->writeln("The Listening main process is not running");
* 立即监听任务
protected function listenAction()
$this->app->db->setLog(new NullLogger());
$this->output->writeln("tYou can exit with <info>`CTRL-C`</info>");
$this->output->writeln('=============== LISTENING ===============');
protected function begin()
$accessKey = config('aliyun.IotAppid');;
$accessSecret = config('aliyun.IotAppkey');
$consumerGroupId = "DEFAULT_GROUP";
$iotInstanceId = "实例ID";
$clientId = "test1233434dfdfdf32";
$timeStamp = round(microtime(true) * 1000);
$signMethod = "hmacsha1";
$userName = $clientId . "|authMode=aksign"
. ",signMethod=" . $signMethod
. ",timestamp=" . $timeStamp
. ",authId=" . $accessKey
. ",iotInstanceId=" . $iotInstanceId
. ",consumerGroupId=" . $consumerGroupId
. "|";
$signContent = "authId=" . $accessKey . "×tamp=" . $timeStamp;
$password = base64_encode(hash_hmac("sha1", $signContent, $accessSecret, $raw_output = TRUE));
$client = new Client('ssl://' . sysconf('device_instance_id').'.amqp.iothub.aliyuncs.com:61614');
$sslContext = ['ssl' => ['verify_peer' => true, 'verify_peer_name' => false],];
$observer = new ServerAliveObserver();
// $client->setHeartbeat(0, 5000);
$client->setHeartbeat(0, 5000);
$client->setLogin($userName, $password);
try {
} catch (StompException $e) {
echo "failed to connect to server, msg:" . $e->getMessage(), PHP_EOL;
$stomp = new StatefulStomp($client);
$this->output->writeln('connect success .');
while (true) {
try {
// 检查连接状态
if (!$client->isConnected()) {
echo "connection not exists, will reconnect after 10s.", PHP_EOL;
echo "connect success.", PHP_EOL;
$msg = $stomp->read();
echo date('Y-m-d H:i:s');
echo $msg;
if (!empty($msg)) {
// 处理消息业务逻辑。
$model = MyService::instance()->insertLog($msg->getHeaders(), $msg->getBody());
// else{
// }
} catch (HeartbeatException $e) {
echo 'The server failed to send us heartbeats within the defined interval.', PHP_EOL;
} catch (Exception $e) {
echo 'process message occurs error: ' . $e->getMessage(), PHP_EOL;
以上就是靓丽心锁最近收集整理的关于thinkphp6.0 阿里云 stomp前置条件实现效果:Composer.json代码的全部内容,更多相关thinkphp6.0内容请搜索靠谱客的其他文章。
发表评论 取消回复