NSQ使用GoLang分发消息传递

 go教练   2019-06-11 14:57   18 人阅读  0 条评论

目前,随着微服务的爆炸式增长,使用消息队列以异步方式通信服务是很常见的。作为RabbitMQ或Kafka的大玩家已经通过互联网获得了很多教程,但最近我开始研究Go并且我想尝试一些消息队列,只是为了进行一些测试然后我找到了NSQ并且它自己在其网站上引入:“NSQ是一个实时分布式消息传递平台”

NSQ有两个主要组件,“NSQLOOKUPD”和“NSQD”

NSQD是接收,排队和向客户端传递消息的守护程序。

NSQLOOKUPD是管理拓扑信息的守护程序。客户端查询nsqlookupd以发现特定主题的nsqd生成器,nsqd节点广播主题和通道信息。

 微信截图_20190611095518.png

还有一个名为“NSQADMIN”的第三个组件,它是管理页面的UI。

对于我们的第一个测试,我们需要一个nsqd和一个nsqlookupd运行

对于我们的第一个测试,我们需要一个nsqd和一个nsqlookupd运行,

docker 运行:

# docker-compose.yml---version: '2'services: nsqlookupd: image: nsqio/nsq command: > /nsqlookupd -broadcast-address localhost:4160 ports: - "4160:4160" - "4161:4161" nsqd: image: nsqio/nsq command: > /nsqd -broadcast-address localhost -lookupd-tcp-address nsqlookupd:4160 ports: - "4150:4150" - "4151:4151"

我们可以使用以下命:

docker-compose up -d

使用NSQD和NSQLOOKUPD运行,让我们在Go中编写一个生产者和一个使用者,我将使用“nsq-event-bus”包,它是“go-nsq包的一个小包装”,生产者代码:

Package main import ( "github.com/rafaeljesus/nsq-event-bus" "log" ) type event struct{ Body string } func main() { topic := "events" emitter, err := bus.NewEmitter(bus.EmitterConfig{}) if err != nil { log.Fatal("[ERRO]", err) } message := "[Emitter 1] sending message" e := event{message} if err = emitter.Emit(topic, &e); err != nil { log.Println("error while was emitting message", err) } log.Println("[Message emitted]", message) }

消费者代码:

package main import ( "github.com/rafaeljesus/nsq-event-bus" "log" "sync" ) type event struct{ Body string } var wg sync.WaitGroup func main() { wg.Add(1) // just to test purposes, the program will await for one message if err := bus.On(bus.ListenerConfig{ Lookup: []string{"localhost:4161"}, Topic: "events", Channel: "consumer1", HandlerFunc: handler, }); err != nil { // handle failure to listen a message log.Println("Error while consuming message", err) } wg.Wait() } func handler(message *bus.Message) (reply interface{}, err error) { e := event{} if err = message.DecodePayload(&e); err != nil { // handle failure to decode a message log.Println("Error while consuming message", err) message.Finish() wg.Done() return } log.Println("[Consumer 1] Consuming message", e) message.Finish() wg.Done() return }

请注意,在消费者代码中,我使用了WaitGroup,因为在查找响应到达之前可能需要一段时间。要看到两个工作运行emitter.go或consumer.go(请记住,消费者将阻止终端,直到收到一条消息,然后你需要在其他终端运行发射器)

$ go run emitter.go 2017/03/01 01:46:31 INF 1 (localhost:4150) connecting to nsqd2017/03/01 01:46:31 [Message emitted] [Emitter 1] sending message$ go run consumer.go 2017/03/01 01:46:25 INF 1 [events/consumer1] querying nsqlookupd http://localhost:4161/lookup?topic=events2017/03/01 01:46:25 INF 1 [events/consumer1] (localhost:4150) connecting to nsqd2017/03/01 01:46:44 [Consumer 1] Consuming message {[Emitter 1] sending message}

以上就是今天给大家推荐的NSQ使用GoLang分发消息传递,如果你还想了解更多关于golang的知识技巧,可以继续关注我们http://fastgolang.com/

本文地址:http://fastgolang.com/56.html
版权声明:本文为原创文章,版权归 go教练 所有,欢迎分享本文,转载请保留出处!

 发表评论


表情

还没有留言,还不快点抢沙发?