最详细的nats讲解-部署加应用

最详细的nats讲解-部署加应用nats 讲解 nats 部署 nats 学习 消息队列

大家好,欢迎来到IT知识分享网。

什么是Nats

NATS:NATS是一个开源的、分布式的、高性能的消息传递系统,用于构建分布式和微服务应用程序。它提供了一种简单、可靠、高效的方式来传递信息,具有良好的可扩展性和容错性。

使用Go语言重写,能够达到每秒8-11百万个消息,整个程序很小只有3M。

NATS适合云基础设施的消息通信系统、IoT设备消息通信和微服务架构。

类似于 kafka等中间件,发布-订阅模型的设计。

最详细的nats讲解-部署加应用

常应用于:物联网消息传递,云原生和微服务。

支持的协议:

  1. TCP
  2. MQTT
  3. Websocket

Nats默认端口

  1. 监听客户端连接端口:4222
  2. 提供监控服务端http端口:8222
  3. 集群连接端口:6222

NATS官方学习文档

NATS.io – Cloud Native, Open Source, High-performance Messaging

nats下载地址

官网:https://nats.io/download/

github:

备注  不管是core-nats  或者nats-jetstream都是用这个nats-server执行程序,通过更改配置文件或者启动命令来开启nats-jetstream

部署

core-nats单节点部署和集群部署

单机部署

启动配置文件nats.conf

#nats.conf # 端口Client port of 4222 on all interfaces port: 4222 # http 可以用客户端工具连接nats 获取数据HTTP monitoring port http_port: 8222 authorization { user: nats password: timeout: 10 } 

启动命令

运行  ./nats-server -c nats.conf 

core-nats集群部署(三节点)

节点1

# node1 - nats.conf listen: 0.0.0.0:4222 # HTTP监控端口 http_port: 8222 # 客户端连接的认证信息 authorization { user: node1_user password: node1_pass timeout: 1 } # 集群定义 cluster { name: "nats-cluster" listen: 0.0.0.0:6222 routes = [ nats-route://route2_user:route2_pass@node2:6222, nats-route://route3_user:route3_pass@node3:6222 ] # 路由连接的认证信息 authorization { user: route1_user password: route1_pass timeout: 1 } } # 日志选项 log_file: "./nats.log" # PID进程文件 pid_file: "./nats.pid" 

节点2

# node2 - nats.conf listen: 0.0.0.0:4222 # HTTP监控端口 http_port: 8222 # 客户端连接的认证信息 authorization { user: node2_user password: node2_pass timeout: 1 } # 集群定义 cluster { name: "nats-cluster" listen: 0.0.0.0:6222 routes = [ nats-route://route1_user:route1_pass@node1:6222, nats-route://route3_user:route3_pass@node3:6222 ] # 路由连接的认证信息 authorization { user: route2_user password: route2_pass timeout: 1 } } # 日志选项 log_file: "./nats.log" # PID进程文件 pid_file: "./nats.pid" 

节点3

# node3 - nats.conf listen: 0.0.0.0:4222 # HTTP监控端口 http_port: 8222 # 客户端连接的认证信息 authorization { user: node3_user password: node3_pass timeout: 1 } # 集群定义 cluster { name: "nats-cluster" listen: 0.0.0.0:6222 routes = [ nats-route://route1_user:route1_pass@node1:6222, nats-route://route2_user:route2_pass@node2:6222 ] # 路由连接的认证信息 authorization { user: route3_user password: route3_pass timeout: 1 } } # 日志选项 log_file: "./nats.log" # PID进程文件 pid_file: "./nats.pid" 

启动命令

每个节点  运行  nats-server -c nats.conf
 

nats-jetstream单机部署和集群部署

单机部署

