概述
rabbitMq集成(rabbitMq.go)
package rabbitMQ
import (
"github.com/streadway/amqp"
"smartracing.cn/appointment/member/internal/conf"
)
type Delivery func(<-chan amqp.Delivery)
type MQServer interface {
CreateQueue(string) (amqp.Queue, error)
Publish(string, []byte) error
RegisterConsumer(qName string, delivery Delivery)
StartConsumer()
}
type RabbitMQ struct {
conn
*amqp.Connection
callback
map[string]Delivery
connNotify chan *amqp.Error
}
func (rabbitmq *RabbitMQ) RegisterConsumer(qName string, delivery Delivery) {
rabbitmq.callback[qName] = delivery
}
// 新建rabbitMq连接
func New(uri string) (MQServer, error) {
conn, err := amqp.Dial(uri)
if err != nil {
return nil, err
}
rabbit := &RabbitMQ{conn: conn, callback: make(map[string]Delivery)}
return rabbit, nil
}
// 创建队列
func (rabbitmq *RabbitMQ) CreateQueue(qName string) (amqp.Queue, error) {
ch, err := rabbitmq.conn.Channel()
if err != nil {
return amqp.Queue{}, err
}
defer ch.Close()
queue, err := ch.QueueDeclare(qName, true, false, false, false, nil)
if err != nil {
return amqp.Queue{}, err
}
return queue, nil
}
// 发布者
func (rabbitmq *RabbitMQ) Publish(qName string, body []byte) error {
queue, err := rabbitmq.CreateQueue(qName)
ch, err := rabbitmq.conn.Channel()
if err != nil {
return err
}
defer ch.Close()
err = ch.Publish(
"",
queue.Name,
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType:
"application/json",
Body:
body,
})
if err != nil {
return err
}
conf.Log.Infof("rabbitMQ publish queue:%s, message:%s", qName, string(body))
return nil
}
func SendRabbitMQ(queue string, body []byte) error {
r, err := New(conf.AppConfig.RabbitMqUrl)
if err != nil {
conf.Log.Errorf("New RabbitMQ Fatal:%v", err)
return err
}
if err := r.Publish(queue, body); err != nil {
return err
}
return nil
}
// 消费者
func (rabbitmq *RabbitMQ) StartConsumer() {
for queue, delivery := range rabbitmq.callback {
go func(qName string, delivery Delivery) {
queue, err := rabbitmq.CreateQueue(qName)
if err != nil {
conf.Log.Fatalf("rabbitmq create queue fatal:%v", err)
panic(err.Error())
}
ch, err := rabbitmq.conn.Channel()
if err != nil {
conf.Log.Fatalf("rabbitmq channel fatal:%v", err)
panic(err.Error())
}
defer ch.Close()
err = ch.Qos(
3,
0,
false,
)
if err != nil {
conf.Log.Fatalf("rabbitmq qos fatal:%v", err)
panic(err.Error())
}
msgs, err := ch.Consume(queue.Name, "", false, false, false, false, nil)
if err != nil {
conf.Log.Fatalf("rabbitmq consume fatal:%v", err)
panic(err.Error())
}
conf.Log.Infof("rabbitmq add consume success, consume:%s, delivery:%v", queue, delivery)
go delivery(msgs)
select {}
}(queue, delivery)
}
}
创建消费者
package rabbitMQ
import (
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"smartracing.cn/appointment/member/internal/conf"
)
type MqData struct {
ID
string `json:"id"`
Name string `json:"name"`
}
func TextMqConsumer(delivery <-chan amqp.Delivery) {
for msg := range delivery {
conf.Log.Infof("Received TextDetectResult: %s", msg.Body)
fmt.Printf("Received TextDetectResult: %sn", msg.Body)
var data MqData
json.Unmarshal(msg.Body, &data)
msg.Ack(false)
}
}
发布消息
// 发送关注mq
mqData := map[string]interface{}{"uId": claims.Uid, "followId": memberId, "type": 1}
dataBytes, _ := json.Marshal(mqData)
errMq := rabbitMQ.SendRabbitMQ("subscribe", dataBytes)
if errMq != nil {
conf2.Log.Error(errMq.Error())
}
项目监听消息
// rabbitMq消费
func startRabbitMQ() error {
r, err := rabbitMQ.New(conf.AppConfig.RabbitMqUrl)
if err != nil {
conf.Log.Fatalf("New RabbitMQ Fatal:%s", err.Error())
return err
}
r.RegisterConsumer("subscribe", rabbitMQ.TextMqConsumer)
r.StartConsumer()
return nil
}
func main() {
g.Go(startApp)
//g.Go(startRabbitMQ)
if err := g.Wait(); err != nil {
log.Fatal(err)
}
}
最后
以上就是超级冬日为你收集整理的go队列方式集成rabbitMq的全部内容,希望文章能够帮你解决go队列方式集成rabbitMq所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复