内容简介:Peach是基于Github地址: https://github.com/xuanye/Peach
Peach是基于 DotNetty 的Socket网络通讯帮助类库,可以帮助开发者简化使用DotNetty,关于DotNetty可参考我之前的这篇文章。 Peach内置实现了一个基于文本协议的CommandLineProtocol,下面的实例以这个协议展开,最后以 DotBPE 中Amp协议来介绍下如何扩展自定义协议。
Github地址: https://github.com/xuanye/Peach
QuickStart 使用
添加引用
dotnet nuget add Peach
要使用Peach编写网络程序,一般只需要三个步骤
- 实现协议传输消息IMessage
- 实现协议打包和解包逻辑IProtocol
- 实现ISocketService完成服务端逻辑编写 <!-- more --> 在快速开始的实例中,我们使用内置的CommandLineProtocol,所以省去了步骤1,2让我们开始吧!
1 服务端
1.1 实现MyService
可分别重写
OnConnected OnDisConnected OnReceive OnException
public class MyService : Peach.AbsSocketService<Peach.Messaging.CommandLineMessage> { private readonly ILogger<MyService> _logger; public MyService(ILogger<MyService> logger) { _logger = logger; } public override void OnConnected(ISocketContext<CommandLineMessage> context) { _logger.LogInformation("client connected from {0}", context.RemoteEndPoint); base.OnConnected(context); } public override void OnDisconnected(ISocketContext<CommandLineMessage> context) { _logger.LogInformation("client disconnected from {0}", context.RemoteEndPoint); base.OnDisconnected(context); } public override void OnException(ISocketContext<CommandLineMessage> context, Exception ex) { _logger.LogError(ex,"client from {0}, occ error {1}", context.RemoteEndPoint,ex.Message); base.OnException(context, ex); } public override void OnReceive(ISocketContext<CommandLineMessage> context, CommandLineMessage msg) { string replyMessage = string.Empty; string replyCmd = string.Empty; switch (msg.Command) { case "echo": replyMessage = msg.Parameters[0]; replyCmd = "echo"; break; case "init": replyMessage = "ok"; replyCmd = "init_reply"; break; default: replyMessage = "error unknow command"; break; } Task.Run(async () => { await context.SendAsync(new CommandLineMessage(replyCmd, replyMessage)); }); } }
2. 挂载服务
服务默认挂载在5566端口
static void Main(string[] args) { var builder = new HostBuilder() .ConfigureServices((context,services) => { //协议 services.AddSingleton<IProtocol<CommandLineMessage>, CommandLineProtocol>(); //挂载服务逻辑 services.AddSingleton<ISocketService<CommandLineMessage>, MyService>(); //添加挂载的宿主服务 services.AddTcpServer<CommandLineMessage>(); }) .ConfigureLogging( logger => { logger.AddConsole(); } ); builder.RunConsoleAsync().Wait(); }
2. 客户端
2.1 使用内置的TcpClient
监听接收消息和链接的消息,如下所示:
TcpClient<CommandLineMessage> client = new TcpClient<CommandLineMessage>(new CommandLineProtocol()); client.OnReceived += Client_OnReceived; client.OnConnected += Client_OnConnected; Task.Run(async () => { //连接服务器 var socketContext = await client.ConnectAsync(new IPEndPoint(Hey.IPUtility.GetLocalIntranetIP(), 5566)); //发送消息 var initCmd = new Hey.Messaging.CommandLineMessage("init"); await socketContext.SendAsync(initCmd); }).Wait();
可用的事件:
OnReceived OnError OnConnected OnDisconnected OnIdleState
3. 自定义协议
Peach支持使用自定义协议,扩展协议需要自行实现两个接口:
3.1. IMessage 接口
实现类具体实现通讯消息的内容载体,只需实现如何获取消息长度的属性
public interface IMessage { int Length { get; } }
3.2. IProtocol 接口
实现类需要描述消息头信息和具体打包解包逻辑,头信息描述参见 ProtocolMeta
字段描述
/// <summary> /// 协议接口 /// </summary> /// <typeparam name="TMessage"></typeparam> public interface IProtocol<TMessage> where TMessage : Messaging.IMessage { ProtocolMeta GetProtocolMeta(); TMessage Parse(Buffer.IBufferReader reader); void Pack(Buffer.IBufferWriter writer, TMessage message); }
3.3 Amp协议
为了更好让读者理解自定义协议的操作,这里以DotBPE中的Amp协议为例,来具体讲解一下,先来看下Amp协议的说明:
0 1 2 3 4 5 6 7 8 9 10 11 12 13 1415 16171819 20 <length>-21 +------------+----------+---------+------+-------------+---------+---------+--------+------------+ | <ver/argc> | <length> | <seq> |<type>| <serviceId> | <msgId> | <code> | <codec>| <data> | +------------+----------+---------+------+-------------+---------+---------+--------+------------+
Amp协议固定包头上21个字节,说明如下:
- ver/argc = 版本 固定填1
- length = 为总包长
- seq = 请求序列号
- type = 消息类型
- 1 = Request 请求消息
- 2 = Response 响应消息
- 3 = Notify 通知消息
- 4 = OneWayRequest 调用不关心返回值
- serId = serviceId 服务号
- msgId = msgId 消息ID
- code = 当 type = 0 (请求时)固定传0 ,其他即为响应码,如果响应码不为0 则认为请求失败,具体错误码再定义
- codecType = 编码方式 0=默认 Protobuf 1=MessagePack 2=JSON
- data = 实际的业务数据
3.3.1 AmpMessage实现
为了避免干扰因素,这里的代码去除了一些,辅助行的字段和方法,AmpMessage其实是主要用于描述头信息的,并且包含body的buffer数据 Data
字段,并实现获取消息体Length的方法(用于发送消息时,计算缓冲区)
public class AmpMessage : Peach.Messaging.IMessage { /// <summary> /// 第一个版本为18个字节头固定长度 /// </summary> public const int VERSION_0_HEAD_LENGTH = 18; /// <summary> /// 现有版本21个字节头固定长度 /// </summary> public const int VERSION_1_HEAD_LENGTH = 21; /// <summary> /// 状态码 /// </summary> public int Code { get; set; } //0 默认为Protobuf 1 MessagePack 2 = JSON public CodecType CodecType { get; set; } /// <summary> /// 实际的请求数据 /// </summary> public byte[] Data { get; set; } public int Length { get { var hl = Version == 0 ? VERSION_0_HEAD_LENGTH : VERSION_1_HEAD_LENGTH; if(Data == null) { return hl; } return hl + this.Data.Length; } } /// <summary> /// 消息标识 /// </summary> public string Id => $"{ServiceId}|{MessageId}|{Sequence}"; /// <summary> /// 调用服务的唯一消息号 确定哪个方法 /// </summary> public ushort MessageId { get; set; } /// <summary> /// 请求的序列号 /// </summary> public int Sequence { get; set; } /// <summary> /// 调用服务的唯一服务号 确定哪个服务 /// </summary> public int ServiceId { get; set; } /// <summary> /// 协议版本0/1 /// </summary> public byte Version { get; set; } public InvokeMessageType InvokeMessageType { get; set; } } public enum InvokeMessageType : byte { Request = 1, Response = 2, Notify = 3, OnewayRequest=4 //请求且不等待回复 }
3.3.2 AmpProtocol的实现
AmpProtocol中的实现主要是对ProtocolMeta描述,代码中已有详细注释,至于打包和解包,就是根据协议Write或者Read对应的数据类型即可
/// <summary> /// Amp Protocol /// </summary> public class AmpProtocol : IProtocol<AmpMessage> { private readonly ISerializer _serializer; public AmpProtocol(ISerializer serializer) { this._serializer = serializer; } static readonly ProtocolMeta AMP_PROTOCOL_META = new ProtocolMeta { InitialBytesToStrip = 0, //读取时需要跳过的字节数 LengthAdjustment = -5, //包实际长度的纠正,如果包长包括包头和包体,则要减去Length之前的部分 LengthFieldLength = 4, //长度字段的字节数 整型为4个字节 LengthFieldOffset = 1, //长度属性的起始(偏移)位 MaxFrameLength = int.MaxValue, //最大的数据包字节数 HeartbeatInterval = 30 * 1000 // 30秒没消息发一个心跳包 }; public ProtocolMeta GetProtocolMeta() { return AMP_PROTOCOL_META; } public void Pack(IBufferWriter writer, AmpMessage message) { writer.WriteByte(message.Version); writer.WriteInt(message.Length); writer.WriteInt(message.Sequence); writer.WriteByte((byte)message.InvokeMessageType); if (message.Version == 0) { writer.WriteUShort((ushort)message.ServiceId); } else { writer.WriteInt(message.ServiceId); } writer.WriteUShort(message.MessageId); writer.WriteInt(message.Code); if(message.Version == 1) { writer.WriteByte(_serializer.CodecType); } if (message.Data != null) { writer.WriteBytes(message.Data); } } public AmpMessage Parse(IBufferReader reader) { if (reader.ReadableBytes == 0) { return null; } var msg = new AmpMessage {Version = reader.ReadByte()}; int headLength; if (msg.Version == 0 ) { headLength = AmpMessage.VERSION_0_HEAD_LENGTH; if (reader.ReadableBytes < AmpMessage.VERSION_0_HEAD_LENGTH - 1) { throw new RpcCodecException($"decode error ,ReadableBytes={reader.ReadableBytes+1},HEAD_LENGTH={AmpMessage.VERSION_0_HEAD_LENGTH}"); } } else if (msg.Version == 1 ) { headLength = AmpMessage.VERSION_1_HEAD_LENGTH; if (reader.ReadableBytes < AmpMessage.VERSION_1_HEAD_LENGTH - 1) { throw new RpcCodecException($"decode error ,ReadableBytes={reader.ReadableBytes+1},HEAD_LENGTH={AmpMessage.VERSION_1_HEAD_LENGTH}"); } } else { throw new RpcCodecException($"decode error ,{msg.Version} is not support"); } var length = reader.ReadInt(); msg.Sequence = reader.ReadInt(); var type = reader.ReadByte(); msg.InvokeMessageType = (InvokeMessageType)Enum.ToObject(typeof(InvokeMessageType), type); msg.ServiceId = msg.Version == 0 ? reader.ReadUShort() : reader.ReadInt(); msg.MessageId = reader.ReadUShort(); msg.Code = reader.ReadInt(); if (msg.Version == 1) { byte codeType = reader.ReadByte(); if (codeType != this._serializer.CodecType) { throw new RpcCodecException($"CodecType:{codeType} is not Match {this._serializer.CodecType}"); } msg.CodecType = (CodecType)Enum.ToObject(typeof(CodecType), codeType); } else { msg.CodecType = CodecType.Protobuf; } int left = length - headLength; if (left > 0) { if (left > reader.ReadableBytes) { throw new RpcCodecException("message not long enough!"); } msg.Data = new byte[left]; reader.ReadBytes(msg.Data); } return msg; } } }
4. 后记
Peach的产生主要是源于对DotBPE的重构,因为在其他项目中有关于通讯的其他需求,所以这块虽然比较简单,也可比较独立,所以单独开了一个库来实现对DotNetty的封装。另外欢迎各位dotnet core的同学一起学习交流 QQ群号:699044833
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 使用BeetleX构建基础的SSL网络通讯
- t-io 3.2.5 发布,让天下没有难开发的网络通讯
- 中兴通讯中标中国移动5G SA核心网商用网络
- 中兴通讯实力打造适配现在与未来网络的IPv6基石
- 中兴通讯获6项SDN/NFV大奖背后:网络转型走入“深水期”
- 全球十余家运营商遭黑客入侵,必要时可切断通讯网络
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Black Box Society
Frank Pasquale / Harvard University Press / 2015-1-5 / USD 35.00
Every day, corporations are connecting the dots about our personal behavior—silently scrutinizing clues left behind by our work habits and Internet use. The data compiled and portraits created are inc......一起来看看 《The Black Box Society》 这本书的介绍吧!