大家好,欢迎来到IT知识分享网。
什么是Nats
NATS:NATS是一个开源的、分布式的、高性能的消息传递系统,用于构建分布式和微服务应用程序。它提供了一种简单、可靠、高效的方式来传递信息,具有良好的可扩展性和容错性。
使用Go语言重写,能够达到每秒8-11百万个消息,整个程序很小只有3M。
NATS适合云基础设施的消息通信系统、IoT设备消息通信和微服务架构。
类似于 kafka等中间件,发布-订阅模型的设计。
常应用于:物联网消息传递,云原生和微服务。
支持的协议:
- TCP
- MQTT
- Websocket
Nats默认端口
- 监听客户端连接端口:4222
- 提供监控服务端http端口:8222
- 集群连接端口:6222
NATS官方学习文档
NATS.io – Cloud Native, Open Source, High-performance Messaging
nats下载地址
官网:https://nats.io/download/
github:
备注 不管是core-nats 或者nats-jetstream都是用这个nats-server执行程序,通过更改配置文件或者启动命令来开启nats-jetstream
部署
core-nats单节点部署和集群部署
单机部署
启动配置文件nats.conf
#nats.conf # 端口Client port of 4222 on all interfaces port: 4222 # http 可以用客户端工具连接nats 获取数据HTTP monitoring port http_port: 8222 authorization { user: nats password: timeout: 10 }
启动命令
运行 ./nats-server -c nats.conf
core-nats集群部署(三节点)
节点1
# node1 - nats.conf listen: 0.0.0.0:4222 # HTTP监控端口 http_port: 8222 # 客户端连接的认证信息 authorization { user: node1_user password: node1_pass timeout: 1 } # 集群定义 cluster { name: "nats-cluster" listen: 0.0.0.0:6222 routes = [ nats-route://route2_user:route2_pass@node2:6222, nats-route://route3_user:route3_pass@node3:6222 ] # 路由连接的认证信息 authorization { user: route1_user password: route1_pass timeout: 1 } } # 日志选项 log_file: "./nats.log" # PID进程文件 pid_file: "./nats.pid"
节点2
# node2 - nats.conf listen: 0.0.0.0:4222 # HTTP监控端口 http_port: 8222 # 客户端连接的认证信息 authorization { user: node2_user password: node2_pass timeout: 1 } # 集群定义 cluster { name: "nats-cluster" listen: 0.0.0.0:6222 routes = [ nats-route://route1_user:route1_pass@node1:6222, nats-route://route3_user:route3_pass@node3:6222 ] # 路由连接的认证信息 authorization { user: route2_user password: route2_pass timeout: 1 } } # 日志选项 log_file: "./nats.log" # PID进程文件 pid_file: "./nats.pid"
节点3
# node3 - nats.conf listen: 0.0.0.0:4222 # HTTP监控端口 http_port: 8222 # 客户端连接的认证信息 authorization { user: node3_user password: node3_pass timeout: 1 } # 集群定义 cluster { name: "nats-cluster" listen: 0.0.0.0:6222 routes = [ nats-route://route1_user:route1_pass@node1:6222, nats-route://route2_user:route2_pass@node2:6222 ] # 路由连接的认证信息 authorization { user: route3_user password: route3_pass timeout: 1 } } # 日志选项 log_file: "./nats.log" # PID进程文件 pid_file: "./nats.pid"
启动命令
每个节点 运行 nats-server -c nats.conf
nats-jetstream单机部署和集群部署
单机部署
#server_name=n1-c1 # Client port of 4222 on all interfaces port: 4222 # HTTP monitoring port http_port: 8222 jetstream { store_dir=/usr/local/nats/storage enabled: true max_mem_store: 100MB max_file_store: 500MB } # This is for clustering multiple servers together. #cluster { # It is recommended to set a cluster name # name: "my_cluster" # Route connections to be received on any interface on port 6222 # port: 6222 # Routes are protected, so need to use them with --routes flag # e.g. --routes=nats-route://ruser:T0pS3cr3t@otherdockerhost:6222 # authorization { # user: ruser # password: T0pS3cr3t # timeout: 2 # } # Routes are actively solicited and connected to from this server. # This Docker image has none by default, but you can pass a # flag to the nats-server docker image to create one to an existing server. # routes = [] #}
集群部署
节点1
server_name=n1-c1 listen=4222 accounts { $SYS { users = [ { user: "admin", pass: "$2a$11$DRh4C0KNbNnD8K/hb/buWe1zPxEHrLEiDmuq1Mi0rRJiH/W25Qidm" } ] } } jetstream { store_dir=/nats/storage } cluster { name: C1 listen: 0.0.0.0:6222 routes: [ nats://host_b:6222 nats://host_c:6222 ] }
节点2
server_name=n2-c1 listen=4222 accounts { $SYS { users = [ { user: "admin", pass: "$2a$11$DRh4C0KNbNnD8K/hb/buWe1zPxEHrLEiDmuq1Mi0rRJiH/W25Qidm" } ] } } jetstream { store_dir=/nats/storage } cluster { name: C1 listen: 0.0.0.0:6222 routes: [ nats://host_a:6222 nats://host_c:6222 ] }
节点3
server_name=n3-c1 listen=4222 accounts { $SYS { users = [ { user: "admin", pass: "$2a$11$DRh4C0KNbNnD8K/hb/buWe1zPxEHrLEiDmuq1Mi0rRJiH/W25Qidm" } ] } } jetstream { store_dir=/nats/storage } cluster { name: C1 listen: 0.0.0.0:6222 routes: [ nats://host_a:6222 nats://host_b:6222 ] }
Nats优点
Nats产品分类和产品选择
产品介绍
- Core-Nats核心
- Nats-JetStream
两者主要区别:
提供的消息质量:
- Nats核心只提供最多交付一次的消息质量。
- JetStream提供至少一次/恰好一次 QoS
消息存储:
- NATS核心不提供消息持久化,
- JetStream提供消息存储和过期策略和删除策略等。
Nats使用和配置文件讲解
NATS主题-讲解
格式
用”.”来分隔主题的层次
time.us time.us.east time.us.east.atlanta time.eu.east time.eu.east.warsaw
通配符讲解
nats中通配符有两种:”*”和”>”.
* 通配符用来匹配单个标记,例如:time.*.east 可以监听到这将匹配time.us.east和time.eu.east。不能匹配time.eu.us.east.
>通配符它将匹配一个或多个标记。
例如,time.us.>将匹配time.us.east和time.us.east.atlanta,而time.us.*只会匹配,time.us.east因为它不能匹配多个标记。
混合通配符
通配符*可以在同一主题中出现多次。两种类型都可以使用。例如,*.*.east.>将收到time.us.east.atlanta。
Nats如何解决消息队列中常见问题
订阅
订阅和kafka等常见的消息队列类似,值得强调的是nats中也有类似于kafka中的消费者组的概念
-不同的客户端用相同的消费者组去订阅同一个主图,服务端保证每个消息只被消费者组中的一个消费者消费。(提供的内置负载平衡功能)
-
确保应用程序容错
- 工作负载处理可以扩大或缩小
- 扩大或缩小消费者规模,无需重复发送消息
- 无需额外配置
- 队列组由应用程序及其队列订阅者定义,而不是由服务器配置定义
普通订阅 core-nats产品举例
nc, err := nats.Connect("demo.nats.io") if err != nil { log.Fatal(err) } defer nc.Close() // Use a WaitGroup to wait for a message to arrive wg := sync.WaitGroup{} wg.Add(1) // 异步订阅 if _, err := nc.Subscribe("updates", func(m *nats.Msg) { wg.Done() }); err != nil { log.Fatal(err) } //同步订阅 保证消息顺序 // sub, err := nc.SubscribeSync("updates") // Wait for a message to come in wg.Wait()
消费者组订阅实例代码:core-nats产品举例
package main import "github.com/nats-io/nats.go" func main() { connect, err := nats.Connect("") if err != nil { } connect.QueueSubscribeSync() //同步保证消息顺序 connect.QueueSubscribe() //异步不保证顺序 connect.QueueSubscribeSyncWithChan() //同步消息顺序发送到管道里接收 connect.ChanQueueSubscribe() //异步不保证顺序 }
消费者组订阅实例代码:nats-jetStream产品举例(有个Pull 拉去消息的概念)
import ( "github.com/nats-io/nats.go" "log" "time" ) func main() { nc, err := nats.Connect("") if err != nil { } js, err := nc.JetStream() if err != nil { log.Fatal(err) } js.QueueSubscribeSync() //顺序 js.QueueSubscribe() js.ChanQueueSubscribe() // 消息发布到指定管道 }
注意 JetStream这里订阅的有个常用的:通过消费者主动去服务端拉去消息的(拉取消费者在 NATS 服务器上产生的 CPU 负载较少,因此扩展性更好) PullSubscribe
func ExampleJetStream() { nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } // Use the JetStream context to produce and consumer messages // that have been persisted. js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) if err != nil { log.Fatal(err) } js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) js.Publish("foo", []byte("Hello JS!")) // Publish messages asynchronously. for i := 0; i < 500; i++ { js.PublishAsync("foo", []byte("Hello JS Async!")) } select { case <-js.PublishAsyncComplete(): case <-time.After(5 * time.Second): fmt.Println("Did not resolve in time") } // Create Pull based consumer with maximum 128 inflight. sub, _ := js.PullSubscribe("foo", "wq", nats.PullMaxWaiting(128)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() for { select { case <-ctx.Done(): return default: } // Fetch will return as soon as any message is available rather than wait until the full batch size is available, using a batch size of more than 1 allows for higher throughput when needed. msgs, _ := sub.Fetch(10, nats.Context(ctx)) for _, msg := range msgs { msg.Ack() } } }
发布
core-nats产品简单发(核心版本的nats不保证可靠性交付:发送一次就不管了,适用于可容忍消息丢失的情况)
nc, err := nats.Connect("demo.nats.io", nats.Name("API PublishBytes Example")) if err != nil { log.Fatal(err) } defer nc.Close() if err := nc.Publish("updates", []byte("All is Well")); err != nil { log.Fatal(err) }
关于nats-jetstream的发布需要先穿就“流”,放到下面Jetstream中进行讲解
Nats-JetStream
概念
NATS 有一个内置的持久性引擎,称为JetStream,它允许存储消息并在以后重播。与需要您拥有有效订阅才能在消息发生时处理消息的NATS Core不同,JetStream 允许 NATS 服务器捕获消息并根据需要将其重播给消费者。此功能为您的 NATS 消息提供了不同的服务质量,并实现了容错和高可用性配置。
流创建及配置
用代码简单介绍一下流的创建个关键信息配置
package main import ( "github.com/nats-io/nats.go" "log" "time" ) func main() { nc, err := nats.Connect("") if err != nil { } js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) if err != nil { log.Fatal(err) } _, err = js.AddStream(&nats.StreamConfig{ Name: "device", //流名称 Subjects: []string{"device.*"}, //需要监视并且存储消息的主题名 Storage: nats.FileStorage, //消息存储方式 文件或者内存 Retention: nats.LimitsPolicy, //保留策略 Discard: nats.DiscardOld, //丢弃策略 最老的消息丢弃或者最新的 MaxAge: 48 * time.Hour, //存储消息的最大时间 Duplicates: time.Hour * 24, // 恰好一次消息的检测时间段 在消息头里如果加了消息id,那么24小时内存在相同的id就认为是同一个消息 }) } //这样就创建了一个名称为device的流
核心配置讲解
// jsm.go // StreamConfig 用于定义一个流, 大多数参数都有合理的默认值 // 如果 subject 没写, 就会分配一个随机的 subject type StreamConfig struct { // 名称 Name string // 描述 Description string // 对应的多个 Subject Subjects []string // 消息 3 种保留策略 // RetentionPolicy 最大消息数, 最大存储空间或者最大存活时间达到限制, 就可以删除消息 // InterestPolicy 需要所有 consumer 确认可以删除消息 // WorkQueuePolicy 只需要一个 consumer 确认可以删除消息 Retention RetentionPolicy // 最大 Consumer 数量 MaxConsumers int // 最大存储 Mgs 数量 MaxMsgs int64 // 最大储存占用 MaxBytes int64 // 消息 2 种淘汰策略 // DiscardOld 消息达到限制后, 丢弃最早的消息 // DiscardNew 消息达到限制后, 信息消息新推送会失败 Discard DiscardPolicy // 消息存活时间 MaxAge time.Duration // 每个 subject 最大消息数量 MaxMsgsPerSubject int64 // 每个消息最大大小 MaxMsgSize int32 // 支持文件储存和内存储存 2 种类型 Storage StorageType // 消息分片数量 Replicas int // 不需要 ack NoAck bool // ... }
订阅-消费者
消费者关键配置
// jsm.go type ConsumerConfig struct { // 名称 Durable string `json:"durable_name,omitempty"` // 描述 Description string `json:"description,omitempty"` // 交付 Subject DeliverSubject string `json:"deliver_subject,omitempty"` // 交付 Group DeliverGroup string `json:"deliver_group,omitempty"` // 交付策略 // 交付所有 (默认), 交付最后一个, 交付最新, 自定义开始序号, 自定义开始时间 DeliverPolicy DeliverPolicy `json:"deliver_policy"` // 开始序号 OptStartSeq uint64 `json:"opt_start_seq,omitempty"` // 开始时间 OptStartTime *time.Time `json:"opt_start_time,omitempty"` // ack 策略 // 不需要ack (默认), 隐式ack All , 每个都需要显示ack AckPolicy AckPolicy `json:"ack_policy"` // ack等待时间 AckWait time.Duration `json:"ack_wait,omitempty"` MaxDeliver int `json:"max_deliver,omitempty"` BackOff []time.Duration `json:"backoff,omitempty"` // 过滤的Subject FilterSubject string `json:"filter_subject,omitempty"` // 重试策略 // 尽快重试, ReplayOriginalPolicy 相同时间重试 ReplayPolicy ReplayPolicy `json:"replay_policy"` // 限速 RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec // 采样频率 SampleFrequency string `json:"sample_freq,omitempty"` // 最大等待数量 MaxWaiting int `json:"max_waiting,omitempty"` //最大Pending ack数量 MaxAckPending int `json:"max_ack_pending,omitempty"` // flow 控制 FlowControl bool `json:"flow_control,omitempty"` // 心跳时间 Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` // ... }
常见文件讲解
nats如何保证消息不重复
nats核心功能 只提供交付一次的消息质量,要向保证发送消息不重复只能在消息发送端做唯一限制。
nats-jetstream 提供恰好一次和至少一次的消息质量,对于发布方,它依赖于发布应用程序在消息头中附加唯一的消息或发布 ID,并依赖于服务器在可配置的滚动时间段内跟踪这些 ID,以便检测发布者是否两次发布同一条消息。对于订阅放可以加必要的外在设计增加接收消息不重复,例如唯一主键等。
如何保证消息不丢失
订阅方订阅每条消息需要主动向nats服务回复ack确认,否则nats则认为丢失,进行消息重播。进而实现可靠交付。
nats服务提供持久的消息保留策略。
发布方每发布一条消息都会收到nats的ack回复。
这保证了消息不丢失。
顺序消费问题
单个分区可以实现顺序消费,同步消费(重要消息放到单个分区)
nats也提供队列消费,Group的概念。进而提供负载均衡。
实战
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/113794.html



