Custom Service Synchronization Context




While synchronization context is a general-purpose pattern, out of the box, .NET 2.0 and WCF implement only a single occurrence of it: the Windows Forms synchronization context. Developing a custom service synchronization context has two aspects. The first is implementing a custom synchronization context, and the second is installing it or even applying it declaratively on the service. The first aspect has nothing to do with WCF and is therefore not discussed in this book. Instead, in this section I will use my ready-made AffinitySynchronizer class, defined as:

public class AffinitySynchronizer : SynchronizationContext,IDisposable
{
   public AffinitySynchronizer( );
   public AffinitySynchronizer(string threadName);
   public void Dispose( );
}

AffinitySynchronizer executes all calls marshaled to it on the same private worker thread. When it is attached to the thread that opens a host, all instances of the service, regardless of instance mode, concurrency mode, endpoints, and contracts, will execute on the same worker thread. The implementation of AffinitySynchronizer is available along with the source code of this book. In a nutshell, AffinitySynchronizer creates a worker thread and maintains a synchronized queue of work items. You can even provide AffinitySynchronizer with a thread name as a constructor parameter for debugging and logging purposes. If you do not assign one, the worker thread name defaults to "AffinitySynchronizer Worker Thread." Each work item in the queue contains a delegate of the type SendOrPostCallback. When the Post( ) or Send( ) methods are called, AffinitySynchronizer wraps a work item around the delegate and posts it to the queue. The worker thread monitors the queue. As long as there are work items in the queue, the worker thread de-queues the first item in the queue and invokes the delegate. To terminate the worker thread, dispose of the AffinitySynchronizer instance. In a similar manner, you can develop a custom synchronization context that marshals all incoming calls to a private, small pool of threads, where only those threads are allowed to execute the calls.
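The queue-and-worker idea described above can be illustrated with a minimal sketch. This is not the AffinitySynchronizer implementation that ships with the book's source code. The class name SingleThreadContext, its default thread name, and its error handling are assumptions made for illustration, and the sketch relies on BlockingCollection and ManualResetEventSlim, which assumes .NET 4 or later rather than .NET 2.0.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch of a single-thread synchronization context.
// Not the book's AffinitySynchronizer; assumes .NET 4 or later.
public class SingleThreadContext : SynchronizationContext, IDisposable
{
    // One queued call: the delegate, its state, and an optional completion signal.
    sealed class WorkItem
    {
        public SendOrPostCallback Callback;
        public object State;
        public ManualResetEventSlim Done; // non-null only for Send()
        public Exception Error;           // reported to the caller only for Send()
    }

    readonly BlockingCollection<WorkItem> m_Queue = new BlockingCollection<WorkItem>();
    readonly Thread m_Worker;

    public SingleThreadContext(string threadName = "SingleThreadContext Worker Thread")
    {
        m_Worker = new Thread(Run) { Name = threadName, IsBackground = true };
        m_Worker.Start();
    }

    // Asynchronous: enqueue the delegate and return immediately.
    public override void Post(SendOrPostCallback d, object state)
    {
        m_Queue.Add(new WorkItem { Callback = d, State = state });
    }

    // Synchronous: enqueue the delegate and block until the worker has run it.
    public override void Send(SendOrPostCallback d, object state)
    {
        if (Thread.CurrentThread == m_Worker)
        {
            d(state); // already on the worker thread; queuing here would deadlock
            return;
        }
        using (var done = new ManualResetEventSlim(false))
        {
            var item = new WorkItem { Callback = d, State = state, Done = done };
            m_Queue.Add(item);
            done.Wait();
            if (item.Error != null)
            {
                throw new InvalidOperationException("Callback failed", item.Error);
            }
        }
    }

    // Worker loop: blocks while the queue is empty and exits once
    // CompleteAdding() has been called and the queue is drained.
    void Run()
    {
        foreach (WorkItem item in m_Queue.GetConsumingEnumerable())
        {
            try { item.Callback(item.State); }
            catch (Exception e) { item.Error = e; } // Post() errors are dropped in this sketch
            finally { if (item.Done != null) item.Done.Set(); }
        }
    }

    // Stops accepting work, lets queued items finish, then waits for the worker.
    public void Dispose()
    {
        m_Queue.CompleteAdding();
        if (Thread.CurrentThread != m_Worker)
        {
            m_Worker.Join();
        }
    }
}
```

Every delegate passed to Send( ) or Post( ) on one instance of this sketch runs on the same worker thread, one at a time and in queue order, which is the property that thread-affinity resources need. A pool-based variant would start several workers that consume the same queue.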

The second aspect of implementing a custom synchronization context (that is, installing it) serves as a great example for WCF extensibility and as a checklist of points to consider regarding the how and when of installing such extensions.

Thread-Affinity Services

With a WCF service, AffinitySynchronizer is useful when it is the service that creates and then interacts with resources that require thread affinity, such as thread local storage (TLS). A service that uses AffinitySynchronizer is always thread-safe, since only the internal worker thread of a particular AffinitySynchronizer can ever call it. When the service is configured with ConcurrencyMode.Single, the service gains no additional thread safety because the service instance is single-threaded anyway. You do get double queuing of concurrent calls: all concurrent calls to the service are first queued in the lock's queue, and then are dispatched to the worker thread one at a time. With ConcurrencyMode.Multiple, calls are dispatched to the worker thread's queue as fast as they arrive and are then queued up, later to be invoked in order and never concurrently. Note that if you use a custom synchronization context that marshals its calls to a pool of threads instead of a single thread, ConcurrencyMode.Multiple will yield the best throughput. Finally, with ConcurrencyMode.Reentrant, the service is of course not reentrant, because the incoming reentering call is queued up behind the call that is waiting for it, and a deadlock occurs. The recommended mode with AffinitySynchronizer is ConcurrencyMode.Single.

Installing a Service Synchronization Context

The following listing demonstrates installing AffinitySynchronizer before opening the host so that all instances of the service run on the same thread.

Installing AffinitySynchronizer

SynchronizationContext synchronizationContext = new AffinitySynchronizer( );
SynchronizationContext.SetSynchronizationContext(synchronizationContext);

using(synchronizationContext as IDisposable)
{
   ServiceHost host = new ServiceHost(typeof(MyService));
   host.Open( );
   /* Some blocking operations */
   host.Close( );
}

To attach a synchronization context to the current thread, call the static SetSynchronizationContext( ) method of SynchronizationContext. Once the host is opened, it will use the provided synchronization context. Note in the listing above that after closing the host, the example disposes of AffinitySynchronizer to shut down the worker thread used.

You can streamline the preceding code by encapsulating the installation of AffinitySynchronizer in a custom host, as with my ServiceHost<T>:

public class ServiceHost<T> : ServiceHost
{
   public void SetThreadAffinity(string threadName);
   public void SetThreadAffinity( );
   //More members
}

Using SetThreadAffinity( ) to attach AffinitySynchronizer is straightforward:

ServiceHost<MyService> host = new ServiceHost<MyService>( );
host.SetThreadAffinity( );

host.Open( );

/* Some blocking operations */

host.Close( );

The following listing shows the implementation of the SetThreadAffinity( ) methods.

Adding thread affinity support to ServiceHost<T>

public class ServiceHost<T> : ServiceHost
{
   AffinitySynchronizer m_AffinitySynchronizer;

   public void SetThreadAffinity(string threadName)
   {
      if(State == CommunicationState.Opened)
      {
         throw new InvalidOperationException("Host is already opened");
      }
      m_AffinitySynchronizer = new AffinitySynchronizer(threadName);
      SynchronizationContext.SetSynchronizationContext(m_AffinitySynchronizer);
   }
   public void SetThreadAffinity( )
   {
      SetThreadAffinity("Executing all endpoints of " + typeof(T));
   }
   protected override void OnClosing( )
   {
      using(m_AffinitySynchronizer)
      {}
      base.OnClosing( );
   }
   //More members
}

ServiceHost<T> maintains a member variable of the type AffinitySynchronizer and offers two versions of SetThreadAffinity( ). The parameterized one takes the thread name to provide for AffinitySynchronizer's worker thread, and the parameterless SetThreadAffinity( ) calls the other SetThreadAffinity( ) method specifying a thread name inferred from the hosted service type, such as "Executing all endpoints of MyService." SetThreadAffinity( ) first checks that the host is not opened yet, because you can only attach a synchronization context before the host is opened. If the host is not opened, SetThreadAffinity( ) constructs a new AffinitySynchronizer, providing it with the thread name to use and attaches it to the current thread. Finally, ServiceHost<T> overrides its base class OnClosing( ) method in order to call dispose on the AffinitySynchronizer member to shut down its worker thread. Since the AffinitySynchronizer member could be null if no one called SetThreadAffinity( ), OnClosing( ) uses the using statement that internally checks for null assignment before calling Dispose( ).
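The null-tolerant behavior of the using statement that OnClosing( ) relies on can be seen in isolation. The following is a small sketch; the Resource and UsingNullDemo types are hypothetical and exist only for this illustration.

```csharp
using System;

// Hypothetical disposable type that counts how often Dispose() runs.
class Resource : IDisposable
{
    public static int DisposeCount;

    public void Dispose()
    {
        DisposeCount++;
    }
}

static class UsingNullDemo
{
    // Same idiom as OnClosing(): an empty using block whose only job is
    // to call Dispose() when the reference is not null.
    public static void Close(Resource resource)
    {
        using (resource)
        {}
    }
}
```

Calling UsingNullDemo.Close(null) returns without throwing and leaves DisposeCount unchanged, while UsingNullDemo.Close(new Resource( )) increments it by one. For reference types, the compiler expands the using statement into a try/finally block that tests the reference for null before calling Dispose( ).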

The ThreadAffinityBehavior attribute

The previous section showed how to install AffinitySynchronizer by the host, regardless of the service configuration. However, if by design the service is required to always execute on the same thread, it is better not to be at the mercy of the host and the thread that happens to open it. Use my ThreadAffinityBehaviorAttribute defined as:

[AttributeUsage(AttributeTargets.Class)]
public class ThreadAffinityBehaviorAttribute : Attribute,
                                               IContractBehavior,IServiceBehavior
{
   public ThreadAffinityBehaviorAttribute(Type serviceType);
   public ThreadAffinityBehaviorAttribute(Type serviceType,string threadName);
   public string ThreadName
   {get;set;}
}

As the name of the attribute implies, ThreadAffinityBehavior provides a local behavior enforcing the fact that all service instances always run on the same thread. The ThreadAffinityBehavior attribute uses my AffinitySynchronizer class internally. When applying the attribute, you need to provide the type of the service and optionally a thread name:

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
[ThreadAffinityBehavior(typeof(MyService))]
class MyService : IMyContract
{...}

The thread name defaults to "Executing endpoints of <service type>."

The ThreadAffinityBehavior attribute is a custom contract behavior because it implements the interface IContractBehavior, introduced in Chapter 5. IContractBehavior offers the ApplyDispatchBehavior( ) method, allowing you to affect an individual endpoint dispatcher's runtime and set its synchronization context:

public interface IContractBehavior
{
   void ApplyDispatchBehavior(ContractDescription description,
                              ServiceEndpoint endpoint,
                              DispatchRuntime dispatch);
   //More members
}

Each endpoint has its own dispatcher, and each dispatcher has its own synchronization context, so the attribute is instantiated and ApplyDispatchBehavior( ) is called per endpoint. The following listing shows most of the implementation of the ThreadAffinityBehavior attribute.

Implementing ThreadAffinityBehaviorAttribute

[AttributeUsage(AttributeTargets.Class)]
public class ThreadAffinityBehaviorAttribute : Attribute,
                                               IContractBehavior,
                                               IServiceBehavior
{
   string m_ThreadName;
   Type m_ServiceType;

   public string ThreadName
   {
      get {return m_ThreadName;}
      set {m_ThreadName = value;}
   }

   public ThreadAffinityBehaviorAttribute(Type serviceType) :
                                                     this(serviceType,null)
   {}
   public ThreadAffinityBehaviorAttribute(Type serviceType,string threadName)
   {
      m_ThreadName = threadName;
      m_ServiceType = serviceType;
   }
   void IContractBehavior.ApplyDispatchBehavior(ContractDescription description,
                                                ServiceEndpoint endpoint,
                                                DispatchRuntime dispatch)
   {
      m_ThreadName = m_ThreadName ?? "Executing endpoints of " + m_ServiceType;
      ThreadAffinityHelper.ApplyDispatchBehavior(m_ServiceType,m_ThreadName,
                                                                         dispatch);
   }
   void IContractBehavior.Validate(...)
   {}
   void IContractBehavior.AddBindingParameters(...)
   {}
   void IContractBehavior.ApplyClientBehavior(...)
   {}

   void IServiceBehavior.Validate(ServiceDescription description,
                                  ServiceHostBase serviceHostBase)
   {
      serviceHostBase.Closed += delegate
                                {
                                   ThreadAffinityHelper.CloseThread(m_ServiceType);
                                };
   }
   void IServiceBehavior.AddBindingParameters(...)
   {}
   void IServiceBehavior.ApplyDispatchBehavior(...)
   {}
}

The constructors of the ThreadAffinityBehavior attribute save the provided service type and thread name.

The ApplyDispatchBehavior( ) method in the listing above uses the ?? null-coalescing operator (introduced in C# 2.0) to assign a thread name if it needs to. The expression:

m_ThreadName = m_ThreadName ?? "Executing endpoints of " + m_ServiceType;

is shorthand for:

if(m_ThreadName == null)
{
   m_ThreadName = "Executing endpoints of " +
                                       m_ServiceType;
}


The ThreadAffinityBehavior attribute serves as a top-level coordinator, delegating the actual implementation of ApplyDispatchBehavior( ) to the helper static class ThreadAffinityHelper:

public static class ThreadAffinityHelper
{
   internal static void ApplyDispatchBehavior(Type type,string threadName,
                                              DispatchRuntime dispatch);
   public static void CloseThread(Type type);
}

The class ThreadAffinityHelper also offers the static method CloseThread( ), which shuts down the worker thread associated with the synchronization context of the service type. ThreadAffinityBehavior is also a service behavior. It implements IServiceBehavior so that in its implementation of Validate( ) it can obtain a reference to the service host and subscribe to the Closed event using an anonymous method. That anonymous method shuts down the worker thread by calling ThreadAffinityHelper.CloseThread( ) and providing the service type. The following listing shows the implementation of the ThreadAffinityHelper class.

Implementing ThreadAffinityHelper

public static class ThreadAffinityHelper
{
   static Dictionary<Type,AffinitySynchronizer> m_Contexts =
                             new Dictionary<Type,AffinitySynchronizer>( );

   [MethodImpl(MethodImplOptions.Synchronized)]
   internal static void ApplyDispatchBehavior(Type type,string threadName,
                                              DispatchRuntime dispatch)
   {
      Debug.Assert(dispatch.SynchronizationContext == null);

      if(m_Contexts.ContainsKey(type) == false)
      {
         m_Contexts[type] = new AffinitySynchronizer(threadName);
      }
      dispatch.SynchronizationContext = m_Contexts[type];
   }

   [MethodImpl(MethodImplOptions.Synchronized)]
   public static void CloseThread(Type type)
   {
      if(m_Contexts.ContainsKey(type))
      {
         m_Contexts[type].Dispose( );
         m_Contexts.Remove(type);
      }
   }
}

The DispatchRuntime class provides the SynchronizationContext property, which ThreadAffinityHelper uses to assign a synchronization context for the dispatcher:

public sealed class DispatchRuntime
{
   public SynchronizationContext SynchronizationContext
   {get;set;}
   //More members
}

Before making the assignment, ThreadAffinityHelper verifies that the dispatcher has no other synchronization context, since that would indicate some unresolved conflict. The task of ThreadAffinityHelper is to associate all dispatchers of all endpoints of the provided service type with the same instance of a synchronization context. To handle this single-association scenario, ThreadAffinityHelper uses a static dictionary internally that maps a type to its synchronization context. In ApplyDispatchBehavior( ), ThreadAffinityHelper checks whether the dictionary already contains a synchronization context for the type at hand. If no matching entry is found, ThreadAffinityHelper creates a new synchronization context (with the thread name) and adds it to the dictionary. It then looks up the synchronization context for the type in the dictionary and assigns it to the dispatcher. The CloseThread( ) method uses the provided type as a key to look up the associated synchronization context in the dictionary and then disposes of it, thus shutting down the thread. All access to the static dictionary is synchronized declaratively using the MethodImpl attribute with the MethodImplOptions.Synchronized flag.
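On a static method, MethodImplOptions.Synchronized locks on the Type object of the declaring class, which is publicly reachable through typeof. As an alternative, the same per-type association can be sketched with a private lock object. The following is only an illustration of that design choice, not part of ThreadAffinityHelper; the PerTypeRegistry name and its GetOrAdd( ) and Close( ) methods are hypothetical, and the Func delegate assumes .NET 3.5 or later.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical generic variant of the type-to-context dictionary,
// synchronized with a private lock object instead of MethodImpl.
public static class PerTypeRegistry<TValue> where TValue : class, IDisposable
{
    static readonly object s_Lock = new object();
    static readonly Dictionary<Type, TValue> s_Items = new Dictionary<Type, TValue>();

    // Returns the value already associated with the type,
    // or creates one with the factory and stores it.
    public static TValue GetOrAdd(Type type, Func<TValue> factory)
    {
        lock (s_Lock)
        {
            TValue value;
            if (!s_Items.TryGetValue(type, out value))
            {
                value = factory();
                s_Items[type] = value;
            }
            return value;
        }
    }

    // Removes and disposes of the value for the type; returns false if none exists.
    public static bool Close(Type type)
    {
        lock (s_Lock)
        {
            TValue value;
            if (!s_Items.TryGetValue(type, out value))
            {
                return false;
            }
            s_Items.Remove(type);
            value.Dispose();
            return true;
        }
    }
}
```

With this sketch, repeated GetOrAdd( ) calls for the same type return the same instance, mirroring how all dispatchers of one service type share a single synchronization context, and code outside the class cannot contend for the lock.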