Zookeeper基础教程(六):.net core使用Zookeeper
阅读原文时间:2023年07月10日阅读:1

  Demo代码已提交到gitee,感兴趣的更有可以直接克隆使用,地址:https://gitee.com/shanfeng1000/dotnetcore-demo/tree/master/Zookeeper

  .net core要使用Zookeeper,我们还是推荐使用ZooKeeperNetEx这个插件,先在nuget中搜索安装ZooKeeperNetEx,然后可以在Startup类中直接使用ZooKeeperNetEx连接Zookeeper获取数据,也可以使用前面章节中介绍的ZookeeperHelper辅助类来操作,本文也是以这个Zookeeper辅助类来作为工具,将Zookeeper作为一个配置源,从而将它集成到.net core的Configuration-option配置中。

  博主使用的是.net core2.2的版本,不过对2.*和3.*区别不大。

  首先贴出简单的项目文件结构:

  

  其中,Configurations目录下的一系列Zookeeper开头的文件是集成Zookeeper所需的类,下面单独介绍,首先从我们前面的文章中得到ZookeeperHelper这个辅助类:Zookeeper基础教程(四):C#连接使用Zookeeper

  ZookeeperConfigurationExtensions是一个拓展类,最终使用的是这个类的拓展方法去集成Zookeeper到Configuration配置源中

  

public static class ZookeeperConfigurationExtensions  
{  
    /// <summary>  
    /// 添加Zookeeper配置  
    /// </summary>  
    /// <param name="builder"></param>  
    /// <param name="configure"></param>  
    public static IConfigurationBuilder AddZookeeper(this IConfigurationBuilder builder, Action<ZookeeperOptions> configure)  
    {  
        var options = new ZookeeperOptions();  
        configure?.Invoke(options);  
        return builder.AddZookeeper(options);  
    }  
    /// <summary>  
    /// 添加Zookeeper配置  
    /// </summary>  
    /// <param name="builder"></param>  
    /// <param name="zookeeperOptions"></param>  
    public static IConfigurationBuilder AddZookeeper(this IConfigurationBuilder builder, ZookeeperOptions zookeeperOptions)  
    {  
        ZookeeperConfigurationSource zookeeperConfigurationSource = new ZookeeperConfigurationSource(zookeeperOptions);  
        builder.Add(zookeeperConfigurationSource);  
        return builder;  
    }  
    /// <summary>  
    /// 添加Zookeeper配置  
    /// </summary>  
    /// <param name="builder"></param>  
    /// <param name="configure"></param>  
    /// <returns></returns>  
    public static IHostBuilder UseZookeeper(this IHostBuilder builder, Action<ZookeeperOptions> configure)  
    {  
        var options = new ZookeeperOptions();  
        configure?.Invoke(options);  
        return builder.UseZookeeper(options);  
    }  
    /// <summary>  
    /// 添加Zookeeper配置  
    /// </summary>  
    /// <param name="builder"></param>  
    /// <param name="zookeeperOptions"></param>  
    /// <returns></returns>  
    public static IHostBuilder UseZookeeper(this IHostBuilder builder, ZookeeperOptions zookeeperOptions)  
    {  
        return builder.ConfigureAppConfiguration((\_, cbuilder) => cbuilder.AddZookeeper(zookeeperOptions));  
    }  
}

ZookeeperConfigurationExtensions

  ZookeeperConfigurationProvider是Zookeeper集成的核心类,主要是保存Zookeeper读取下来的数据  

  

public class ZookeeperConfigurationProvider : ConfigurationProvider, IDisposable  
{  
    IDisposable \_changeTokenRegistration;  
    ConfigurationReloadToken \_reloadToken = new ConfigurationReloadToken();

    /// <summary>  
    /// 监控对象  
    /// </summary>  
    ZookeeperConfigurationWatcher zookeeperConfigurationWatcher;  
    public ZookeeperConfigurationSource ZookeeperConfigurationSource { get; private set; }

    public ZookeeperConfigurationProvider(ZookeeperConfigurationSource zookeeperConfigurationSource)  
    {  
        this.ZookeeperConfigurationSource = zookeeperConfigurationSource;  
        this.zookeeperConfigurationWatcher = new ZookeeperConfigurationWatcher(this);

        if (zookeeperConfigurationSource.ZookeeperOptions.ReloadOnChange)  
        {  
            \_changeTokenRegistration = ChangeToken.OnChange(  
                () => Watch(),  
                () =>  
                {  
                    Thread.Sleep(zookeeperConfigurationSource.ZookeeperOptions.ReloadDelay);  
                    Load();  
                });  
        }  
    }  
    /// <summary>  
    /// 获取ReloadToken  
    /// </summary>  
    /// <returns></returns>  
    private IChangeToken Watch()  
    {  
        return \_reloadToken;  
    }  
    /// <summary>  
    /// 加载配置  
    /// </summary>  
    public override void Load()  
    {  
        Data = zookeeperConfigurationWatcher.Process();

        OnReload();  
    }  
    /// <summary>  
    /// 重置  
    /// </summary>  
    public void Reload()  
    {  
        var previousToken = Interlocked.Exchange(ref \_reloadToken, new ConfigurationReloadToken());  
        previousToken.OnReload();  
    }  
    /// <summary>  
    /// 释放  
    /// </summary>  
    public void Dispose()  
    {  
        \_changeTokenRegistration?.Dispose();  
    }  
}

