Recently, when working on a toy project, I used MQ to separate two services, one of which is a scenario similar to broadcasting, and the services that meet the conditions respond.
The direct demo is as follows, mainly the configuration of Exchange and Auto Ack.
The MQ server is my old laptop, and it runs quite impressively on the internal network of 1000M hehe
package main
import (
"fmt"
"time"
"github.com/streadway/amqp"
)
func Api(port string) {
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", "dqn", "dqn", "192.168.1.104", 5672))
if err != nil {
println(err.Error())
}
chann, err := conn.Channel()
// publish each sec
go func() {
_ = chann.ExchangeDeclare("ApiSend", "fanout", false, false, false, false, nil)
for true {
chann.Publish("ApiSend", "", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte("data from api[" + port + "]"),
})
time.Sleep(1 * time.Nanosecond)
}
}()
_, _ = chann.QueueDeclare("ApiGetQueue", false, false, false, false, nil)
_ = chann.ExchangeDeclare("ApiGet", "fanout", false, false, false, false, nil)
_ = chann.QueueBind("ApiGetQueue", "", "ApiGet", false, nil)
consumeChan, err := chann.Consume("ApiGetQueue", "", false, false, false, false, nil)
if err != nil {
println(err.Error())
}
for d := range consumeChan {
println("[api get] --> ", string(d.Body))
d.Ack(false)
}
}
func Data(port string) {
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", "dqn", "dqn", "192.168.1.104", 5672))
if err != nil {
println(err.Error())
}
chann, err := conn.Channel()
// publish each sec
_ = chann.ExchangeDeclare("ApiGet", "fanout", false, false, false, false, nil)
_, _ = chann.QueueDeclare("ApiSendGet"+port, false, false, false, false, nil)
_ = chann.ExchangeDeclare("ApiSend", "fanout", false, false, false, false, nil)
_ = chann.QueueBind("ApiSendGet"+port, "", "ApiSend", false, nil)
consumeChan, err := chann.Consume("ApiSendGet"+port, "", false, false, false, false, nil)
if err != nil {
println(err.Error())
}
for d := range consumeChan {
println("[data "+port+" get] --> ", string(d.Body))
chann.Publish("ApiGet", "", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte("data from data[" + port + "]"),
})
d.Ack(false)
}
}
func main() {
go Api("1111")
go Data("2222")
Data("3333")
}