I've been using the publish-subscribe pattern a lot in my applications and I really love it. Many times over the years, this simple yet very powerful messaging pattern helped me keep my code clean and was a key service of the systems I've designed because of the flexibility it offers.
In one of the projects I am working on though, some of the applicative modules of the app have been moved from the host to a remote client. Both are connected to the same network.
Moving a module isn't a big deal in my architecture because of the DDD approach that I started taking months ago. Each module can be considered as a bounded context and is independent.
However, as my messaging service was only capable of dispatching the messages locally, the remote modules cannot listen to the app notifications anymore.
The existing code
The interface of my messaging module is very simple:
public interface IServiceMessaging { void Subscribe<T>(object subscriber, Action<T> handler) where T : new(); void Unsubscribe<T>(object subscriber) where T : new(); Task PublishAsync<T>(T data) where T : new(); }
A client would simply do the following to listen to particular message:
messaging.Subscribe<StatusIoMessage>(this, OnStatusIoMessage); void OnStatusIoMessage(StatusIoMessage message) { Console.WriteLine($"Received : {message.Type} with {message.Symbol} = {message.Value}"); }
At the moment, the implementation of this service is wrapping a 3rd party lib to do the job. The idea is to extend the communication to the remote modules.
SignalR implementation
After looking around for available solutions, SignalR seemed like a good choice to achieve our goal. Other candidates were studied but SignalR stood out for its simplicity and the fact that it is a key component of .NET Core.
On client side, a HubConnection object is needed to start publishing messages to the remote hub. Here is what the code looks like:
private HubConnection BuildConnection() { string url = $"https://{address}:{port}{MessagingHub.MainRoute}"; if (hubConnection == null) { hubConnection = new HubConnectionBuilder() .WithUrl((url)) .AddNewtonsoftJsonProtocol(options => { options.PayloadSerializerSettings.TypeNameHandling = Newtonsoft.Json.TypeNameHandling.Objects; options.PayloadSerializerSettings.TypeNameAssemblyFormatHandling = Newtonsoft.Json.TypeNameAssemblyFormatHandling.Full; options.PayloadSerializerSettings.Converters.Add(new StringEnumConverter()); }) .WithAutomaticReconnect() .Build(); hubConnection.On("__unused__", () => { }); hubConnection.Closed += HubConnection_Closed; hubConnection.Reconnecting += HubConnection_Reconnecting; hubConnection.Reconnected += HubConnection_Reconnected; } return hubConnection; }
Address, port and main route are properties of my messaging class that are read from application's configuration file.
Note that I am using Newtonsoft Json serializer instead of the default Microsoft Json protocol. This is because of the higher configurability of Newtonsoft.
BuildConnection is called at the initialization of my messaging service, immediately followed by a connection to the hub:
hubConnection = BuildConnection(); await hubConnection.StartAsync(token);
The publishing of a message is now possible with:
await hubConnection.SendAsync(TargetNames.Publish, message);
Why TargetNames.Publish ? SignalR relates client/hub calls thanks to what they call "target methods". It is basically a key that is used to route the calls. When sending a message to the hub, this key is one of the hub's method names. Here, TargetNames refers to a static class where I've listed all the magic strings that are needed to do the binding.
public static readonly string Publish = "PublishAsync";
Alright now let's have a look on the hub's side. My hub is running in a self-hosted mode, without IIS. More information here .
The configuration of the hub (MessageHub) is done in the Startup class:
public void ConfigureServices(IServiceCollection services) { services.AddSignalR(options => { options.EnableDetailedErrors = true; }) .AddNewtonsoftJsonProtocol(options => { options.PayloadSerializerSettings.TypeNameHandling = Newtonsoft.Json.TypeNameHandling.Objects; options.PayloadSerializerSettings.TypeNameAssemblyFormatHandling = Newtonsoft.Json.TypeNameAssemblyFormatHandling.Full; options.PayloadSerializerSettings.Converters.Add(new StringEnumConverter()); }); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { app.UseRouting(); app.UseEndpoints(endpoints => { endpoints.MapHub<MessagingHub>(MessagingHub.MainRoute); }); }
Then when my app starts, I need to build and configure the OWIN host:
host = Host.CreateDefaultBuilder()
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup()
.UseUrls($"https://{ipAddress}:{Port}");
})
.ConfigureServices((context, services) => { services.AddSingleton<MessagingHub>(); })
.Build();
and finally, this is hub's code:
public class MessagingHub : Microsoft.AspNetCore.SignalR.Hub { public async Task PublishAsync(object message) { if (message == null) { throw new ArgumentNullException(nameof(message)); } try { await Clients.Others.SendAsync( TargetNames.GetEventNameFor(message.GetType().Name), message); } catch (Exception e) { OnError?.Invoke($"Failed to publish message with e={e}"); } } }
There you can see the PublishAsync method that will be called by remote clients. Pay attention to the target method name used to call subscribers. In my implementation, I am generating a key based on the type of message being dispatched. GetEventNameFor() only prepends "On" to the type name, but that's just an internal convention of my code. Any key would work on the condition that the same is used by the clients on subscription:
public void Subscribe<T>(object subscriber, Action<T> handler) where T : new() { if (subscriber == null) { throw new ArgumentNullException(nameof(subscriber)); } if (handler == null) { throw new ArgumentNullException(nameof(handler)); } string eventName = TargetNames.GetEventNameFor<T>(); hubConnection.On<T>(eventName, handler); }
Now that everything is wired up, my service can be shared by both remote and local modules.
public void Subscribe<T>(object subscriber, Action<T> handler) where T : new() { if (subscriber == null) { throw new ArgumentNullException(nameof(subscriber)); } if (handler == null) { throw new ArgumentNullException(nameof(handler)); } if (!isSubscribed) { remoteHub.Subscribe<T>(subscriber, handler); localHub.Subscribe<T>(subscriber, handler); } } public void Unsubscribe<T>(object subscriber) where T : new() { if (subscriber == null) { throw new ArgumentNullException(nameof(subscriber)); } if (subscription != null) { remoteHub.Unsubscribe<T>(subscriber); localHub.Unsubscribe<T>(subscriber); } } public async Task PublishAsync<T>(T data) where T : new() { if (data == null) { throw new ArgumentNullException(nameof(data)); } try { await remoteHub.PublishAsync<T>(data); await localHub.PublishAsync<T>(data); } catch (MessagingHubException e) { Log?.Error($"Failed to publish {typeof(T)} with exception={e}"); } }
I have encapsulated the local and the remote implementations in my global messaging service. The components that were using the messaging service are now connected to the remote modules as well.
In my next article I'll introduce how to add a point-to-point messaging capability to this service with SignalR.
0 comments:
New comments are not allowed.