企业级工作流解决方案(七)--微服务Tcp消息传输模型之消息编解码
阅读原文时间:2023年07月10日阅读:1

  Tcp消息传输主要参照surging来做的,做了部分裁剪和改动,详细参见:https://github.com/dotnetcore/surging

  Json-rpc没有定义消息如何传输,因此,Json-Rpc RpcRequest对象和RpcResponse对象需要一个传输载体,这里的传输对象主是TransportMessage,如下代码,这里的Content请求时为RcpRequest对象,答复时为RpcResponse对象,答复时Header一般情况下为空。

///

/// 传输消息模型。 ///
public class TransportMessage
{

    public TransportMessage()  
    {  
    }  
    \[MethodImpl(MethodImplOptions.AggressiveInlining)\]  
    public TransportMessage(object content,object headers)  
    {  
        if (content == null)  
            throw new ArgumentNullException(nameof(content));

        Content = content;  
        Headers = headers;  
        ContentType = content.GetType().FullName;  
    }  
    \[MethodImpl(MethodImplOptions.AggressiveInlining)\]  
    public TransportMessage(object content, object headers, string fullName)  
    {  
        if (content == null)  
            throw new ArgumentNullException(nameof(content));

        Headers = headers;  
        Content = content;  
        ContentType = fullName;  
    }

    /// <summary>  
    /// 消息Id。  
    /// </summary>  
    public string Id { get; set; }

    /// <summary>  
    /// 消息内容。  
    /// </summary>  
    public object Content { get; set; }

    /// <summary>  
    /// 消息传输Header  
    /// </summary>  
    public object Headers { get; set; }

    /// <summary>  
    /// 内容类型。  
    /// </summary>  
    public string ContentType { get; set; }

    /// <summary>  
    /// 是否调用消息。  
    /// </summary>  
    /// <returns>如果是则返回true,否则返回false。</returns>  
    \[MethodImpl(MethodImplOptions.AggressiveInlining)\]  
    public bool IsInvokeMessage()  
    {  
        return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;  
    }

    /// <summary>  
    /// 是否是调用结果消息。  
    /// </summary>  
    /// <returns>如果是则返回true,否则返回false。</returns>  
    \[MethodImpl(MethodImplOptions.AggressiveInlining)\]  
    public bool IsInvokeResultMessage()  
    {  
        return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;  
    }

    /// <summary>  
    /// 获取内容。  
    /// </summary>  
    /// <typeparam name="T">内容类型。</typeparam>  
    /// <returns>内容实例。</returns>  
    \[MethodImpl(MethodImplOptions.AggressiveInlining)\]  
    public T GetContent<T>()  
    {  
        return (T)Content;  
    }

    /// <summary>  
    /// 获取Header。  
    /// </summary>  
    /// <typeparam name="T">Header类型。</typeparam>  
    /// <returns>Header实例。</returns>  
    \[MethodImpl(MethodImplOptions.AggressiveInlining)\]  
    public T GetHeaders<T>()  
    {  
        return (T)Headers;  
    }

    /// <summary>  
    /// 创建一个调用传输消息。  
    /// </summary>  
    /// <param name="invokeMessage">调用实例。</param>  
    /// <returns>调用传输消息。</returns>  
    public static TransportMessage CreateInvokeMessage(JsonRequest invokeMessage,NameValueCollection nameValueCollection)  
    {  
        return new TransportMessage(invokeMessage, nameValueCollection, MessagePackTransportMessageType.jsonRequestTypeName)  
        {  
            Id = Guid.NewGuid().ToString("N")  
        };  
    }

    /// <summary>  
    /// 创建一个调用结果传输消息。  
    /// </summary>  
    /// <param name="id">消息Id。</param>  
    /// <param name="invokeResultMessage">调用结果实例。</param>  
    /// <returns>调用结果传输消息。</returns>  
    public static TransportMessage CreateInvokeResultMessage(string id, JsonResponse jsonResponse,NameValueCollection nameValueCollection)  
    {  
        return new TransportMessage(jsonResponse, nameValueCollection, MessagePackTransportMessageType.jsonResponseTypeName)  
        {  
            Id = id  
        };  
    }  
}

  TransportMessage需要在dotnetty中传输,则需要对TransportMessage进行编码解码