#server_name=n1-c1 # Client port of 4222 on all interfaces port: 4222 # HTTP monitoring port http_port: 8222 jetstream { store_dir=/usr/local/nats/storage enabled: true max_mem_store: 100MB max_file_store: 500MB } # This is for clustering multiple servers together. #cluster { # It is recommended to set a cluster name # name: "my_cluster" # Route connections to be received on any interface on port 6222 # port: 6222 # Routes are protected, so need to use them with --routes flag # e.g. --routes=nats-route://ruser:T0pS3cr3t@otherdockerhost:6222 # authorization { # user: ruser # password: T0pS3cr3t # timeout: 2 # } # Routes are actively solicited and connected to from this server. # This Docker image has none by default, but you can pass a # flag to the nats-server docker image to create one to an existing server. # routes = [] #} 

集群部署

节点1

server_name=n1-c1 listen=4222 accounts { $SYS { users = [ { user: "admin", pass: "$2a$11$DRh4C0KNbNnD8K/hb/buWe1zPxEHrLEiDmuq1Mi0rRJiH/W25Qidm" } ] } } jetstream { store_dir=/nats/storage } cluster { name: C1 listen: 0.0.0.0:6222 routes: [ nats://host_b:6222 nats://host_c:6222 ] }

节点2

server_name=n2-c1 listen=4222 accounts { $SYS { users = [ { user: "admin", pass: "$2a$11$DRh4C0KNbNnD8K/hb/buWe1zPxEHrLEiDmuq1Mi0rRJiH/W25Qidm" } ] } } jetstream { store_dir=/nats/storage } cluster { name: C1 listen: 0.0.0.0:6222 routes: [ nats://host_a:6222 nats://host_c:6222 ] }

节点3

server_name=n3-c1 listen=4222 accounts { $SYS { users = [ { user: "admin", pass: "$2a$11$DRh4C0KNbNnD8K/hb/buWe1zPxEHrLEiDmuq1Mi0rRJiH/W25Qidm" } ] } } jetstream { store_dir=/nats/storage } cluster { name: C1 listen: 0.0.0.0:6222 routes: [ nats://host_a:6222 nats://host_b:6222 ] }

Nats优点

Nats产品分类和产品选择

产品介绍

  • Core-Nats核心   
  • Nats-JetStream

两者主要区别:

提供的消息质量:

  1. Nats核心只提供最多交付一次的消息质量。
  2. JetStream提供至少一次/恰好一次 QoS

消息存储:

  1. NATS核心不提供消息持久化,
  2. JetStream提供消息存储和过期策略和删除策略等。

Nats使用和配置文件讲解

NATS主题-讲解

格式

用”.”来分隔主题的层次

time.us time.us.east time.us.east.atlanta time.eu.east time.eu.east.warsaw

通配符讲解

nats中通配符有两种:”*”和”>”.

* 通配符用来匹配单个标记,例如:time.*.east 可以监听到这将匹配time.us.easttime.eu.east。不能匹配time.eu.us.east.

最详细的nats讲解-部署加应用

>通配符它将匹配一个或多个标记

例如,time.us.>将匹配time.us.easttime.us.east.atlanta,而time.us.*只会匹配,time.us.east因为它不能匹配多个标记。

最详细的nats讲解-部署加应用

混合通配符

通配符*可以在同一主题中出现多次。两种类型都可以使用。例如,*.*.east.>将收到time.us.east.atlanta

Nats如何解决消息队列中常见问题

订阅

订阅和kafka等常见的消息队列类似,值得强调的是nats中也有类似于kafka中的消费者组的概念

-不同的客户端用相同的消费者组去订阅同一个主图,服务端保证每个消息只被消费者组中的一个消费者消费。(提供的内置负载平衡功能)

  • 确保应用程序容错

  • 工作负载处理可以扩大或缩小
  • 扩大或缩小消费者规模,无需重复发送消息
  • 无需额外配置
  • 队列组由应用程序及其队列订阅者定义,而不是由服务器配置定义

普通订阅 core-nats产品举例

nc, err := nats.Connect("demo.nats.io") if err != nil { log.Fatal(err) } defer nc.Close() // Use a WaitGroup to wait for a message to arrive wg := sync.WaitGroup{} wg.Add(1) // 异步订阅 if _, err := nc.Subscribe("updates", func(m *nats.Msg) { wg.Done() }); err != nil { log.Fatal(err) } //同步订阅 保证消息顺序 // sub, err := nc.SubscribeSync("updates") // Wait for a message to come in wg.Wait()

消费者组订阅实例代码:core-nats产品举例

package main import "github.com/nats-io/nats.go" func main() { connect, err := nats.Connect("") if err != nil { } connect.QueueSubscribeSync() //同步保证消息顺序 connect.QueueSubscribe() //异步不保证顺序 connect.QueueSubscribeSyncWithChan() //同步消息顺序发送到管道里接收 connect.ChanQueueSubscribe() //异步不保证顺序 } 

消费者组订阅实例代码:nats-jetStream产品举例(有个Pull 拉去消息的概念)

最详细的nats讲解-部署加应用

import ( "github.com/nats-io/nats.go" "log" "time" ) func main() { nc, err := nats.Connect("") if err != nil { } js, err := nc.JetStream() if err != nil { log.Fatal(err) } js.QueueSubscribeSync() //顺序 js.QueueSubscribe() js.ChanQueueSubscribe() // 消息发布到指定管道 }

注意 JetStream这里订阅的有个常用的:通过消费者主动去服务端拉去消息的(拉取消费者在 NATS 服务器上产生的 CPU 负载较少,因此扩展性更好)  PullSubscribe

func ExampleJetStream() { nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } // Use the JetStream context to produce and consumer messages // that have been persisted. js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) if err != nil { log.Fatal(err) } js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) js.Publish("foo", []byte("Hello JS!")) // Publish messages asynchronously. for i := 0; i < 500; i++ { js.PublishAsync("foo", []byte("Hello JS Async!")) } select { case <-js.PublishAsyncComplete(): case <-time.After(5 * time.Second): fmt.Println("Did not resolve in time") } // Create Pull based consumer with maximum 128 inflight. sub, _ := js.PullSubscribe("foo", "wq", nats.PullMaxWaiting(128)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() for { select { case <-ctx.Done(): return default: } // Fetch will return as soon as any message is available rather than wait until the full batch size is available, using a batch size of more than 1 allows for higher throughput when needed. msgs, _ := sub.Fetch(10, nats.Context(ctx)) for _, msg := range msgs { msg.Ack() } } }

发布

core-nats产品简单发(核心版本的nats不保证可靠性交付:发送一次就不管了,适用于可容忍消息丢失的情况)

nc, err := nats.Connect("demo.nats.io", nats.Name("API PublishBytes Example")) if err != nil { log.Fatal(err) } defer nc.Close() if err := nc.Publish("updates", []byte("All is Well")); err != nil { log.Fatal(err) }

关于nats-jetstream的发布需要先穿就“流”,放到下面Jetstream中进行讲解

Nats-JetStream

概念

NATS 有一个内置的持久性引擎,称为JetStream,它允许存储消息并在以后重播。与需要您拥有有效订阅才能在消息发生时处理消息的NATS Core不同,JetStream 允许 NATS 服务器捕获消息并根据需要将其重播给消费者。此功能为您的 NATS 消息提供了不同的服务质量,并实现了容错和高可用性配置。

流创建及配置

用代码简单介绍一下流的创建个关键信息配置

package main import ( "github.com/nats-io/nats.go" "log" "time" ) func main() { nc, err := nats.Connect("") if err != nil { } js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) if err != nil { log.Fatal(err) } _, err = js.AddStream(&nats.StreamConfig{ Name: "device", //流名称 Subjects: []string{"device.*"}, //需要监视并且存储消息的主题名 Storage: nats.FileStorage, //消息存储方式 文件或者内存 Retention: nats.LimitsPolicy, //保留策略 Discard: nats.DiscardOld, //丢弃策略 最老的消息丢弃或者最新的 MaxAge: 48 * time.Hour, //存储消息的最大时间 Duplicates: time.Hour * 24, // 恰好一次消息的检测时间段 在消息头里如果加了消息id,那么24小时内存在相同的id就认为是同一个消息 }) } //这样就创建了一个名称为device的流

核心配置讲解

// jsm.go // StreamConfig 用于定义一个流, 大多数参数都有合理的默认值 // 如果 subject 没写, 就会分配一个随机的 subject type StreamConfig struct { // 名称 Name string // 描述 Description string // 对应的多个 Subject Subjects []string // 消息 3 种保留策略 // RetentionPolicy 最大消息数, 最大存储空间或者最大存活时间达到限制, 就可以删除消息 // InterestPolicy 需要所有 consumer 确认可以删除消息 // WorkQueuePolicy 只需要一个 consumer 确认可以删除消息 Retention RetentionPolicy // 最大 Consumer 数量 MaxConsumers int // 最大存储 Mgs 数量 MaxMsgs int64 // 最大储存占用 MaxBytes int64 // 消息 2 种淘汰策略 // DiscardOld 消息达到限制后, 丢弃最早的消息 // DiscardNew 消息达到限制后, 信息消息新推送会失败 Discard DiscardPolicy // 消息存活时间 MaxAge time.Duration // 每个 subject 最大消息数量 MaxMsgsPerSubject int64 // 每个消息最大大小 MaxMsgSize int32 // 支持文件储存和内存储存 2 种类型 Storage StorageType // 消息分片数量 Replicas int // 不需要 ack NoAck bool // ... }

订阅-消费者

消费者关键配置

// jsm.go type ConsumerConfig struct { // 名称 Durable string `json:"durable_name,omitempty"` // 描述 Description string `json:"description,omitempty"` // 交付 Subject DeliverSubject string `json:"deliver_subject,omitempty"` // 交付 Group DeliverGroup string `json:"deliver_group,omitempty"` // 交付策略 // 交付所有 (默认), 交付最后一个, 交付最新, 自定义开始序号, 自定义开始时间 DeliverPolicy DeliverPolicy `json:"deliver_policy"` // 开始序号 OptStartSeq uint64 `json:"opt_start_seq,omitempty"` // 开始时间 OptStartTime *time.Time `json:"opt_start_time,omitempty"` // ack 策略 // 不需要ack (默认), 隐式ack All , 每个都需要显示ack AckPolicy AckPolicy `json:"ack_policy"` // ack等待时间 AckWait time.Duration `json:"ack_wait,omitempty"` MaxDeliver int `json:"max_deliver,omitempty"` BackOff []time.Duration `json:"backoff,omitempty"` // 过滤的Subject FilterSubject string `json:"filter_subject,omitempty"` // 重试策略 // 尽快重试, ReplayOriginalPolicy 相同时间重试 ReplayPolicy ReplayPolicy `json:"replay_policy"` // 限速 RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec // 采样频率 SampleFrequency string `json:"sample_freq,omitempty"` // 最大等待数量 MaxWaiting int `json:"max_waiting,omitempty"` //最大Pending ack数量 MaxAckPending int `json:"max_ack_pending,omitempty"` // flow 控制 FlowControl bool `json:"flow_control,omitempty"` // 心跳时间 Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` // ... }

常见文件讲解

nats如何保证消息不重复

nats核心功能 只提供交付一次的消息质量,要向保证发送消息不重复只能在消息发送端做唯一限制。

nats-jetstream 提供恰好一次和至少一次的消息质量,对于发布方,它依赖于发布应用程序在消息头中附加唯一的消息或发布 ID,并依赖于服务器在可配置的滚动时间段内跟踪这些 ID,以便检测发布者是否两次发布同一条消息。对于订阅放可以加必要的外在设计增加接收消息不重复,例如唯一主键等。

如何保证消息不丢失

订阅方订阅每条消息需要主动向nats服务回复ack确认,否则nats则认为丢失,进行消息重播。进而实现可靠交付。

nats服务提供持久的消息保留策略。

发布方每发布一条消息都会收到nats的ack回复。

这保证了消息不丢失。

顺序消费问题

单个分区可以实现顺序消费,同步消费(重要消息放到单个分区)

nats也提供队列消费,Group的概念。进而提供负载均衡。

实战

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/113794.html

(0)
上一篇 2025-12-12 18:33
下一篇 2025-12-12 19:00

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信