基于DotNetty实现一个接口自动发布工具 – 通信实现

基于DotNetty实现一个接口自动发布工具 – 通信实现类库项目 是通信相关基础设施层 Codec 模块实现编码解码 Convention 模块定义约定 比如抽象的业务 Handler 消息载体 消息上下文 NettyContext 等消息类为 封装了消息头

大家好,欢迎来到IT知识分享网。

基于 DotNetty 实现通信

DotNetty : 是微软的 Azure 团队,使用 C#实现的 Netty 的版本发布。是.NET 平台的优秀网络库。

项目介绍

OpenDeploy.Communication 类库项目,是通信相关基础设施层

image

  • Codec 模块实现编码解码
  • Convention 模块定义约定,比如抽象的业务 Handler, 消息载体 NettyMessage, 消息上下文 ‘NettyContext’ 等

自定义消息格式

消息类为 NettyMessage ,封装了消息头 NettyHeader 和消息体 Body

image

NettyMessage

封装了消息头 NettyHeader 和消息体 Body

NettyMessage 点击查看代码

/// <summary> Netty消息 </summary> public class NettyMessage { /// <summary> 消息头 </summary> public NettyHeader Header { get; init; } = default!; /// <summary> 消息体(可空,可根据具体业务而定) </summary> public byte[]? Body { get; init; } /// <summary> 消息头转为字节数组 </summary> public byte[] GetHeaderBytes() { var headerString = Header.ToString(); return Encoding.UTF8.GetBytes(headerString); } /// <summary> 是否同步消息 </summary> public bool IsSync() => Header.Sync; /// <summary> 创建Netty消息工厂方法 </summary> public static NettyMessage Create(string endpoint, bool sync = false, byte[]? body = null) { return new NettyMessage { Header = new NettyHeader { EndPoint = endpoint, Sync = sync }, Body = body }; } /// <summary> 序列化为JSON字符串 </summary> public override string ToString() => Header.ToString(); }

NettyHeader

消息头,包含请求唯一标识,是否同步消息,终结点等, 在传输数据时会序列化为 JSON

NettyHeader 点击查看代码

/// <summary> Netty消息头 </summary> public class NettyHeader { /// <summary> 请求消息唯一标识 </summary> public Guid RequestId { get; init; } = Guid.NewGuid(); /// <summary> 是否同步消息, 默认false是异步消息 </summary> public bool Sync { get; init; } /// <summary> 终结点 (借鉴MVC,约定为Control/Action模式) </summary> public string EndPoint { get; init; } = string.Empty; /// <summary> 序列化为JSON字符串 </summary> public override string ToString() => this.ToJsonString(); }


  • 请求消息唯一标识 RequestId , 用来唯一标识消息, 主要用于 发送同步请求, 因为默认的消息是异步的,只管发出去,不需要等待响应
  • 是否同步消息 Sync , 可以不需要,主要为了可视化,便于调试
  • 终结点 EndPoint , (借鉴 MVC,约定为 Control/Action 模式), 服务端直接解析出对应的处理器

编码器

DefaultEncoder 点击查看代码

