.net core集成使用EasyNetQ来使用rabbitmq
阅读原文时间:2023年07月08日阅读:4

  之前有写到一篇介绍EasyNetQ的博文(C# .net 使用rabbitmq消息队列——EasyNetQ插件介绍),所以本文从.net core的角度去继承使用EasyNetQ,而用法类似于之前集成使用rabbitmq的博文:.net core使用rabbitmq消息队列 (二)

  国际惯例,先上代码,但是代码比较多,所有又放gitee了:https://gitee.com/shanfeng1000/dotnetcore-demo/tree/master/EasyNetQ

  消息发布(AspNetCore.WebApi.Producer)

  Demo中这个项目是消息的发布程序,在Startup中添加服务:    

  

public void ConfigureServices(IServiceCollection services)  
{  
    var connectionString = "host=192.168.209.133;virtualHost=/;username=admin;password=123456;timeout=60";  
    string\[\] hosts = new string\[\] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };  
    ushort port = 5672;  
    string userName = "admin";  
    string password = "123456";  
    string virtualHost = "/";  
    var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

    #region 订阅发布

    services.AddEasyNetQProducer("Publish", options =>  
    {  
        //options.ConnectionString = connectionString;  
        options.Hosts = hosts;  
        options.Port = port;  
        options.Password = password;  
        options.UserName = userName;  
        options.VirtualHost = virtualHost;

        options.PersistentMessages = true;  
        options.Priority = 1;  
    });

    #endregion  
    #region 请求响应

    services.AddEasyNetQProducer("Request", options =>  
    {  
        //options.ConnectionString = connectionString;  
        options.Hosts = hosts;  
        options.Port = port;  
        options.Password = password;  
        options.UserName = userName;  
        options.VirtualHost = virtualHost;

        options.PersistentMessages = true;  
        options.Priority = 3;  
    });

    #endregion  
    #region 发送接收

    services.AddEasyNetQProducer("Send", options =>  
    {  
        //options.ConnectionString = connectionString;  
        options.Hosts = hosts;  
        options.Port = port;  
        options.Password = password;  
        options.UserName = userName;  
        options.VirtualHost = virtualHost;

        options.Priority = 4;  
        options.Queue = "send-recieve";  
    });

    #endregion  
    ......  
}

ConfigureServices

  添加相关服务使用AddEasyNetQProducer方法,可以指定一个名称,在创建生产者时可以提供指定的名称。熟悉EasyNetQ的朋友应该知道它提供三种消息模式:Publish/Subscribe, Request/Response和 Send/Receive,正是上面的三种申明方式。

  使用时,需要先注入IBusClientFactory对象,使用它的Create方法创建生产者对象,然后使用这个对象的方法操作消息(Publish方法、Request方法、Send方法分别对应上面的三种模式)。

  另外,EasyNetQ的消息都是一些自定的实体类,因此我们发送消息需要自定创建实体类,比如发布订阅消息时创建的实体类Subscriber:  

public class Subscriber  
{  
    public string Message { get; set; }  
}

  使用时:  

/// <summary>  
/// 发布订阅模式  
/// </summary>  
/// <param name="message"></param>  
/// <returns></returns>  
\[HttpGet("Publish")\]  
public string Publish(string message)  
{  
    message = message ?? "";  
    var bus = busFactory.Create("Publish");  
    bus.Publish(new Subscriber() { Message = message });

    return "success";  
}

  消息消费(AspNetCore.WebApi.Consumer)

  首先,在Startup中添加服务:  

  

public void ConfigureServices(IServiceCollection services)  
{  
    var connectionString = "host=192.168.209.133;virtualHost=/;username=admin;password=123456;timeout=60";  
    string\[\] hosts = new string\[\] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };  
    ushort port = 5672;  
    string userName = "admin";  
    string password = "123456";  
    string virtualHost = "/";  
    var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

    #region 订阅发布

    services.AddEasyNetQConsumer(options =>  
    {  
        //options.ConnectionString = connectionString;  
        options.Hosts = hosts;  
        options.Port = port;  
        options.Password = password;  
        options.UserName = userName;  
        options.VirtualHost = virtualHost;

        options.AutoDelete = true;  
        options.Durable = true;  
        options.PrefetchCount = 1;  
        options.Priority = 2;  
    })  
    .AddSubscriber("PubSub1",typeof(EasyNetQSubscriber))  
    .AddSubscriber<Subscriber>("PubSub2", r =>  
    {  
        Console.WriteLine("PubSub:" + r.Message);  
    });

    #endregion  
    #region 请求响应

    services.AddEasyNetQConsumer(options =>  
    {  
        //options.ConnectionString = connectionString;  
        options.Hosts = hosts;  
        options.Port = port;  
        options.Password = password;  
        options.UserName = userName;  
        options.VirtualHost = virtualHost;

        options.Durable = true;  
        options.PrefetchCount = 2;  
    })  
    .AddResponder(typeof(EasyNetQResponder))  
    .AddResponder<Requester, Responder>(request =>  
    {  
        Console.WriteLine("Rpc:" + request.Data);  
        return new Responder() { Result = "Rpc:" + request.Data };  
    });

    #endregion  
    #region 发送接收

    services.AddEasyNetQConsumer(options =>  
    {  
        //options.ConnectionString = connectionString;  
        options.Hosts = hosts;  
        options.Port = port;  
        options.Password = password;  
        options.UserName = userName;  
        options.VirtualHost = virtualHost;

        options.Priority = 5;  
        options.PrefetchCount = 5;  
        options.Exclusive = false;  
        options.Arguments = arguments;  
        options.Queue = "send-recieve";  
    })  
    .AddReceiver(typeof(EasyNetQReceiver<Reciever1>))  
    .AddReceiver(typeof(EasyNetQReceiver<Reciever2>));  
    //.AddReceiver<Reciever1>(r =>  
    //{  
    //    Console.WriteLine("Reciever1:" + r.Message);  
    //})  
    //.AddReceiver<Reciever2>(r =>  
    //{  
    //    Console.WriteLine("Reciever2:" + r.Message);  
    //});

    #endregion

    ......  
}

ConfigureServices

  这里先使用AddEasyNetQConsumer方法获得一个消费者建造者,然后使用它的AddSubscriber方法、AddResponder方法、AddReceiver方法添加消费消息的处理过程,当然这三个方法分别也是对应上面的三种模式。

  另外,这三个方法添加的消息处理程序可以使用Lambda表达式实现,也可以通过响应的接口实现,比如AddSubscriber方法添加的处理程序可通过实现了IEasyNetQSubscriber接口的类来替代,比如Demo中的EasyNetQSubscriber:  

public class EasyNetQSubscriber : IEasyNetQSubscriber<Subscriber>  
{  
    public void Subscribe(Subscriber message)  
    {  
        Console.WriteLine("EasyNetQSubscriber:" + message.Message);  
    }  
}