消息编解码器

public interface ITransportMessageEncoder
{
byte[] Encode(TransportMessage message);
}
public interface ITransportMessageDecoder
{
TransportMessage Decode(byte[] data);
}

Json编解码

  平时编码中经常用的方式

public sealed class JsonTransportMessageEncoder : ITransportMessageEncoder
{
#region Implementation of ITransportMessageEncoder

    public byte\[\] Encode(TransportMessage message)  
    {  
        var content = JsonConvert.SerializeObject(message);  
        return Encoding.UTF8.GetBytes(content);  
    }

    #endregion Implementation of ITransportMessageEncoder  

}

public sealed class JsonTransportMessageDecoder : ITransportMessageDecoder
{
#region Implementation of ITransportMessageDecoder

    public TransportMessage Decode(byte\[\] data)  
    {  
        var content = Encoding.UTF8.GetString(data);  
        var message = JsonConvert.DeserializeObject<TransportMessage>(content);  
        if (message.IsInvokeMessage())  
        {  
            message.Content = JsonConvert.DeserializeObject<JsonRequest>(message.Content.ToString());  
        }  
        if (message.IsInvokeResultMessage())  
        {  
            message.Content = JsonConvert.DeserializeObject<JsonResponse>(message.Content.ToString());  
        }  
        return message;  
    }

    #endregion Implementation of ITransportMessageDecoder  
}

MessagePack

  官网地址:https://msgpack.org/

  贴出代码,不过多的解释

[MessagePackObject]
public class MessagePackTransportMessage
{
public MessagePackTransportMessage(TransportMessage transportMessage)
{
Id = transportMessage.Id;
ContentType = transportMessage.ContentType;

        object contentObject;  
        if (transportMessage.IsInvokeMessage())  
        {  
            contentObject = new MessagePackJsonRequest(transportMessage.GetContent<JsonRequest>());  
        }  
        else if (transportMessage.IsInvokeResultMessage())  
        {  
            contentObject = new MessagePackJsonResponse(transportMessage.GetContent<JsonResponse>());  
        }  
        else  
        {  
            throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");  
        }

        Content = SerializerUtilitys.Serialize(contentObject);

        var headersObject = transportMessage.GetHeaders<NameValueCollection>();

        Headers = SerializerUtilitys.Serialize(JsonConvert.SerializeObject(MessagePackTransportMessageType.NvcToDictionary(headersObject)));  
    }

    public MessagePackTransportMessage()  
    {  
    }

    \[Key(0)\]  
    public string Id { get; set; }

    \[Key(1)\]  
    public byte\[\] Content { get; set; }

    \[Key(2)\]  
    public byte\[\] Headers { get; set; }

    \[Key(3)\]  
    public string ContentType { get; set; }

    \[MethodImpl(MethodImplOptions.AggressiveInlining)\]  
    public bool IsInvokeMessage()  
    {  
        return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;  
    }

    \[MethodImpl(MethodImplOptions.AggressiveInlining)\]  
    public bool IsInvokeResultMessage()  
    {  
        return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;  
    }

    public TransportMessage GetTransportMessage()  
    {  
        var message = new TransportMessage  
        {  
            ContentType = ContentType,  
            Id = Id,  
            Content = null,  
            Headers = null,  
        };

        object contentObject;  
        if (IsInvokeMessage())  
        {  
            contentObject =  
                SerializerUtilitys.Deserialize<MessagePackJsonRequest>(Content).GetJsonRequest();  
        }  
        else if (IsInvokeResultMessage())  
        {  
            contentObject =  
                SerializerUtilitys.Deserialize<MessagePackJsonResponse>(Content)  
                    .GetJsonResponse();  
        }  
        else  
        {  
            throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");  
        }  
        message.Content = contentObject;  
        var headers = SerializerUtilitys.Deserialize<string>(Headers);  
        message.Headers = JsonConvert.DeserializeObject(headers);  
        return message;  
    }  

}

public sealed class MessagePackTransportMessageEncoder:ITransportMessageEncoder
{
#region Implementation of ITransportMessageEncoder