public class DefaultEncoder : MessageToByteEncoder<NettyMessage> { protected override void Encode(IChannelHandlerContext context, NettyMessage message, IByteBuffer output) { //消息头转为字节数组 var headerBytes = message.GetHeaderBytes(); //写入消息头长度 output.WriteInt(headerBytes.Length); //写入消息头字节数组 output.WriteBytes(headerBytes); //写入消息体字节数组 if (message.Body != null && message.Body.Length > 0) { output.WriteBytes(message.Body); } } }

解码器

DefaultDecoder 点击查看代码

public class DefaultDecoder : MessageToMessageDecoder<IByteBuffer> { protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) { //消息总长度 var totalLength = input.ReadableBytes; //消息头长度 var headerLength = input.GetInt(input.ReaderIndex); //消息体长度 var bodyLength = totalLength - 4 - headerLength; //读取消息头字节数组 var headerBytes = new byte[headerLength]; input.GetBytes(input.ReaderIndex + 4, headerBytes, 0, headerLength); byte[]? bodyBytes = null; string? rawHeaderString = null; NettyHeader? header; try { //把消息头字节数组,反序列化为JSON rawHeaderString = Encoding.UTF8.GetString(headerBytes); header = JsonSerializer.Deserialize<NettyHeader>(rawHeaderString); } catch (Exception ex) { Logger.Error($"解码失败: {rawHeaderString}, {ex}"); return; } if (header is null) { Logger.Error($"解码失败: {rawHeaderString}"); return; } //读取消息体字节数组 if (bodyLength > 0) { bodyBytes = new byte[bodyLength]; input.GetBytes(input.ReaderIndex + 4 + headerLength, bodyBytes, 0, bodyLength); } //封装为NettyMessage对象 var message = new NettyMessage { Header = header, Body = bodyBytes, }; output.Add(message); } }

NettyServer 实现

NettyServer 点击查看代码

public static class NettyServer { /// <summary> /// 开启Netty服务 /// </summary> public static async Task RunAsync(int port = 20007) { var bossGroup = new MultithreadEventLoopGroup(1); var workerGroup = new MultithreadEventLoopGroup(); try { var bootstrap = new ServerBootstrap().Group(bossGroup, workerGroup); bootstrap .Channel<TcpServerSocketChannel>() .Option(ChannelOption.SoBacklog, 100) .Option(ChannelOption.SoReuseaddr, true) .Option(ChannelOption.SoReuseport, true) .ChildHandler(new ActionChannelInitializer<IChannel>(channel => { IChannelPipeline pipeline = channel.Pipeline; pipeline.AddLast("framing-enc", new LengthFieldPrepender(4)); pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4)); pipeline.AddLast("decoder", new DefaultDecoder()); pipeline.AddLast("encoder", new DefaultEncoder()); pipeline.AddLast("handler", new ServerMessageEntry()); })); var boundChannel = await bootstrap.BindAsync(port); Logger.Info($"NettyServer启动成功...{boundChannel}"); Console.ReadLine(); await boundChannel.CloseAsync(); Logger.Info($"NettyServer关闭监听了...{boundChannel}"); } finally { await Task.WhenAll( bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)), workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)) ); Logger.Info($"NettyServer退出了..."); } } }

  • 服务端管道最后我们添加了 ServerMessageEntry ,作为消息处理的入口

ServerMessageEntry 点击查看代码

public class ServerMessageEntry : ChannelHandlerAdapter { /// <summary> Netty处理器选择器 </summary> private readonly DefaultNettyHandlerSelector handlerSelector = new(); public ServerMessageEntry() { //注册Netty处理器 handlerSelector.RegisterHandlerTypes(typeof(EchoHandler).Assembly.GetTypes()); } /// <summary> 通道激活 </summary> public override void ChannelActive(IChannelHandlerContext context) { Logger.Warn($"ChannelActive: {context.Channel}"); } /// <summary> 通道关闭 </summary> public override void ChannelInactive(IChannelHandlerContext context) { Logger.Warn($"ChannelInactive: {context.Channel}"); } /// <summary> 收到客户端的消息 </summary> public override async void ChannelRead(IChannelHandlerContext context, object message) { if (message is not NettyMessage nettyMessage) { Logger.Error("从客户端接收消息为空"); return; } try { Logger.Info($"收到客户端的消息: {nettyMessage}"); //封装请求 var nettyContext = new NettyContext(context.Channel, nettyMessage); //选择处理器 AbstractNettyHandler handler = handlerSelector.SelectHandler(nettyContext); //处理请求 await handler.ProcessAsync(); } catch(Exception ex) { Logger.Error($"ServerMessageEntry.ChannelRead: {ex}"); } } }