ZookeeperConfigurationProvider

  ZookeeperConfigurationSource类表示一个配置源  

  

public class ZookeeperConfigurationSource : IConfigurationSource  
{  
    /// <summary>  
    /// 源状态信息  
    /// </summary>  
    public ZookeeperOptions ZookeeperOptions { get; private set; }

    public ZookeeperConfigurationSource(ZookeeperOptions zookeeperOptions)  
    {  
        this.ZookeeperOptions = zookeeperOptions;  
    }

    /// <summary>  
    /// 获取配置提供者  
    /// </summary>  
    /// <param name="builder"></param>  
    /// <returns></returns>  
    public IConfigurationProvider Build(IConfigurationBuilder builder)  
    {  
        return new ZookeeperConfigurationProvider(this);  
    }  
}

ZookeeperConfigurationSource

  ZookeeperConfigurationWatcher是Zookeeper数据读取的核心类,ZookeeperHelper这个辅助类就是在这个类中连接Zookeeper去读取数据     

  

public class ZookeeperConfigurationWatcher : IDisposable  
{  
    ZookeeperConfigurationProvider zookeeperConfigurationProvider;

    /// <summary>  
    /// Zookeeper辅助操作类  
    /// </summary>  
    ZookeeperHelper zookeeperHelper;

    public ZookeeperConfigurationWatcher(ZookeeperConfigurationProvider zookeeperConfigurationProvider)  
    {  
        this.zookeeperConfigurationProvider = zookeeperConfigurationProvider;  
        var options = zookeeperConfigurationProvider.ZookeeperConfigurationSource.ZookeeperOptions;  
        zookeeperHelper = new ZookeeperHelper(options.Address, options.RootPath);  
        //初次连接后,立即  
        zookeeperHelper.OnConnected += () =>  
        {  
            if (options.Scheme != AuthScheme.World)  
            {  
                zookeeperHelper.Authorize(options.Scheme, options.Auth);  
            }  
            zookeeperConfigurationProvider.Reload();  
        };  
        zookeeperHelper.Connect();  
        //int timeOut = 10;  
        //while (!zookeeperHelper.Connected && timeOut-- > 0)  
        //{  
        //    Thread.Sleep(1000); //停一秒,等待连接完成  
        //}  
        if (!zookeeperHelper.Connected)  
        {  
            throw new TimeoutException($"connect to zookeeper \[{options.Address}\] timeout");  
        }

        if (options.ReloadOnChange)  
        {  
            //监听根节点下所有的znode节点,当任意节点发生改变后,即立刻重新加载  
            zookeeperHelper.WatchAllAsync(ze =>  
            {  
                if (ze.Type != ZookeeperEvent.EventType.None)  
                {  
                    //使用一个异步去完成,同步会造成死锁  
                    Task.Run(() =>  
                    {  
                        zookeeperConfigurationProvider.Reload();  
                    });  
                }  
            }).Wait();  
        }  
    }

    /// <summary>  
    /// 获取Zookeeper中的数据  
    /// </summary>  
    /// <returns></returns>  
    public IDictionary<string, string> Process()  
    {  
        var data = zookeeperHelper.GetDictionaryAsync(ConfigurationPath.KeyDelimiter).GetAwaiter().GetResult();  
        if (!zookeeperConfigurationProvider.ZookeeperConfigurationSource.ZookeeperOptions.ExcludeRoot)  
        {  
            return data;  
        }

        var root = this.zookeeperConfigurationProvider.ZookeeperConfigurationSource.ZookeeperOptions.RootPath ?? "";  
        var rootSplits = root.Split(new string\[\] { "/" }, StringSplitOptions.RemoveEmptyEntries);  
        IDictionary<string, string> dict = new Dictionary<string, string>();  
        foreach (var key in data.Keys)  
        {  
            var split = key.Split(new string\[\] { ConfigurationPath.KeyDelimiter }, StringSplitOptions.RemoveEmptyEntries);  
            var array = split.Skip(rootSplits.Length).ToArray();  
            dict\[ConfigurationPath.Combine(array)\] = data\[key\];  
        }  
        return dict;  
    }  
    /// <summary>  
    /// 释放资源  
    /// </summary>  
    public virtual void Dispose()  
    {  
        zookeeperHelper.Dispose();  
        zookeeperConfigurationProvider.Dispose();  
    }  
}

ZookeeperConfigurationWatcher

  ZookeeperOptions是相关配置信息  

  

