6 20
NSQ使用总结

最近, 由于需要使用NSQ作为队列的支撑工作, 于是对NSQ进行了相关的研究与源码的阅读.

以下是本人使用过程中对NSQ的一些认知与实践总结

1. NSQ基础

NSQ是一个基于Go语言的分布式实时消息平台。

NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。

1.1 NSQ工具组件主要包含:

nsqd:一个负责接收、排队、转发消息到客户端的守护进程

nsqlookupd:管理拓扑信息并提供最终一致性的发现服务的守护进程

nsqadmin:一套Web用户界面,可实时查看集群的统计数据和执行各种各样的管理任务

1.2 核心概念

topic: topic是nsq的消息发布的逻辑关键词。当程序初次发布带topic的消息时,如果topic不存在,则会被创建。

channels: 当生产者每次发布消息的时候,消息会采用多播的方式被拷贝到各个channel中,channel起到队列的作用。

messages: 数据流的形式。

1.3 消息的生命周期

NSQ提倡co-locating的部署方式, 也就是生产者与nsqd尽量在同一个实例环境中。生产者不需要负责发现其他的nsqd实例, 它们只管往自己的nsqd中投放消息。

具体的流程方式为:

  1. 生产者往本地的nsqd中发送消息.这个过程会开启一个连接, 并发送一个带有topic和消息体的PUB的命令到nsqd中. 我们假如是发送一个events的topic

  2. events topic 会对消息进行copy,并多路发送到各个channel中, 我们假设有三个channel, 那么这个流程会如下图描述所示:

    nsq

  3. 在channel中的每条消息会被放进队列中, 直到消息被worker所消费掉, 如果队列占用的内存超出限制, 消息会被写进硬盘

  4. nsqd节点会首先向nsqlookd节点广播它的位置信息, 一旦这些信息被nsqlookupd注册上, workers就会发现这些nsqd节点,包括这些节点的events topic

  5. 相关过程如下图

    nsqlookup

  6. 每个worker会订阅各个nsqd的host信息, 表示它已经准备好消息的接收.

1.4 与其它MQ的比较

  1. ActiveMQ apache开源项目, 支持master-slave broker-cluster集群,

  2. rabbitMQ erlang开发,比较重量级

  3. zeroMQ 跟以上两个比起来,这无疑是最快的,但最不稳定的,可以在此基础上开发分布式,坑比较多,需要自己慢慢填。

  4. kafka apache的另一个项目,scala语言开发, 速度很快,但客户端需要自己测试和开发

2. NSQ常见部署方式

2.1 下载

官网下载页

2.1 服务器端部署

启动nsqlookupd

$ nsqlookupd

运行两个测试的nsqd实例

$ nsqd --lookupd-tcp-address=localhost:4160 --data-path=/home/steve-3/nsq/data1

$ nsqd --worker-id=857 --lookupd-tcp-address=localhost:4160 --tcp-address=0.0.0.0:4250 --http-address=0.0.0.0:4251 --data-path=/home/steve-3/nsq/data2

运行前端监控

$ nsqadmin --lookupd-http-address=localhost:4161

默认情况下, 可以在浏览器输入: host-name:4171 打开监控面板

nsq_admin

由于nsq提供了一个unix-like的工具,所以我们可以在终端使用以下命令进行消息的发送测试:

$ curl -d 'hello world 1' 'http://127.0.0.1:4151/put?topic=test'
$ curl -d 'hello world 2' 'http://127.0.0.1:4251/put?topic=test'

发送后, 可以在监控面板观察页面数据的变化

使用go编写NSQ消费端的例子

consumer.go

package queue

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "log"
    "os"
)

type logger interface {
    Output(int, string) error
}

type Consumer struct {
    client      *nsq.Consumer
    config      *nsq.Config
    nsqds       []string
    nsqlookupds []string
    concurrency int
    channel     string
    topic       string
    level       nsq.LogLevel
    log         logger
    err         error
}