  • 按照约定, 把继承 AbstractNettyHandler 的类视为业务处理器
  • ServerMessageEntry 拿到消息后,首先把消息封装为 NettyContext, 类似与 MVC 中的 HttpContext, 封装了请求和响应对象, 内部解析请求的 EndPoint, 拆分为 HandlerNameActionName
  • DefaultNettyHandlerSelector 提供注册处理器的方法 RegisterHandlerTypes, 和选择处理器的方法 SelectHandler
  • SelectHandler, 默认规则是查找已注册的处理器中以 HandlerName 开头的类型
  • AbstractNettyHandler 的 ProcessAsync 方法,通过 ActionName, 反射拿到 MethodInfo, 调用终结点

NettyClient 实现

NettyClient 点击查看代码

public sealed class NettyClient(string serverHost, int serverPort) : IDisposable { public EndPoint ServerEndPoint { get; } = new IPEndPoint(IPAddress.Parse(serverHost), serverPort); private static readonly Bootstrap bootstrap = new(); private static readonly IEventLoopGroup eventLoopGroup = new SingleThreadEventLoop(); private bool _disposed; private IChannel? _channel; public bool IsConnected => _channel != null && _channel.Open; public bool IsWritable => _channel != null && _channel.IsWritable; static NettyClient() { bootstrap .Group(eventLoopGroup) .Channel<TcpSocketChannel>() .Option(ChannelOption.SoReuseaddr, true) .Option(ChannelOption.SoReuseport, true) .Handler(new ActionChannelInitializer<ISocketChannel>(channel => { IChannelPipeline pipeline = channel.Pipeline; //pipeline.AddLast("ping", new IdleStateHandler(0, 5, 0)); pipeline.AddLast("framing-enc", new LengthFieldPrepender(4)); pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4)); pipeline.AddLast("decoder", new DefaultDecoder()); pipeline.AddLast("encoder", new DefaultEncoder()); pipeline.AddLast("handler", new ClientMessageEntry()); })); } /// <summary> 连接服务器 </summary> private async Task TryConnectAsync() { try { if (IsConnected) { return; } _channel = await bootstrap.ConnectAsync(ServerEndPoint); } catch (Exception ex) { throw new Exception($"连接服务器失败 : {ServerEndPoint} {ex.Message}"); } } /// <summary> /// 发送消息 /// </summary> /// <param name="endpoint">终结点</param> /// <param name="sync">是否同步等待响应</param> /// <param name="body">正文</param> public async Task SendAsync(string endpoint, bool sync = false, byte[]? body = null) { var message = NettyMessage.Create(endpoint, sync, body); if (sync) { var task = ClientMessageSynchronizer.TryAdd(message); try { await SendAsync(message); await task; } catch { ClientMessageSynchronizer.TryRemove(message); throw; } } else { await SendAsync(message); } } /// <summary> /// 发送消息 /// </summary> private async Task SendAsync(NettyMessage message) { await TryConnectAsync(); await _channel!.WriteAndFlushAsync(message); } /// <summary> 释放连接(程序员手动释放, 一般在代码使用using语句,或在finally里面Dispose) </summary> public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } /// <summary> 释放连接 </summary> private void Dispose(bool disposing) { if (_disposed) { return; } //释放托管资源,比如嵌套的对象 if (disposing) { } //释放非托管资源 if (_channel != null) { _channel.CloseAsync(); _channel = null; } _disposed = true; } ~NettyClient() { Dispose(true); } }

  • NettyClient 封装了 Netty 客户端逻辑,提供发送异步请求(默认)和发布同步请求方法
  • DotNetty 默认不提供同步请求,但是有些情况我们需要同步等待服务器的响应,所有需要自行实现,实现也很简单,把消息 ID 缓存起来,收到服务器响应后激活就行了,具体实现在消息同步器 ClientMessageSynchronizer, 就不贴了

总结

至此,我们实现了基于 DotNetty 搭建通信模块, 实现了客户端和服务器的编解码,处理器选择,客户端实现了同步消息等,大家可以在 ConsoleHost 结尾的控制台项目中,测试下同步和异步的消息,实现的简单的 Echo 模式

代码仓库

项目暂且就叫 OpenDeploy 吧

  • OpenDeploy: https://gitee.com/broadm-dotnet/OpenDeploy

欢迎大家拍砖,Star

下一步

计划下一步,基于WPF的客户端, 实现接口项目的配置与发现

文章转载自:Broadm

原文链接:https://www.cnblogs.com/broadm/p/17875559.html

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/118726.html

(0)
上一篇 2025-11-10 22:20
下一篇 2025-11-10 22:33

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信