NetMQ用作IPC的实例
阅读原文 · 时间:2023年07月08日 · 阅读:2
  • 发送端/接收端

    using System;
    using System.Threading;
    using NetMQ;
    using NetMQ.Sockets;

    namespace NetMQIPCServer
    {
        /// <summary>
        /// Demo program that runs a NetMQ publisher and subscriber in the same process.
        /// A background thread publishes a two-frame message (topic + binary payload) in a loop,
        /// and a <see cref="NetMQProactor"/> prints every payload received by the subscriber as hex.
        /// The active configuration uses the TCP transport; alternative transports and
        /// receive strategies are kept below as commented-out variants.
        /// </summary>
        class Program
        {
            private const string topic = "unity3d";

            // Pause between two published messages.
            // NOTE(review): the original literal was lost when this listing was extracted
            // (the call read "Thread.Sleep()"); 1000 ms is a placeholder - adjust as needed.
            private const int SendIntervalMilliseconds = 1000;

            // Written by the main thread and read by the sender thread, hence volatile.
            private static volatile bool running = true;

            // Arbitrary demo payload, kept byte-for-byte from the original listing.
            // Allocated once instead of once per loop iteration.
            private static readonly byte[] Payload = new byte[]
            {
                0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3,
                0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
                0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3,
                0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
                0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3,
                0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
                0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3,
                0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
                0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
                0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3,
                0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
                0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3,
                0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
                0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3,
                0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
                0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3
            };

            /// <summary>
            /// Entry point: wires up the publisher/subscriber pair, starts the sender thread,
            /// waits for a key press and then shuts everything down in order.
            /// </summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                Console.WriteLine("Main thread id = {0}", Thread.CurrentThread.ManagedThreadId);

                // Alternative: in-process communication
                //pub.Bind("inproc://unity3d");
                //sub.Connect("inproc://unity3d");

                // Alternative: cross-process communication
                //var pub = new PublisherSocket();
                //var sub = new SubscriberSocket();
                //pub.Bind("ipc:///unity3d/0");
                //sub.Connect("ipc:///unity3d/0");
                //sub.Subscribe(topic);

                // Alternative: directed receive over ipc [NonBlock]
                //var pub = new PublisherSocket();
                //var sub = new SubscriberSocket();
                //pub.Bind("ipc:///unity3d/0");
                //sub.Connect("ipc:///unity3d/0");
                //sub.Subscribe(topic);
                //var proactor = new NetMQProactor(sub, (socket, message) =>
                //{
                //    //Console.WriteLine(message);
                //    foreach (var b in message[1].Buffer)
                //    {
                //        Console.Write("{0:x2}", b);
                //        Console.Write(" ");
                //    }
                //    Console.WriteLine("Received message in thread {0}", Thread.CurrentThread.ManagedThreadId);
                //});

                // Active configuration: TCP.
                // The using statements dispose in reverse order (proactor, sub, pub), so the
                // proactor is torn down before the socket it reads from.
                using (var pub = new PublisherSocket())
                using (var sub = new SubscriberSocket())
                {
                    pub.Bind("tcp://*:2017");
                    sub.Connect("tcp://localhost:2017");
                    sub.Subscribe(topic);

                    using (var proactor = new NetMQProactor(sub, (socket, message) =>
                    {
                        //Console.WriteLine(message);

                        // Frame 0 is the topic, frame 1 is the payload; ignore anything shorter.
                        if (message.FrameCount < 2)
                        {
                            return;
                        }

                        foreach (var b in message[1].Buffer)
                        {
                            Console.Write("{0:x2}", b);
                            Console.Write(" ");
                        }
                        Console.WriteLine("Received message in thread {0}", Thread.CurrentThread.ManagedThreadId);
                    }))
                    {
                        // Alternative: polling mode
                        //var poller = new NetMQPoller();
                        //poller.Add(sub);
                        //sub.ReceiveReady += (sender, eventArgs) =>
                        //{
                        //    bool more = false;
                        //    byte[] bytes = null;
                        //    eventArgs.Socket.ReceiveFrameBytes(out more);
                        //    if (more)
                        //    {
                        //        bytes = eventArgs.Socket.ReceiveFrameBytes();
                        //        foreach (var b in bytes)
                        //        {
                        //            Console.Write("{0:x2}", b);
                        //            Console.Write(" ");
                        //        }
                        //        Console.WriteLine("Received bytes in thread {0}", Thread.CurrentThread.ManagedThreadId);
                        //    }
                        //};

                        // Sender thread: publishes topic frame + payload frame until 'running' is cleared.
                        var sender = new Thread(() =>
                        {
                            while (running)
                            {
                                Console.WriteLine("PublisherSocket:Send Bytes in thread {0}.", Thread.CurrentThread.ManagedThreadId);
                                pub.SendMoreFrame(topic).SendFrame(Payload);
                                Thread.Sleep(SendIntervalMilliseconds);
                            }
                        });
                        sender.Start();

                        // Alternative: manual receive loop on a dedicated thread
                        //new Thread(() =>
                        //{
                        //    bool more = false;
                        //    byte[] bytes = null;
                        //    while (running)
                        //    {
                        //        if (sub.HasIn)
                        //        {
                        //            sub.ReceiveFrameBytes(out more);
                        //            if (more)
                        //            {
                        //                bytes = sub.ReceiveFrameBytes();
                        //                foreach (var b in bytes)
                        //                {
                        //                    Console.Write("{0:x2}", b);
                        //                    Console.Write(" ");
                        //                }
                        //                Console.WriteLine("Received bytes in thread {0}", Thread.CurrentThread.ManagedThreadId);
                        //            }
                        //        }
                        //    }
                        //}).Start();
                        // Poll on the main thread [Block]
                        //poller.Run();
                        // Poll asynchronously [NonBlock]
                        //poller.RunAsync();

                        Console.ReadKey();
                        running = false;

                        // Wait for the sender to leave its loop so the publisher socket is
                        // not disposed while another thread may still be sending on it.
                        sender.Join();

                        //poller.Stop();
                        //poller.StopAsync();
                    }
                }

                NetMQConfig.Cleanup();
            }
        }
    }

  • 运行结果如图

NetMQ官方主页:http://netmq.readthedocs.io/en/latest/

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器