public class ZookeeperOptions  
{  
    /// <summary>  
    /// 是否监控源数据变化  
    /// </summary>  
    public bool ReloadOnChange { get; set; } = true;  
    /// <summary>  
    /// 加载延迟  
    /// </summary>  
    public int ReloadDelay { get; set; } = 250;  
    /// <summary>  
    /// Zookeeper集群地址  
    /// </summary>  
    public string\[\] Address { get; set; }  
    /// <summary>  
    /// Zookeeper初始路径  
    /// </summary>  
    public string RootPath { get; set; }  
    /// <summary>  
    /// 认证主题  
    /// </summary>  
    public AuthScheme Scheme { get; set; }  
    /// <summary>  
    /// 认证信息  
    /// </summary>  
    public string Auth { get; set; }  
    /// <summary>  
    /// 生成的配置中是否去掉初始路径  
    /// </summary>  
    public bool ExcludeRoot { get; set; }  
}

ZookeeperOptions

  然后是Program类:  

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using AspNetCore.ZookeeperApi.Configurations;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace AspNetCore.ZookeeperApi
{
public class Program
{
public static void Main(string[] args)
{
CreateWebHostBuilder(args).Build().Run();
}

    public static IWebHostBuilder CreateWebHostBuilder(string\[\] args) =>  
        WebHost.CreateDefaultBuilder(args)  
            .UseStartup<Startup>()  
            .UseZookeeper(new ZookeeperOptions()  
            {  
                //Auth = "user:123",  
                //Scheme = AuthScheme.Digest,  
                //Address = new string\[\] { "192.168.209.133:2181", "192.168.209.134:2181", "192.168.209.135:2181" },  
                Address = new string\[\] { "192.168.209.133:2181" },  
                RootPath = "/Test"  
            });  
}  

}

  其中UseZookeeper方法就是ZookeeperConfigurationExtensions的一个拓展方法,调用这个拓展方法之后,Zookeeper读取的配置可以通过IConfiguration这个服务对象读取了。

  UseZookeeper拓展方法传入了一个参数,Address是Zookeeper连接地址,RootPath是项目配置在Zookeeper中的根节点路径。

  假如我们在Zookeeper中创建一个znode节点:/Test,然后在/Test节点下创建5个子节点:/Test/Value1、/Test/Value2、/Test/Value3、/Test/Value4、/Test/Value5

  

  然后分别将数据保存到这5个节点中:  

# Value1表示字符串类型数据
  set /Test/Value1 'this is string values'
  # Value2表示整形类型数据
  set /Test/Value2 1
  # Value3表示浮点型数据
  set /Test/Value3 3.14
  # Value4表示布尔类型数据
  set /Test/Value4 false
  # Value5表示日期类型数据
  set /Test/Value5 '2020-06-02'

  上面说了,Zookeeper读取的配置可以通过IConfiguration这个服务对象读取,IConfiguration是.net core提供的配置服务,我们可以直接使用读取,但是推荐使用Option方式读取,比如为读取/Test节点下的5个节点数据,我们可以创建一个TestOptions类:  

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace AspNetCore.ZookeeperApi
{
public class TestOptions
{
///

/// 字符数据 ///
public string Value1 { get; set; }
/// /// 整形数据 ///
public int Value2 { get; set; }
/// /// 浮点型数据 ///
public decimal Value3 { get; set; }
/// /// 布尔型数据 ///
public bool Value4 { get; set; }
/// /// 日期数据 ///
public DateTime Value5 { get; set; }
}
}

  然后在Startup的ConfigureServices方法中注册  

public void ConfigureServices(IServiceCollection services)
{
services.Configure(Configuration.GetSection("Test"));

   services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version\_2\_2);  

}

  之后我们就可以直接使用Option注入的形式去访问了,比如我们创建一个TestController的控制器:  

\[HttpGet\]  
public object Get()  
{  
    //使用IOptionsSnapshot<>,它是Scoped作用域  
    var optionsSnapshot = HttpContext.RequestServices.GetService<IOptionsSnapshot<TestOptions>>();

    //因为IOptionsMonitor<>有缓存IOptionsMonitorCache<>,所以需要注意使用  
    //如果使用service.Configure<TOptions>(IConfiguration)方法添加的Options,否则需要自行设置重新加载机制  
    //重新加载可参考:https://www.cnblogs.com/shanfeng1000/p/15095236.html  
    var optionsMonitor = HttpContext.RequestServices.GetService<IOptionsMonitor<TestOptions>>();

    return new  
    {  
        OptionsSnapshot = optionsSnapshot.Value,  
        OptionsMonitor = optionsMonitor.CurrentValue  
    };  
}

  然后运行项目,访问这个接口地址就能得到:

  

  可以看到Zookeeper中的数据成功被读取到,现在我们可以修改Zookeeper中/Test节点下的任意节点数据,如:  

set /Test/Value2 2

  由于Zookeeper的监听机制,重新调用接口,你会看到Value2的值也成功刷新了

  

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章