大家好,欢迎来到IT知识分享网。
基于 DotNetty 实现通信
DotNetty : 是微软的 Azure 团队,使用 C#实现的 Netty 的版本发布。是.NET 平台的优秀网络库。
项目介绍
OpenDeploy.Communication 类库项目,是通信相关基础设施层
Codec模块实现编码解码
Convention模块定义约定,比如抽象的业务 Handler, 消息载体NettyMessage, 消息上下文 ‘NettyContext’ 等
自定义消息格式
消息类为 NettyMessage ,封装了消息头 NettyHeader 和消息体 Body
NettyMessage
封装了消息头
NettyHeader和消息体Body
NettyMessage 点击查看代码
/// <summary> Netty消息 </summary> public class NettyMessage { /// <summary> 消息头 </summary> public NettyHeader Header { get; init; } = default!; /// <summary> 消息体(可空,可根据具体业务而定) </summary> public byte[]? Body { get; init; } /// <summary> 消息头转为字节数组 </summary> public byte[] GetHeaderBytes() { var headerString = Header.ToString(); return Encoding.UTF8.GetBytes(headerString); } /// <summary> 是否同步消息 </summary> public bool IsSync() => Header.Sync; /// <summary> 创建Netty消息工厂方法 </summary> public static NettyMessage Create(string endpoint, bool sync = false, byte[]? body = null) { return new NettyMessage { Header = new NettyHeader { EndPoint = endpoint, Sync = sync }, Body = body }; } /// <summary> 序列化为JSON字符串 </summary> public override string ToString() => Header.ToString(); }
NettyHeader
消息头,包含请求唯一标识,是否同步消息,终结点等, 在传输数据时会序列化为 JSON
NettyHeader 点击查看代码
/// <summary> Netty消息头 </summary> public class NettyHeader { /// <summary> 请求消息唯一标识 </summary> public Guid RequestId { get; init; } = Guid.NewGuid(); /// <summary> 是否同步消息, 默认false是异步消息 </summary> public bool Sync { get; init; } /// <summary> 终结点 (借鉴MVC,约定为Control/Action模式) </summary> public string EndPoint { get; init; } = string.Empty; /// <summary> 序列化为JSON字符串 </summary> public override string ToString() => this.ToJsonString(); }
- 请求消息唯一标识
RequestId, 用来唯一标识消息, 主要用于 发送同步请求, 因为默认的消息是异步的,只管发出去,不需要等待响应
- 是否同步消息
Sync, 可以不需要,主要为了可视化,便于调试
- 终结点
EndPoint, (借鉴 MVC,约定为 Control/Action 模式), 服务端直接解析出对应的处理器
编码器
DefaultEncoder 点击查看代码
public class DefaultEncoder : MessageToByteEncoder<NettyMessage> { protected override void Encode(IChannelHandlerContext context, NettyMessage message, IByteBuffer output) { //消息头转为字节数组 var headerBytes = message.GetHeaderBytes(); //写入消息头长度 output.WriteInt(headerBytes.Length); //写入消息头字节数组 output.WriteBytes(headerBytes); //写入消息体字节数组 if (message.Body != null && message.Body.Length > 0) { output.WriteBytes(message.Body); } } }
解码器
DefaultDecoder 点击查看代码
public class DefaultDecoder : MessageToMessageDecoder<IByteBuffer> { protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) { //消息总长度 var totalLength = input.ReadableBytes; //消息头长度 var headerLength = input.GetInt(input.ReaderIndex); //消息体长度 var bodyLength = totalLength - 4 - headerLength; //读取消息头字节数组 var headerBytes = new byte[headerLength]; input.GetBytes(input.ReaderIndex + 4, headerBytes, 0, headerLength); byte[]? bodyBytes = null; string? rawHeaderString = null; NettyHeader? header; try { //把消息头字节数组,反序列化为JSON rawHeaderString = Encoding.UTF8.GetString(headerBytes); header = JsonSerializer.Deserialize<NettyHeader>(rawHeaderString); } catch (Exception ex) { Logger.Error($"解码失败: {rawHeaderString}, {ex}"); return; } if (header is null) { Logger.Error($"解码失败: {rawHeaderString}"); return; } //读取消息体字节数组 if (bodyLength > 0) { bodyBytes = new byte[bodyLength]; input.GetBytes(input.ReaderIndex + 4 + headerLength, bodyBytes, 0, bodyLength); } //封装为NettyMessage对象 var message = new NettyMessage { Header = header, Body = bodyBytes, }; output.Add(message); } }
NettyServer 实现
NettyServer 点击查看代码
public static class NettyServer { /// <summary> /// 开启Netty服务 /// </summary> public static async Task RunAsync(int port = 20007) { var bossGroup = new MultithreadEventLoopGroup(1); var workerGroup = new MultithreadEventLoopGroup(); try { var bootstrap = new ServerBootstrap().Group(bossGroup, workerGroup); bootstrap .Channel<TcpServerSocketChannel>() .Option(ChannelOption.SoBacklog, 100) .Option(ChannelOption.SoReuseaddr, true) .Option(ChannelOption.SoReuseport, true) .ChildHandler(new ActionChannelInitializer<IChannel>(channel => { IChannelPipeline pipeline = channel.Pipeline; pipeline.AddLast("framing-enc", new LengthFieldPrepender(4)); pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4)); pipeline.AddLast("decoder", new DefaultDecoder()); pipeline.AddLast("encoder", new DefaultEncoder()); pipeline.AddLast("handler", new ServerMessageEntry()); })); var boundChannel = await bootstrap.BindAsync(port); Logger.Info($"NettyServer启动成功...{boundChannel}"); Console.ReadLine(); await boundChannel.CloseAsync(); Logger.Info($"NettyServer关闭监听了...{boundChannel}"); } finally { await Task.WhenAll( bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)), workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)) ); Logger.Info($"NettyServer退出了..."); } } }
- 服务端管道最后我们添加了
ServerMessageEntry,作为消息处理的入口
ServerMessageEntry 点击查看代码
public class ServerMessageEntry : ChannelHandlerAdapter { /// <summary> Netty处理器选择器 </summary> private readonly DefaultNettyHandlerSelector handlerSelector = new(); public ServerMessageEntry() { //注册Netty处理器 handlerSelector.RegisterHandlerTypes(typeof(EchoHandler).Assembly.GetTypes()); } /// <summary> 通道激活 </summary> public override void ChannelActive(IChannelHandlerContext context) { Logger.Warn($"ChannelActive: {context.Channel}"); } /// <summary> 通道关闭 </summary> public override void ChannelInactive(IChannelHandlerContext context) { Logger.Warn($"ChannelInactive: {context.Channel}"); } /// <summary> 收到客户端的消息 </summary> public override async void ChannelRead(IChannelHandlerContext context, object message) { if (message is not NettyMessage nettyMessage) { Logger.Error("从客户端接收消息为空"); return; } try { Logger.Info($"收到客户端的消息: {nettyMessage}"); //封装请求 var nettyContext = new NettyContext(context.Channel, nettyMessage); //选择处理器 AbstractNettyHandler handler = handlerSelector.SelectHandler(nettyContext); //处理请求 await handler.ProcessAsync(); } catch(Exception ex) { Logger.Error($"ServerMessageEntry.ChannelRead: {ex}"); } } }
- 按照约定, 把继承
AbstractNettyHandler的类视为业务处理器
ServerMessageEntry拿到消息后,首先把消息封装为NettyContext, 类似与 MVC 中的 HttpContext, 封装了请求和响应对象, 内部解析请求的EndPoint, 拆分为HandlerName,ActionName
DefaultNettyHandlerSelector提供注册处理器的方法RegisterHandlerTypes, 和选择处理器的方法SelectHandler
SelectHandler, 默认规则是查找已注册的处理器中以HandlerName开头的类型
AbstractNettyHandler的ProcessAsync方法,通过ActionName, 反射拿到MethodInfo, 调用终结点
NettyClient 实现
NettyClient 点击查看代码
public sealed class NettyClient(string serverHost, int serverPort) : IDisposable { public EndPoint ServerEndPoint { get; } = new IPEndPoint(IPAddress.Parse(serverHost), serverPort); private static readonly Bootstrap bootstrap = new(); private static readonly IEventLoopGroup eventLoopGroup = new SingleThreadEventLoop(); private bool _disposed; private IChannel? _channel; public bool IsConnected => _channel != null && _channel.Open; public bool IsWritable => _channel != null && _channel.IsWritable; static NettyClient() { bootstrap .Group(eventLoopGroup) .Channel<TcpSocketChannel>() .Option(ChannelOption.SoReuseaddr, true) .Option(ChannelOption.SoReuseport, true) .Handler(new ActionChannelInitializer<ISocketChannel>(channel => { IChannelPipeline pipeline = channel.Pipeline; //pipeline.AddLast("ping", new IdleStateHandler(0, 5, 0)); pipeline.AddLast("framing-enc", new LengthFieldPrepender(4)); pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4)); pipeline.AddLast("decoder", new DefaultDecoder()); pipeline.AddLast("encoder", new DefaultEncoder()); pipeline.AddLast("handler", new ClientMessageEntry()); })); } /// <summary> 连接服务器 </summary> private async Task TryConnectAsync() { try { if (IsConnected) { return; } _channel = await bootstrap.ConnectAsync(ServerEndPoint); } catch (Exception ex) { throw new Exception($"连接服务器失败 : {ServerEndPoint} {ex.Message}"); } } /// <summary> /// 发送消息 /// </summary> /// <param name="endpoint">终结点</param> /// <param name="sync">是否同步等待响应</param> /// <param name="body">正文</param> public async Task SendAsync(string endpoint, bool sync = false, byte[]? body = null) { var message = NettyMessage.Create(endpoint, sync, body); if (sync) { var task = ClientMessageSynchronizer.TryAdd(message); try { await SendAsync(message); await task; } catch { ClientMessageSynchronizer.TryRemove(message); throw; } } else { await SendAsync(message); } } /// <summary> /// 发送消息 /// </summary> private async Task SendAsync(NettyMessage message) { await TryConnectAsync(); await _channel!.WriteAndFlushAsync(message); } /// <summary> 释放连接(程序员手动释放, 一般在代码使用using语句,或在finally里面Dispose) </summary> public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } /// <summary> 释放连接 </summary> private void Dispose(bool disposing) { if (_disposed) { return; } //释放托管资源,比如嵌套的对象 if (disposing) { } //释放非托管资源 if (_channel != null) { _channel.CloseAsync(); _channel = null; } _disposed = true; } ~NettyClient() { Dispose(true); } }
NettyClient封装了 Netty 客户端逻辑,提供发送异步请求(默认)和发布同步请求方法
DotNetty默认不提供同步请求,但是有些情况我们需要同步等待服务器的响应,所有需要自行实现,实现也很简单,把消息 ID 缓存起来,收到服务器响应后激活就行了,具体实现在消息同步器ClientMessageSynchronizer, 就不贴了
总结
至此,我们实现了基于 DotNetty 搭建通信模块, 实现了客户端和服务器的编解码,处理器选择,客户端实现了同步消息等,大家可以在 ConsoleHost 结尾的控制台项目中,测试下同步和异步的消息,实现的简单的 Echo 模式
代码仓库
项目暂且就叫
OpenDeploy吧
- OpenDeploy: https://gitee.com/broadm-dotnet/OpenDeploy
欢迎大家拍砖,Star
下一步
计划下一步,基于WPF的客户端, 实现接口项目的配置与发现
文章转载自:Broadm
原文链接:https://www.cnblogs.com/broadm/p/17875559.html
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/118726.html

