Tcp消息传输主要参照surging来做的,做了部分裁剪和改动,详细参见:https://github.com/dotnetcore/surging
Json-rpc没有定义消息如何传输,因此,Json-Rpc RpcRequest对象和RpcResponse对象需要一个传输载体,这里的传输对象主是TransportMessage,如下代码,这里的Content请求时为RcpRequest对象,答复时为RpcResponse对象,答复时Header一般情况下为空。
///
public class TransportMessage
{
public TransportMessage()
{
}
\[MethodImpl(MethodImplOptions.AggressiveInlining)\]
public TransportMessage(object content,object headers)
{
if (content == null)
throw new ArgumentNullException(nameof(content));
Content = content;
Headers = headers;
ContentType = content.GetType().FullName;
}
\[MethodImpl(MethodImplOptions.AggressiveInlining)\]
public TransportMessage(object content, object headers, string fullName)
{
if (content == null)
throw new ArgumentNullException(nameof(content));
Headers = headers;
Content = content;
ContentType = fullName;
}
/// <summary>
/// 消息Id。
/// </summary>
public string Id { get; set; }
/// <summary>
/// 消息内容。
/// </summary>
public object Content { get; set; }
/// <summary>
/// 消息传输Header
/// </summary>
public object Headers { get; set; }
/// <summary>
/// 内容类型。
/// </summary>
public string ContentType { get; set; }
/// <summary>
/// 是否调用消息。
/// </summary>
/// <returns>如果是则返回true,否则返回false。</returns>
\[MethodImpl(MethodImplOptions.AggressiveInlining)\]
public bool IsInvokeMessage()
{
return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;
}
/// <summary>
/// 是否是调用结果消息。
/// </summary>
/// <returns>如果是则返回true,否则返回false。</returns>
\[MethodImpl(MethodImplOptions.AggressiveInlining)\]
public bool IsInvokeResultMessage()
{
return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;
}
/// <summary>
/// 获取内容。
/// </summary>
/// <typeparam name="T">内容类型。</typeparam>
/// <returns>内容实例。</returns>
\[MethodImpl(MethodImplOptions.AggressiveInlining)\]
public T GetContent<T>()
{
return (T)Content;
}
/// <summary>
/// 获取Header。
/// </summary>
/// <typeparam name="T">Header类型。</typeparam>
/// <returns>Header实例。</returns>
\[MethodImpl(MethodImplOptions.AggressiveInlining)\]
public T GetHeaders<T>()
{
return (T)Headers;
}
/// <summary>
/// 创建一个调用传输消息。
/// </summary>
/// <param name="invokeMessage">调用实例。</param>
/// <returns>调用传输消息。</returns>
public static TransportMessage CreateInvokeMessage(JsonRequest invokeMessage,NameValueCollection nameValueCollection)
{
return new TransportMessage(invokeMessage, nameValueCollection, MessagePackTransportMessageType.jsonRequestTypeName)
{
Id = Guid.NewGuid().ToString("N")
};
}
/// <summary>
/// 创建一个调用结果传输消息。
/// </summary>
/// <param name="id">消息Id。</param>
/// <param name="invokeResultMessage">调用结果实例。</param>
/// <returns>调用结果传输消息。</returns>
public static TransportMessage CreateInvokeResultMessage(string id, JsonResponse jsonResponse,NameValueCollection nameValueCollection)
{
return new TransportMessage(jsonResponse, nameValueCollection, MessagePackTransportMessageType.jsonResponseTypeName)
{
Id = id
};
}
}
TransportMessage需要在dotnetty中传输,则需要对TransportMessage进行编码解码
消息编解码器
public interface ITransportMessageEncoder
{
byte[] Encode(TransportMessage message);
}
public interface ITransportMessageDecoder
{
TransportMessage Decode(byte[] data);
}
Json编解码
平时编码中经常用的方式
public sealed class JsonTransportMessageEncoder : ITransportMessageEncoder
{
#region Implementation of ITransportMessageEncoder
public byte\[\] Encode(TransportMessage message)
{
var content = JsonConvert.SerializeObject(message);
return Encoding.UTF8.GetBytes(content);
}
#endregion Implementation of ITransportMessageEncoder
}
public sealed class JsonTransportMessageDecoder : ITransportMessageDecoder
{
#region Implementation of ITransportMessageDecoder
public TransportMessage Decode(byte\[\] data)
{
var content = Encoding.UTF8.GetString(data);
var message = JsonConvert.DeserializeObject<TransportMessage>(content);
if (message.IsInvokeMessage())
{
message.Content = JsonConvert.DeserializeObject<JsonRequest>(message.Content.ToString());
}
if (message.IsInvokeResultMessage())
{
message.Content = JsonConvert.DeserializeObject<JsonResponse>(message.Content.ToString());
}
return message;
}
#endregion Implementation of ITransportMessageDecoder
}
MessagePack
官网地址:https://msgpack.org/
贴出代码,不过多的解释
[MessagePackObject]
public class MessagePackTransportMessage
{
public MessagePackTransportMessage(TransportMessage transportMessage)
{
Id = transportMessage.Id;
ContentType = transportMessage.ContentType;
object contentObject;
if (transportMessage.IsInvokeMessage())
{
contentObject = new MessagePackJsonRequest(transportMessage.GetContent<JsonRequest>());
}
else if (transportMessage.IsInvokeResultMessage())
{
contentObject = new MessagePackJsonResponse(transportMessage.GetContent<JsonResponse>());
}
else
{
throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
}
Content = SerializerUtilitys.Serialize(contentObject);
var headersObject = transportMessage.GetHeaders<NameValueCollection>();
Headers = SerializerUtilitys.Serialize(JsonConvert.SerializeObject(MessagePackTransportMessageType.NvcToDictionary(headersObject)));
}
public MessagePackTransportMessage()
{
}
\[Key(0)\]
public string Id { get; set; }
\[Key(1)\]
public byte\[\] Content { get; set; }
\[Key(2)\]
public byte\[\] Headers { get; set; }
\[Key(3)\]
public string ContentType { get; set; }
\[MethodImpl(MethodImplOptions.AggressiveInlining)\]
public bool IsInvokeMessage()
{
return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;
}
\[MethodImpl(MethodImplOptions.AggressiveInlining)\]
public bool IsInvokeResultMessage()
{
return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;
}
public TransportMessage GetTransportMessage()
{
var message = new TransportMessage
{
ContentType = ContentType,
Id = Id,
Content = null,
Headers = null,
};
object contentObject;
if (IsInvokeMessage())
{
contentObject =
SerializerUtilitys.Deserialize<MessagePackJsonRequest>(Content).GetJsonRequest();
}
else if (IsInvokeResultMessage())
{
contentObject =
SerializerUtilitys.Deserialize<MessagePackJsonResponse>(Content)
.GetJsonResponse();
}
else
{
throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
}
message.Content = contentObject;
var headers = SerializerUtilitys.Deserialize<string>(Headers);
message.Headers = JsonConvert.DeserializeObject(headers);
return message;
}
}
public sealed class MessagePackTransportMessageEncoder:ITransportMessageEncoder
{
#region Implementation of ITransportMessageEncoder
\[MethodImpl(MethodImplOptions.AggressiveInlining)\]
public byte\[\] Encode(TransportMessage message)
{
var transportMessage = new MessagePackTransportMessage(message)
{
Id = message.Id,
ContentType = message.ContentType,
};
return SerializerUtilitys.Serialize(transportMessage);
}
#endregion Implementation of ITransportMessageEncoder
}
public sealed class MessagePackTransportMessageDecoder : ITransportMessageDecoder
{
#region Implementation of ITransportMessageDecoder
\[MethodImpl(MethodImplOptions.AggressiveInlining)\]
public TransportMessage Decode(byte\[\] data)
{
var message = SerializerUtilitys.Deserialize<MessagePackTransportMessage>(data);
return message.GetTransportMessage();
}
#endregion Implementation of ITransportMessageDecoder
}
ProtoBuffer
这个应该听得比较多
[ProtoContract]
public class ProtoBufferTransportMessage
{
public ProtoBufferTransportMessage(TransportMessage transportMessage)
{
Id = transportMessage.Id;
ContentType = transportMessage.ContentType;
object contentObject;
if (transportMessage.IsInvokeMessage())
{
contentObject = new ProtoBufferJsonRequest(transportMessage.GetContent<JsonRequest>());
}
else if (transportMessage.IsInvokeResultMessage())
{
contentObject = new ProtoBufferJsonResponse(transportMessage.GetContent<JsonResponse>());
}
else
{
throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
}
Content = SerializerUtilitys.Serialize(contentObject);
Headers = SerializerUtilitys.Serialize(transportMessage.GetHeaders<NameValueCollection>());
}
public ProtoBufferTransportMessage()
{
}
\[ProtoMember(1)\]
public string Id { get; set; }
\[ProtoMember(2)\]
public byte\[\] Content { get; set; }
\[ProtoMember(3)\]
public byte\[\] Headers { get; set; }
\[ProtoMember(4)\]
public string ContentType { get; set; }
public bool IsInvokeMessage()
{
return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;
}
public bool IsInvokeResultMessage()
{
return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;
}
public TransportMessage GetTransportMessage()
{
var message = new TransportMessage
{
ContentType = ContentType,
Id = Id,
Content = null,
Headers = null,
};
object contentObject;
if (IsInvokeMessage())
{
contentObject =
SerializerUtilitys.Deserialize<ProtoBufferJsonRequest>(Content).GetJsonRequest();
}
else if (IsInvokeResultMessage())
{
contentObject =
SerializerUtilitys.Deserialize<ProtoBufferJsonResponse>(Content)
.GetJsonResponse();
}
else
{
throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
}
message.Content = contentObject;
message.Headers = SerializerUtilitys.Deserialize<NameValueCollection>(Headers);
return message;
}
}
public sealed class ProtoBufferTransportMessageEncoder : ITransportMessageEncoder
{
#region Implementation of ITransportMessageEncoder
public byte\[\] Encode(TransportMessage message)
{
var transportMessage = new ProtoBufferTransportMessage(message)
{
Id = message.Id,
ContentType = message.ContentType,
};
return SerializerUtilitys.Serialize(transportMessage);
}
#endregion Implementation of ITransportMessageEncoder
}
public sealed class ProtoBufferTransportMessageDecoder : ITransportMessageDecoder
{
#region Implementation of ITransportMessageDecoder
public TransportMessage Decode(byte\[\] data)
{
var message = SerializerUtilitys.Deserialize<ProtoBufferTransportMessage>(data);
return message.GetTransportMessage();
}
#endregion Implementation of ITransportMessageDecoder
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章