赞
踩
进程间通讯多有那些方式?
(1)本地进程:管道、消息、事件和共享内存。
(2)分布式进程:socket,remoting,wcf,通讯中间件。
微软在 .NET 框架中提供了多种实用的线程同步手段,其中包括 monitor 类及 reader-writer锁。但跨进程的同步方法还是非常欠缺。另外,目前也没有方便的线程间及进程间传递消息的方法。例如C/S和SOA,又或者生产者/消费者模式中就常常需要传递消息。为此我编写了一个独立完整的框架,实现了跨线程和跨进程的同步和通讯。这框架内包含了信号量,信箱,内存映射文件,阻塞通道,及简单消息流控制器等组件。这篇文章里提到的类同属于一个开源的库项目(BSD许可),你可以从这里下载到 www.cdrnet.net/projects/threadmsg/.
这个框架的目的是:
注意:我删除了本文中全部代码的XML注释以节省空间。如果你想知道这些方法和参数的详细信息,请参考附件中的代码。
使用了这个库后,跨进程的消息传递将变得非常简单。我将用一个小例子来作示范:一个控制台程序,根据参数可以作为发送方也可以作为接收方运行。在发送程序里,你可以输入一定的文本并发送到信箱内(返回key),接收程序将显示所有从信箱内收到的消息。你可以运行无数个发送程序和接收程序,但是每个消息只会被具体的某一个接收程序所收到。
01 | [Serializable] |
02 | struct Message |
03 | { |
04 | public string Text; |
05 | } |
06 |
07 | class Test |
08 | { |
09 | IMailBox mail; |
10 |
11 | public Test() |
12 | { |
13 | mail = new ProcessMailBox( "TMProcessTest" ,1024); |
14 | } |
15 |
16 | public void RunWriter() |
17 | { |
18 | Console.WriteLine( "Writer started" ); |
19 | Message msg; |
20 | while ( true ) |
21 | { |
22 | msg.Text = Console.ReadLine(); |
23 | if (msg.Text.Equals( "exit" )) |
24 | break ; |
25 | mail.Content = msg; |
26 | } |
27 | } |
28 |
29 | public void RunReader() |
30 | { |
31 | Console.WriteLine( "Reader started" ); |
32 | while ( true ) |
33 | { |
34 | Message msg = (Message)mail.Content; |
35 | Console.WriteLine(msg.Text); |
36 | } |
37 | } |
38 |
39 | [STAThread] |
40 | static void Main( string [] args) |
41 | { |
42 | Test test = new Test(); |
43 | if (args.Length > 0) |
44 | test.RunWriter(); |
45 | else |
46 | test.RunReader(); |
47 | } |
48 | } |
信箱一旦创建之后(这上面代码里是 ProcessMailBox ),接收消息只需要读取 Content 属性,发送消息只需要给这个属性赋值。当没有数据时,获取消息将会阻塞当前线程;发送消息时如果信箱里已经有数据,则会阻塞当前线程。正是有了这个阻塞,整个程序是完全基于中断的,并且不会过度占用CPU(不需要进行轮询)。发送和接收的消息可以是任意支持序列化(Serializable)的类型。
然而,实际上暗地里发生的事情有点复杂:消息通过内存映射文件来传递,这是目前唯一的跨进程共享内存的方法,这个例子里我们只会在 pagefile 里面产生虚拟文件。对这个虚拟文件的访问是通过 win32 信号量来确保同步的。消息首先序列化成二进制,然后再写进该文件,这就是为什么需要声明Serializable属性。内存映射文件和 win32 信号量都需要调用NT内核的方法。多得了 .NET 框架中的 Marshal 类,我们可以避免编写不安全的代码。我们将在下面讨论更多的细节。
线程/进程间的通讯需要共享内存或者其他内建机制来发送/接收数据。即使是采用共享内存的方式,也还需要一组同步方法来允许并发访问。
同一个进程内的所有线程都共享公共的逻辑地址空间(堆)。对于不同进程,从 win2000 开始就已经无法共享内存。然而,不同的进程可以读写同一个文件。WinAPI提供了多种系统调用方法来映射文件到进程的逻辑空间,及访问系统内核对象(会话)指向的 pagefile 里面的虚拟文件。无论是共享堆,还是共享文件,并发访问都有可能导致数据不一致。我们就这个问题简单讨论一下,该怎样确保线程/进程调用的有序性及数据的一致性。
.NET 框架和 C# 提供了方便直观的线程同步方法,即 monitor 类和 lock 语句(本文将不会讨论 .NET 框架的互斥量)。对于线程同步,虽然本文提供了其他方法,我们还是推荐使用 lock 语句。
01 | void Work1() |
02 | { |
03 | NonCriticalSection1(); |
04 | Monitor.Enter( this ); |
05 | try |
06 | { |
07 | CriticalSection(); |
08 | } |
09 | finally |
10 | { |
11 | Monitor.Exit( this ); |
12 | } |
13 | NonCriticalSection2(); |
14 | } |
1 | void Work2() |
2 | { |
3 | NonCriticalSection1(); |
4 | lock ( this ) |
5 | { |
6 | CriticalSection(); |
7 | } |
8 | NonCriticalSection2(); |
9 | } |
Work1 和 Work2 是等价的。在C#里面,很多人喜欢第二个方法,因为它更短,且不容易出错。
信号量是经典的同步基本概念之一(由 Edsger Dijkstra 引入)。信号量是指一个有计数器及两个操作的对象。它的两个操作是:获取(也叫P或者等待),释放(也叫V或者收到信号)。信号量在获取操作时如果计数器为0则阻塞,否则将计数器减一;在释放时将计数器加一,且不会阻塞。虽然信号量的原理很简单,但是实现起来有点麻烦。好在,内建的monitor 类有阻塞特性,可以用来实现信号量。
01 | public sealed class ThreadSemaphore : ISemaphore |
02 | { |
03 | private int counter; |
04 | private readonly int max; |
05 |
06 | public ThreadSemaphore() : this (0, int .Max) {} |
07 | public ThreadSemaphore( int initial) : this (initial, int .Max) {} |
08 | public ThreadSemaphore( int initial, int max) |
09 | { |
10 | this .counter = Math.Min(initial,max); |
11 | this .max = max; |
12 | } |
13 |
14 | public void Acquire() |
15 | { |
16 | lock ( this ) |
17 | { |
18 | counter--; |
19 | if (counter < 0 && !Monitor.Wait( this )) |
20 | throw new SemaphoreFailedException(); |
21 | } |
22 | } |
23 |
24 | public void Acquire(TimeSpan timeout) |
25 | { |
26 | lock ( this ) |
27 | { |
28 | counter--; |
29 | if (counter < 0 && !Monitor.Wait( this ,timeout)) |
30 | throw new SemaphoreFailedException(); |
31 | } |
32 | } |
33 |
34 | public void Release() |
35 | { |
36 | lock ( this ) |
37 | { |
38 | if (counter >= max) |
39 | throw new SemaphoreFailedException(); |
40 | if (counter < 0) |
41 | Monitor.Pulse( this ); |
42 | counter++; |
43 | } |
44 | } |
45 | } |
信号量在复杂的阻塞情景下更加有用,例如我们后面将要讨论的通道(channel)。你也可以使用信号量来实现临界区的排他性(如下面的 Work3),但是我还是推荐使用内建的 lock 语句,像上面的 Work2 那样。
请注意:如果使用不当,信号量也是有潜在危险的。正确的做法是:当获取信号量失败时,千万不要再调用释放操作;当获取成功时,无论发生了什么错误,都要记得释放信号量。遵循这样的原则,你的同步才是正确的。Work3 中的 finally语句就是为了保证正确释放信号量。注意:获取信号量( s.Acquire() )的操作必须放到 try 语句的外面,只有这样,当获取失败时才不会调用释放操作。
01 | ThreadSemaphore s = new ThreadSemaphore(1); |
02 | void Work3() |
03 | { |
04 | NonCriticalSection1(); |
05 | s.Acquire(); |
06 | try |
07 | { |
08 | CriticalSection(); |
09 | } |
10 | finally |
11 | { |
12 | s.Release(); |
13 | } |
14 | NonCriticalSection2(); |
15 | } |
为了协调不同进程访问同一资源,我们需要用到上面讨论过的概念。很不幸,.NET 中的 monitor 类不可以跨进程使用。但是,win32 API提供的内核信号量对象可以用来实现跨进程同步。 Robin Galloway-Lunn 介绍了怎样将 win32 的信号量映射到 .NET 中(见 Using Win32 Semaphores in C# )。我们的实现也类似:
01 | [DllImport( "kernel32" ,EntryPoint= "CreateSemaphore" , |
02 | SetLastError= true ,CharSet=CharSet.Unicode)] |
03 | internal static extern uint CreateSemaphore( |
04 | SecurityAttributes auth, int initialCount, |
05 | int maximumCount, string name); |
06 |
07 | [DllImport( "kernel32" ,EntryPoint= "WaitForSingleObject" , |
08 | SetLastError= true ,CharSet=CharSet.Unicode)] |
09 | internal static extern uint WaitForSingleObject( |
10 | uint hHandle, uint dwMilliseconds); |
11 |
12 | [DllImport( "kernel32" ,EntryPoint= "ReleaseSemaphore" , |
13 | SetLastError= true ,CharSet=CharSet.Unicode)] |
14 | [ return : MarshalAs( UnmanagedType.VariantBool )] |
15 | internal static extern bool ReleaseSemaphore( |
16 | uint hHandle, int lReleaseCount, out int lpPreviousCount); |
17 | |
18 | [DllImport( "kernel32" ,EntryPoint= "CloseHandle" ,SetLastError= true , |
19 | CharSet=CharSet.Unicode)] |
20 | [ return : MarshalAs( UnmanagedType.VariantBool )] |
21 | internal static extern bool CloseHandle( uint hHandle); |
01 | public class ProcessSemaphore : ISemaphore, IDisposable |
02 | { |
03 | private uint handle; |
04 | private readonly uint interruptReactionTime; |
05 |
06 | public ProcessSemaphore( string name) : this ( |
07 | name,0, int .MaxValue,500) {} |
08 | public ProcessSemaphore( string name, int initial) : this ( |
09 | name,initial, int .MaxValue,500) {} |
10 | public ProcessSemaphore( string name, int initial, |
11 | int max, int interruptReactionTime) |
12 | { |
13 | this .interruptReactionTime = ( uint )interruptReactionTime; |
14 | this .handle = NTKernel.CreateSemaphore( null , initial, max, name); |
15 | if (handle == 0) |
16 | throw new SemaphoreFailedException(); |
17 | } |
18 |
19 | public void Acquire() |
20 | { |
21 | while ( true ) |
22 | { //looped 0.5s timeout to make NT-blocked threads interruptable. |
23 | uint res = NTKernel.WaitForSingleObject(handle, |
24 | interruptReactionTime); |
25 | try {System.Threading.Thread.Sleep(0);} |
26 | catch (System.Threading.ThreadInterruptedException e) |
27 | { |
28 | if (res == 0) |
29 | { //Rollback |
30 | int previousCount; |
31 | NTKernel.ReleaseSemaphore(handle,1, out previousCount); |
32 | } |
33 | throw e; |
34 | } |
35 | if (res == 0) |
36 | return ; |
37 | if (res != 258) |
38 | throw new SemaphoreFailedException(); |
39 | } |
40 | } |
41 |
42 | public void Acquire(TimeSpan timeout) |
43 | { |
44 | uint milliseconds = ( uint )timeout.TotalMilliseconds; |
45 | if (NTKernel.WaitForSingleObject(handle, milliseconds) != 0) |
46 | throw new SemaphoreFailedException(); |
47 | } |
48 |
49 | public void Release() |
50 | { |
51 | int previousCount; |
52 | if (!NTKernel.ReleaseSemaphore(handle, 1, out previousCount)) |
53 | throw new SemaphoreFailedException(); |
54 | } |
55 |
56 | #region IDisposable Member |
57 | public void Dispose() |
58 | { |
59 | if (handle != 0) |
60 | { |
61 | if (NTKernel.CloseHandle(handle)) |
62 | handle = 0; |
63 | } |
64 | } |
65 | #endregion |
66 | } |
有一点很重要:win32中的信号量是可以命名的。这允许其他进程通过名字来创建相应信号量的句柄。为了让阻塞线程可以中断,我们使用了一个(不好)的替代方法:使用超时和 Sleep(0)。我们需要中断来安全关闭线程。更好的做法是:确定没有线程阻塞之后才释放信号量,这样程序才可以完全释放资源并正确退出。
你可能也注意到了:跨线程和跨进程的信号量都使用了相同的接口。所有相关的类都使用了这种模式,以实现上面背景介绍中提到的封闭性。需要注意:出于性能考虑,你不应该将跨进程的信号量用到跨线程的场景,也不应该将跨线程的实现用到单线程的场景。
我们已经实现了跨线程和跨进程的共享资源访问同步。但是传递/接收消息还需要共享资源。对于线程来说,只需要声明一个类成员变量就可以了。但是对于跨进程来说,我们需要使用到 win32 API 提供的内存映射文件(Memory Mapped Files,简称MMF)。使用 MMF和使用 win32 信号量差不多。我们需要先调用 CreateFileMapping 方法来创建一个内存映射文件的句柄:
01 | [DllImport( "Kernel32.dll" ,EntryPoint= "CreateFileMapping" , |
02 | SetLastError= true ,CharSet=CharSet.Unicode)] |
03 | internal static extern IntPtr CreateFileMapping( uint hFile, |
04 | SecurityAttributes lpAttributes, uint flProtect, |
05 | uint dwMaximumSizeHigh, uint dwMaximumSizeLow, string lpName); |
06 | |
07 | [DllImport( "Kernel32.dll" ,EntryPoint= "MapViewOfFile" , |
08 | SetLastError= true ,CharSet=CharSet.Unicode)] |
09 | internal static extern IntPtr MapViewOfFile(IntPtr hFileMappingObject, |
10 | uint dwDesiredAccess, uint dwFileOffsetHigh, |
11 | uint dwFileOffsetLow, uint dwNumberOfBytesToMap); |
12 | |
13 | [DllImport( "Kernel32.dll" ,EntryPoint= "UnmapViewOfFile" , |
14 | SetLastError= true ,CharSet=CharSet.Unicode)] |
15 | [ return : MarshalAs( UnmanagedType.VariantBool )] |
16 | internal static extern bool UnmapViewOfFile(IntPtr lpBaseAddress); |
01 | public static MemoryMappedFile CreateFile( string name, |
02 | FileAccess access, int size) |
03 | { |
04 | if (size < 0) |
05 | throw new ArgumentException( "Size must not be negative" , "size" ); |
06 |
07 | IntPtr fileMapping = NTKernel.CreateFileMapping(0xFFFFFFFFu, null , |
08 | ( uint )access,0,( uint )size,name); |
09 | if (fileMapping == IntPtr.Zero) |
10 | throw new MemoryMappingFailedException(); |
11 |
12 | return new MemoryMappedFile(fileMapping,size,access); |
13 | } |
我们希望直接使用 pagefile 中的虚拟文件,所以我们用 -1(0xFFFFFFFF) 来作为文件句柄来创建我们的内存映射文件句柄。我们也指定了必填的文件大小,以及相应的名称。这样其他进程就可以通过这个名称来同时访问该映射文件。创建了内存映射文件后,我们就可以映射这个文件不同的部分(通过偏移量和字节大小来指定)到我们的进程地址空间。我们通过 MapViewOfFile 系统方法来指定:
01 | public MemoryMappedFileView CreateView( int offset, int size, |
02 | MemoryMappedFileView.ViewAccess access) |
03 | { |
04 | if ( this .access == FileAccess.ReadOnly && access == |
05 | MemoryMappedFileView.ViewAccess.ReadWrite) |
06 | throw new ArgumentException( |
07 | "Only read access to views allowed on files without write access" , |
08 | "access" ); |
09 | if (offset < 0) |
10 | throw new ArgumentException( "Offset must not be negative" , "size" ); |
11 | if (size < 0) |
12 | throw new ArgumentException( "Size must not be negative" , "size" ); |
13 | IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping, |
14 | ( uint )access,0,( uint )offset,( uint )size); |
15 | return new MemoryMappedFileView(mappedView,size,access); |
16 | } |
在不安全的代码中,我们可以将返回的指针强制转换成我们指定的类型。尽管如此,我们不希望有不安全的代码存在,所以我们使用 Marshal 类来从中读写我们的数据。偏移量参数是用来从哪里开始读写数据,相对于指定的映射视图的地址。
01 | public byte ReadByte( int offset) |
02 | { |
03 | return Marshal.ReadByte(mappedView,offset); |
04 | } |
05 | public void WriteByte( byte data, int offset) |
06 | { |
07 | Marshal.WriteByte(mappedView,offset,data); |
08 | } |
09 |
10 | public int ReadInt32( int offset) |
11 | { |
12 | return Marshal.ReadInt32(mappedView,offset); |
13 | } |
14 | public void WriteInt32( int data, int offset) |
15 | { |
16 | Marshal.WriteInt32(mappedView,offset,data); |
17 | } |
18 |
19 | public void ReadBytes( byte [] data, int offset) |
20 | { |
21 | for ( int i=0;i<data.Length;i++) |
22 | data[i] = Marshal.ReadByte(mappedView,offset+i); |
23 | } |
24 | public void WriteBytes( byte [] data, int offset) |
25 | { |
26 | for ( int i=0;i<data.Length;i++) |
27 | Marshal.WriteByte(mappedView,offset+i,data[i]); |
28 | } |
但是,我们希望读写整个对象树到文件中,所以我们需要支持自动进行序列化和反序列化的方法。
01 | public object ReadDeserialize( int offset, int length) |
02 | { |
03 | byte [] binaryData = new byte [length]; |
04 | ReadBytes(binaryData,offset); |
05 | System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter |
06 | = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); |
07 | System.IO.MemoryStream ms = new System.IO.MemoryStream( |
08 | binaryData,0,length, true , true ); |
09 | object data = formatter.Deserialize(ms); |
10 | ms.Close(); |
11 | return data; |
12 | } |
13 | public void WriteSerialize( object data, int offset, int length) |
14 | { |
15 | System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter |
16 | = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); |
17 | byte [] binaryData = new byte [length]; |
18 | System.IO.MemoryStream ms = new System.IO.MemoryStream( |
19 | binaryData,0,length, true , true ); |
20 | formatter.Serialize(ms,data); |
21 | ms.Flush(); |
22 | ms.Close(); |
23 | WriteBytes(binaryData,offset); |
24 | } |
请注意:对象序列化之后的大小不应该超过映射视图的大小。序列化之后的大小总是比对象本身占用的内存要大的。我没有试过直接将对象内存流绑定到映射视图,那样做应该也可以,甚至可能带来少量的性能提升。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。