    \[MethodImpl(MethodImplOptions.AggressiveInlining)\]  
    public byte\[\] Encode(TransportMessage message)  
    {  
        var transportMessage = new MessagePackTransportMessage(message)  
        {  
            Id = message.Id,  
            ContentType = message.ContentType,  
        };  
        return SerializerUtilitys.Serialize(transportMessage);  
    }  
    #endregion Implementation of ITransportMessageEncoder  
}

public sealed class MessagePackTransportMessageDecoder : ITransportMessageDecoder
{
#region Implementation of ITransportMessageDecoder

    \[MethodImpl(MethodImplOptions.AggressiveInlining)\]  
    public TransportMessage Decode(byte\[\] data)  
    {  
        var message = SerializerUtilitys.Deserialize<MessagePackTransportMessage>(data);  
        return message.GetTransportMessage();  
    }

    #endregion Implementation of ITransportMessageDecoder  
}

ProtoBuffer

  这个应该听得比较多

[ProtoContract]
public class ProtoBufferTransportMessage
{
public ProtoBufferTransportMessage(TransportMessage transportMessage)
{
Id = transportMessage.Id;
ContentType = transportMessage.ContentType;

        object contentObject;  
        if (transportMessage.IsInvokeMessage())  
        {  
            contentObject = new ProtoBufferJsonRequest(transportMessage.GetContent<JsonRequest>());  
        }  
        else if (transportMessage.IsInvokeResultMessage())  
        {  
            contentObject = new ProtoBufferJsonResponse(transportMessage.GetContent<JsonResponse>());  
        }  
        else  
        {  
            throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");  
        }

        Content = SerializerUtilitys.Serialize(contentObject);  
        Headers = SerializerUtilitys.Serialize(transportMessage.GetHeaders<NameValueCollection>());  
    }

    public ProtoBufferTransportMessage()  
    {  
    }

    \[ProtoMember(1)\]  
    public string Id { get; set; }

    \[ProtoMember(2)\]  
    public byte\[\] Content { get; set; }

    \[ProtoMember(3)\]  
    public byte\[\] Headers { get; set; }

    \[ProtoMember(4)\]  
    public string ContentType { get; set; }

    public bool IsInvokeMessage()  
    {  
        return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;  
    }

    public bool IsInvokeResultMessage()  
    {  
        return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;  
    }

    public TransportMessage GetTransportMessage()  
    {  
        var message = new TransportMessage  
        {  
            ContentType = ContentType,  
            Id = Id,  
            Content = null,  
            Headers = null,  
        };

        object contentObject;  
        if (IsInvokeMessage())  
        {  
            contentObject =  
                SerializerUtilitys.Deserialize<ProtoBufferJsonRequest>(Content).GetJsonRequest();  
        }  
        else if (IsInvokeResultMessage())  
        {  
            contentObject =  
                SerializerUtilitys.Deserialize<ProtoBufferJsonResponse>(Content)  
                    .GetJsonResponse();  
        }  
        else  
        {  
            throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");  
        }

        message.Content = contentObject;  
        message.Headers = SerializerUtilitys.Deserialize<NameValueCollection>(Headers);

        return message;  
    }  

}

public sealed class ProtoBufferTransportMessageEncoder : ITransportMessageEncoder
{
#region Implementation of ITransportMessageEncoder

    public byte\[\] Encode(TransportMessage message)  
    {  
        var transportMessage = new ProtoBufferTransportMessage(message)  
        {  
            Id = message.Id,  
            ContentType = message.ContentType,  
        };

        return SerializerUtilitys.Serialize(transportMessage);  
    }

    #endregion Implementation of ITransportMessageEncoder  
}

public sealed class ProtoBufferTransportMessageDecoder : ITransportMessageDecoder
{
#region Implementation of ITransportMessageDecoder

    public TransportMessage Decode(byte\[\] data)  
    {  
        var message = SerializerUtilitys.Deserialize<ProtoBufferTransportMessage>(data);  
        return message.GetTransportMessage();  
    }

    #endregion Implementation of ITransportMessageDecoder  
}

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器