namespace app\api\controller;
use library\Controller;
use Stomp\Client;
use Stomp\Exception\StompException;
use Stomp\Network\Observer\Exception\HeartbeatException;
use Stomp\Network\Observer\ServerAliveObserver;
use Stomp\StatefulStomp;
use Exception;
use think\Db;
use think\worker\Server;
/**
 * Long-running consumer that connects to a STOMP endpoint over SSL using
 * AccessKey-signed credentials ("authMode=aksign"), subscribes to every topic
 * and hands each received frame to doBusiness().
 *
 * Credentials and instance/consumer-group identifiers are read through
 * sysconf() in initialize().
 */
class Subscribe extends Controller
{
    /** @var string AccessKey ID, read from sysconf('storage_oss_keyid'). */
    public $accessKey = "";

    /** @var string AccessKey secret, read from sysconf('storage_oss_secret'). */
    public $accessSecret = "";

    /**
     * Consumer group ID. It can be looked up in the instance console under
     * Message Forwarding > Server-side Subscription > Consumer Group List.
     *
     * @var string
     */
    public $consumerGroupId = "";

    /** @var string Client identifier; a fresh UUID-shaped string per instance. */
    public $clientId = "";

    /** @var string IoT instance ID, read from sysconf('storage_aliyun_iotInstanceId'). */
    public $iotInstanceId = "";

    /** @var int|float|string Millisecond timestamp used in the login name and signature. */
    public $timeStamp = '';

    /** @var string Signature method advertised to the server in the login name. */
    public $signMethod = "hmacsha1";

    /**
     * Access endpoint. Declared explicitly because initialize() assigns it.
     * NOTE(review): createClient() currently does not read this value.
     *
     * @var string
     */
    public $hostUrl = "";

    /**
     * Loads the connection settings from system configuration and generates
     * the per-instance client ID and initial timestamp.
     *
     * @return void
     */
    public function initialize()
    {
        $this->accessKey = sysconf('storage_oss_keyid');
        $this->accessSecret = sysconf('storage_oss_secret');
        $this->timeStamp = round(microtime(true) * 1000);
        $this->iotInstanceId = sysconf('storage_aliyun_iotInstanceId');
        $this->consumerGroupId = sysconf('storage_aliyun_consumerGroupId');
        $this->clientId = $this->uuid();
        // Build the URL according to the Alibaba Cloud documentation.
        $this->hostUrl = "";
    }

    /**
     * Connects, subscribes to all topics and processes messages forever.
     *
     * Returns only when the initial connection attempt fails; otherwise it
     * loops indefinitely, reconnecting with a new client after a 10 second
     * pause whenever the connection is found to be down.
     *
     * @return void
     */
    public function sub()
    {
        $client = $this->createClient();
        try {
            $client->connect();
        } catch (StompException $e) {
            echo "failed to connect to server, msg:" . $e->getMessage(), PHP_EOL;
            // Only continue when the connection succeeded: subscribing on a
            // client that could not connect would just fail again, uncaught.
            return;
        }

        // Look up devices and assemble topics per device if needed; for now
        // every topic is subscribed.
        $stomp = $this->subscribeAll($client);
        echo "connect success";

        while (true) {
            try {
                // Check the connection state.
                if (!$client->isConnected()) {
                    echo "connection not exists, will reconnect after 10s.", PHP_EOL;
                    sleep(10);
                    // A brand-new client is required here: calling connect()
                    // again on the old client (as the official sample does)
                    // raises an error.
                    $client = $this->createClient();
                    $stomp = $this->subscribeAll($client);
                    echo "connect success", PHP_EOL;
                }

                // Every topic is received here; dispatch on the topic inside
                // the business handler.
                $msg = $stomp->read();
                if (!is_object($msg)) {
                    // read() yields a non-frame value (false) when nothing
                    // arrived; dereferencing it would raise an \Error that the
                    // catch blocks below do not handle.
                    continue;
                }

                // NOTE(review): doBusiness() is not defined in the visible part
                // of this class; it has to be provided before this can run.
                $this->doBusiness($msg->getHeaders(), $msg->getBody());
            } catch (HeartbeatException $e) {
                echo 'The server failed to send us heartbeats within the defined interval.', PHP_EOL;
                $stomp->getClient()->disconnect();
            } catch (Exception $e) {
                echo 'process message occurs error: ' . $e->getMessage(), PHP_EOL;
                $stomp->getClient()->disconnect();
            }
        }
    }

    /**
     * Wraps the client in a StatefulStomp session subscribed to "/topic/#".
     *
     * @param Client $client Client to subscribe with.
     * @return StatefulStomp The subscribed session.
     */
    private function subscribeAll($client)
    {
        $stomp = new StatefulStomp($client);
        $stomp->subscribe("/topic/#");

        return $stomp;
    }

    /**
     * Builds a configured, not yet connected STOMP client with signed login
     * credentials, SSL context, heartbeat observer and heartbeat interval.
     *
     * @return Client
     */
    private function createClient()
    {
        // Refresh the timestamp for every client so that a reconnect long
        // after initialize() does not sign with a stale value.
        // NOTE(review): assumes the server expects a current timestamp - confirm.
        $this->timeStamp = round(microtime(true) * 1000);

        $userName = $this->clientId . "|authMode=aksign"
            . ",signMethod=" . $this->signMethod
            . ",timestamp=" . $this->timeStamp
            . ",authId=" . $this->accessKey
            . ",iotInstanceId=" . $this->iotInstanceId
            . ",consumerGroupId=" . $this->consumerGroupId
            . "|";

        // Signed content is a query-style string: the two fields are joined
        // with "&".
        $signContent = "authId=" . $this->accessKey . "&timestamp=" . $this->timeStamp;

        // Compute the signature; for how the password is assembled see the
        // AMQP client access documentation. The algorithm is fixed to sha1,
        // which matches the default $signMethod of "hmacsha1" - keep in sync.
        $password = base64_encode(hash_hmac("sha1", $signContent, $this->accessSecret, true));

        // Access domain; for the real URL see the AMQP client access
        // documentation ('url' below is a placeholder).
        $client = new Client('ssl://url:61614');
        $sslContext = ['ssl' => ['verify_peer' => true, 'verify_peer_name' => false]];
        $client->getConnection()->setContext($sslContext);

        // Listen for server heartbeats.
        $observer = new ServerAliveObserver();
        $client->getConnection()->getObservers()->addObserver($observer);

        // Heartbeat settings: the server must send a heartbeat every 30 s.
        $client->setHeartbeat(0, 30000);
        $client->setLogin($userName, $password);

        return $client;
    }

    /**
     * Generates a random identifier in 8-4-4-4-12 hexadecimal layout from an
     * MD5 digest of uniqid().
     *
     * @return string
     */
    public function uuid()
    {
        $chars = md5(uniqid(mt_rand(), true));

        return substr($chars, 0, 8) . '-'
            . substr($chars, 8, 4) . '-'
            . substr($chars, 12, 4) . '-'
            . substr($chars, 16, 4) . '-'
            . substr($chars, 20, 12);
    }
}
// Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.