//初始化消费端
func NewConsumer(topic, channel string) *Consumer {
    return &Consumer{
        log:         log.New(os.Stderr, "", log.LstdFlags),
        config:      nsq.NewConfig(),
        level:       nsq.LogLevelInfo,
        channel:     channel,
        topic:       topic,
        concurrency: 1,
    }
}

func (c *Consumer) SetLogger(log logger, level nsq.LogLevel) {
    c.level = level
    c.log = log
}

func (c *Consumer) SetMap(options map[string]interface{}) {
    for k, v := range options {
        c.Set(k, v)
    }
}

func (c *Consumer) Set(option string, value interface{}) {
    switch option {
    case "topic":
        c.topic = value.(string)
    case "channel":
        c.channel = value.(string)
    case "concurrency":
        c.concurrency = value.(int)
    case "nsqd":
        c.nsqds = []string{value.(string)}
    case "nsqlookupd":
        c.nsqlookupds = []string{value.(string)}
    case "nsqds":
        s, err := strings(value)
        if err != nil {
            c.err = fmt.Errorf("%q: %v", option, err)
            return
        }
        c.nsqds = s
    case "nsqlookupds":
        s, err := strings(value)
        if err != nil {
            c.err = fmt.Errorf("%q: %v", option, err)
            return
        }
        c.nsqlookupds = s
    default:
        err := c.config.Set(option, value)
        if err != nil {
            c.err = err
        }
    }
}

func (c *Consumer) Start(handler nsq.Handler) error {

    if c.err != nil {
        return c.err
    }

    client, err := nsq.NewConsumer(c.topic, c.channel, c.config)
    if err != nil {
        return err
    }
    c.client = client
    client.SetLogger(c.log, c.level)
    client.AddConcurrentHandlers(handler, c.concurrency)
    return c.connect()
}

//连接到nsqd
func (c *Consumer) connect() error {

    if len(c.nsqds) == 0 && len(c.nsqlookupds) == 0 {
        return fmt.Errorf(`at least one "nsqd" or "nsqlookupd" address must be configured`)
    }

    if len(c.nsqds) > 0 {
        err := c.client.ConnectToNSQDs(c.nsqds)
        if err != nil {
            return err
        }
    }
    if len(c.nsqlookupds) > 0 {
        err := c.client.ConnectToNSQLookupds(c.nsqlookupds)
        if err != nil {
            return err
        }
    }
    return nil
}

//stop and wait
func (c *Consumer) Stop() error {
    c.client.Stop()
    <-c.client.StopChan
    return nil
}

func strings(v interface{}) ([]string, error) {
    switch v.(type) {
    case []string:
        return v.([]string), nil
    case []interface{}:
        var ret []string
        for _, e := range v.([]interface{}) {
            s, ok := e.(string)
            if !ok {
                return nil, fmt.Errorf("string expected")
            }
            ret = append(ret, s)
        }
        return ret, nil
    default:
        return nil, fmt.Errorf("strings expected")
    }
}

main.go


package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "your_go_path/project/queue"
)

func main() {
    done := make(chan bool)
    c := queue.NewConsumer("test", "testchan2")
    c.Set("nsqds", []string{"192.168.139.134:4150", "192.168.139.134:4250"})
    c.Set("concurrency", 15)
    c.Set("max_attempts", 10)
    c.Set("max_in_flight", 150)
    err := c.Start(nsq.HandlerFunc(func(msg *nsq.Message) error {
        fmt.Println("customer2:", string(msg.Body))
        return nil
    }))
    if err != nil {
        fmt.Errorf(err.Error())
    }
    <-done
}


使用总结

nsq大部分情况基本能满足我们作为消息队列的要求,而且性能与单点故障处理能力也比较出色.

但它不适用的地方主要有:

有顺序要求的消息

不支持副本集的集群方式