NetMQPubSub.Publisher 1.0.1

There is a newer prerelease version of this package available.
See the version list below for details.
dotnet add package NetMQPubSub.Publisher --version 1.0.1                
NuGet\Install-Package NetMQPubSub.Publisher -Version 1.0.1                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="NetMQPubSub.Publisher" Version="1.0.1" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add NetMQPubSub.Publisher --version 1.0.1                
#r "nuget: NetMQPubSub.Publisher, 1.0.1"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install NetMQPubSub.Publisher as a Cake Addin
#addin nuget:?package=NetMQPubSub.Publisher&version=1.0.1

// Install NetMQPubSub.Publisher as a Cake Tool
#tool nuget:?package=NetMQPubSub.Publisher&version=1.0.1                

NetMQPubSub

NetMQPubSub is a lightweight IPC publish-subscribe library written in NET Core. The underlying IPC transport layer is provided by NetMQ. The number of NetMQ features implemented in NetMQPubSub is limited to only those required to support messaging between a publisher and its subscribers. Currently, this is a small number of features, but we will expand the feature set as needed in the future.

Overview

NetMQPubSub provides support for features such as:

  • NET Standard Library - Supports C#, Android, Linux, macOS, Windows, Library
  • Multiple subscribers, subscribing to different topics.
  • Cancellation of publishers and subscribers using a .NET Core CancellationToken.
  • Can be used by both .NET Core web applications and console applications.
  • Messages can sent/received as strings or entity objects.

License

NetMQPubSub uses the <a href="https://mit-license.org/" target="blank">MIT License</a> model.

Nuget Packages

For most applications you write with NetMQPubSub, you will need to include several nuget packages (along with NetMQ):

Note: Except as noted, the NetMQ IPC transport layer libraries must be included in code projects.

Example Applications

In addition, the repository also includes source code for the following example applications:

  • NetMQPubSub.ConsoleApp - This is a console-based application that demonstrates sending continuous messages from a single publisher to a large number of subscribers. This also demonstrates how to cancel all messaging agents based on a NET Core cancellation token. The source code for this application is included below.
  • NetMQPubSub.WebApp - A web application that demonstrates how to publish a message to a subscriber (within the web application as a service worker) when a web button is pressed. The code provides details on how to configure dependency injection for NET Core.

Community Enhancements

We welcome pull requests from the community to enhance and grow NetMQPubSub.

NetMQPubSub.ConsoleApp

This sample console application creates a single publisher and 10 subscribers. The publisher will publish a message, using one of 10 random topics, every 50ms. Each subscriber is subscribed to a single topic. The application will execute until the Enter key is pressed. This example provides a suitable demonstration of how to leverage a NET Core CancellationToken to stop and dispose of the publisher and all subscribers once the Enter key is pressed. Lastly, it shows the proper way for an application to perform the required NetMQ cleanup when an application shuts down (see the code reference to NetMQPubSubHelper.Cleanup).

namespace NetMQPubSub.ConsoleApp;

using NetMQPubSub.Publisher;
using NetMQPubSub.Common.Helpers;
using NetMQPubSub.Subscriber;
using System;
using System.Linq;
using System.Text.Json;
using NetMQPubSub.Core.Interfaces;

internal class Program
{
	private const int MaximumSubscribers = 10;
	private readonly List<string> topics = Enumerable
		.Range(0, MaximumSubscribers)
		.Select(t => $"Topic{t}")
		.ToList();

	static void Main()
	{
		new Program().Run();
		NetMQPubSubHelper.Cleanup();
	}

	private void Run()
	{
		Console.Write("Press Enter to begin.  Once running, press Enter again to stop.");
		Console.ReadLine();

		var cancellationTokenSource = new CancellationTokenSource();
		var server = Task.Run(() => SubscribersAsync(cancellationTokenSource.Token));
		var client = Task.Run(() => PublisherAsync(cancellationTokenSource.Token));

		Console.ReadLine();

		cancellationTokenSource.Cancel();
		Task.WaitAll(server, client);
	}

	private void SubscribersAsync(CancellationToken cancelToken)
	{
		// or use "inproc://{name}" for in-process (e.g. inproc://job-service)
		var addr = "tcp://localhost:12345";

		var subscriberTasks = new List<Task>();
		for (var i = 0; i < topics.Count; i++)
		{
			var topicIndex = i;
			subscriberTasks.Add(Task.Run(() => RunTopicSubscriberAsync(topics[topicIndex], addr, topicIndex, cancelToken), cancelToken));
		}

		Task.WaitAll(subscriberTasks.ToArray());
	}

	private void PublisherAsync(CancellationToken cancelToken)
	{
		// or use "inproc://{name}" for in-process (e.g. inproc://job-service)
		var addr = "tcp://localhost:12345";

		using IMessagePublisher publisher = new MessagePublisher();
		publisher.Options.SendHighWatermark = 1000;

		Console.WriteLine("Publisher socket binding...");
		publisher.Bind(addr);

		// now that we've bound a socket, give subscriber a bit of time to initialize
		// before we begin sending messages
		Thread.Sleep(1000);

		var counter = 0;
		var rand = new Random();
		do
		{
			var randomizedTopic = rand.Next(this.topics.Count);
			var topic = this.topics[randomizedTopic];
			++counter;

			Console.WriteLine($"==> Sending message for topic \"{topic}\". Message: #{counter}");
			publisher.SendTopicMessage(topic, new TestMessage() { Counter = counter });

			// simulate a bit of processing
			Thread.Sleep(50);

		} while (!cancelToken.IsCancellationRequested);

		publisher.Close(); // also consider Unbind(addr)
		Console.WriteLine($"==> Publisher done!");
	}

	private static void RunTopicSubscriberAsync(string topic, string addr, int id, CancellationToken cancelToken)
	{
		Console.WriteLine($"Subscriber #{id} socket connecting...");

		using IMessageSubscriber subscriber = new MessageSubscriber();
		subscriber.Options.ReceiveHighWatermark = 1000;
		subscriber.Connect(addr);
		subscriber.TopicSubscribe(topic);

		const int maxSecondsDelayBeforeCancelCheck = 2;
		var timeout = TimeSpan.FromSeconds(maxSecondsDelayBeforeCancelCheck);
		do
		{
			if (subscriber.TryReceiveMessage<TestMessage>(timeout, out var messageTopicReceived, out var entityReceived))
			{
				Console.WriteLine($"<== Subscriber #{id} message for topic \"{messageTopicReceived}\". Message: {JsonSerializer.Serialize(entityReceived)}");
			}

		} while (!cancelToken.IsCancellationRequested);

		subscriber.Close(); // also consider Disconnect(addr)
		Console.WriteLine($"<== Subscriber #{id} done!");
	}

	internal class TestMessage
	{
		public int Counter { get; set; }

		public string Name { get; set; }

		public DateTime Now { get; set; }

		public TestMessage()
		{
			var names = new string[] { "Joe", "Sally", "Mary", "Steve", "Iris", "Bob" };
			var random = new Random();
			this.Counter = 0;
			this.Name = names[random.Next(names.Length)];
			this.Now = DateTime.Now;
		}
	}
}
Product Compatible and additional computed target framework versions.
.NET net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
1.0.2-alpha01 184 12/27/2022
1.0.1 370 12/7/2022
1.0.0 311 12/6/2022