Meshmakers.Octo.Common.DistributionEventHub
3.4.25
dotnet add package Meshmakers.Octo.Common.DistributionEventHub --version 3.4.25
NuGet\Install-Package Meshmakers.Octo.Common.DistributionEventHub -Version 3.4.25
<PackageReference Include="Meshmakers.Octo.Common.DistributionEventHub" Version="3.4.25" />
<PackageVersion Include="Meshmakers.Octo.Common.DistributionEventHub" Version="3.4.25" />
<PackageReference Include="Meshmakers.Octo.Common.DistributionEventHub" />
paket add Meshmakers.Octo.Common.DistributionEventHub --version 3.4.25
#r "nuget: Meshmakers.Octo.Common.DistributionEventHub, 3.4.25"
#:package Meshmakers.Octo.Common.DistributionEventHub@3.4.25
#addin nuget:?package=Meshmakers.Octo.Common.DistributionEventHub&version=3.4.25
#tool nuget:?package=Meshmakers.Octo.Common.DistributionEventHub&version=3.4.25
DistributionEventHub
A distributed event system for OctoMesh services built on MassTransit and RabbitMQ, providing four core messaging patterns with MongoDB-based file caching.
Architecture Overview
The DistributionEventHub is built on MassTransit with RabbitMQ as the transport layer and implements a service-wide event system with four main functions.
Core Features
1. Broadcast Events
Implementation: BroadcastEventConsumerDefinition
- Pattern: Fanout exchange with service-specific queues
- Queue Schema:
octo::service::{ServiceName} - Behavior: Every service receives a copy of the event
- Use Cases: Tenant reload, global configuration changes
// Configuration
config.AddBroadcastEventConsumer<TenantReloadConsumer, TenantReloadEvent>();
// Publishing
await eventHub.PublishAsync(new TenantReloadEvent());
2. Routed Events (Fire & Forget)
Implementation: RoutedEventConsumerDefinition
- Pattern: Direct exchange with message type as routing key
- Queue Schema:
{MessageType.FullName}or explicit address - Load Balancing: Automatic round-robin with multiple consumer instances
- Use Cases: Asynchronous commands without response
// Configuration
config.AddRoutedEventConsumer<DataProcessConsumer, ProcessDataEvent>("data-processor");
// Sending
await eventHub.SendAsync(new Uri("queue:data-processor"), processEvent);
3. CommandClient (Request/Response)
Implementation: CommandClient<TRequest> & RoutedCommandClient<TRequest>
- Pattern: RPC over temporary reply queues
- Transport: Exchange-based with correlation IDs
- Features: Timeout handling, retry mechanism, exception wrapping
- Variants:
CommandClient: Pre-configured for specific commandsRoutedCommandClient: Dynamic target addressing
// Standard CommandClient
config.AddCommandClient<GetUserCommand>("user-service", TimeSpan.FromSeconds(30));
var response = await commandClient.GetResponse<UserResponse>(getUserCmd);
// Routed CommandClient
var response = await routedClient.GetResponse<UserResponse>("user-service", getUserCmd);
4. MongoDB File Caching
Implementation: DistributedCacheService
- Storage: MongoDB GridFS via
IRepositoryClient - Features: TTL support, content-type aware, tenant isolation
- Repository Resolution: Via
ITenantResolver(repository per tenant)
// Upload with TTL
var cacheKey = await cacheService.CreateStreamAsync(
tenantId, fileStream, "application/pdf", "report.pdf", TimeSpan.FromHours(24));
// Download
var cachedFile = await cacheService.GetCacheStreamByIdAsync(tenantId, cacheKey);
Technical Details
Exception Handling
DistributionTimeoutException: Request timeoutsDistributedOperationFailedException: General errors- Automatic exception wrapping for better diagnostics
Consumer Integration
DistributedConsumer<TConsumer, TMessage>: Wrapper for business logicIDistributedContext: Access to MassTransit context- Scoped DI integration for all consumers
Configuration
- Fluent API via
DistributionEventHubConfiguration - Service discovery via
UniqueServiceAddress - Optional Hangfire scheduling for time-based messages
Message Routing
- Broadcast:
exchange:{MessageType}→octo::service::{ServiceName} - Routed:
queue:{DestinationAddress}with round-robin - Commands:
exchange:{CommandName}?temporary=truefor RPC
Getting Started
Basic Setup
services.AddDistributionEventHub(config =>
{
config.UniqueServiceAddress = "my-service";
// Optional: Configure instance prefix for multi-instance deployments
// config.InstancePrefix = "prod"; // Isolates this instance from others
// Add consumers
config.AddBroadcastEventConsumer<MyBroadcastConsumer, MyBroadcastEvent>();
config.AddRoutedEventConsumer<MyRoutedConsumer, MyRoutedEvent>("my-queue");
config.AddCommandConsumer<MyCommandConsumer, MyCommand>("my-command");
// Add clients
config.AddCommandClient<MyCommand>("target-service");
config.AddRoutedCommandClient<MyRoutedCommand>();
});
Multi-Instance Configuration
For deployments where multiple OctoMesh instances need to share the same message broker while maintaining complete isolation, configure the InstancePrefix. There are two ways to set this:
Option 1: Configuration-based (Recommended)
Set the instance prefix in your appsettings.json:
{
"DistributionEventHub": {
"BrokerHost": "localhost",
"BrokerPort": 5672,
"InstancePrefix": "production"
}
}
services.Configure<DistributionEventHubOptions>(options =>
hostContext.Configuration.GetSection("DistributionEventHub").Bind(options));
services.AddDistributionEventHub(config =>
{
config.UniqueServiceAddress = "my-service";
// InstancePrefix is automatically applied from DistributionEventHubOptions
});
Option 2: Code-based Configuration
services.AddDistributionEventHub(config =>
{
config.UniqueServiceAddress = "my-service";
config.InstancePrefix = "production"; // or "dev", "staging", "tenant-a", etc.
// All queue and exchange names will be prefixed with "production:"
// Example: "octo::service::my-service" becomes "production:octo::service::my-service"
});
Instance Isolation Benefits:
- Complete Separation: Different environments (prod/staging/dev) can use the same RabbitMQ cluster
- Tenant Isolation: Multi-tenant deployments with dedicated message spaces
- Blue/Green Deployments: Run multiple versions simultaneously without interference
- Testing Isolation: Integration tests won't interfere with production traffic
Priority: Code-based configuration takes precedence over options-based configuration.
Usage
// Inject services
public class MyService
{
private readonly IDistributionEventHubService _eventHub;
private readonly ICommandClient<MyCommand> _commandClient;
private readonly IDistributedCacheService _cacheService;
// Send broadcast event
await _eventHub.PublishAsync(new MyBroadcastEvent());
// Send routed event
await _eventHub.SendAsync(new Uri("queue:target"), new MyRoutedEvent());
// Execute command
var result = await _commandClient.GetResponse<MyResponse>(new MyCommand());
// Cache file
var cacheKey = await _cacheService.CreateStreamAsync(
tenantId, stream, "application/json", "data.json", TimeSpan.FromMinutes(30));
}
Dependencies
- MassTransit: Message transport abstraction
- RabbitMQ: Message broker
- MongoDB: File caching storage
- Hangfire (optional): Message scheduling
Architecture Benefits
The implementation abstracts complex MassTransit configuration and provides type-safe APIs for all event patterns. It cleanly separates fire-and-forget events from request/response patterns while offering a content-type-aware caching system with automatic cleanup mechanisms.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
| .NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
| .NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
| .NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
| MonoAndroid | monoandroid was computed. |
| MonoMac | monomac was computed. |
| MonoTouch | monotouch was computed. |
| Tizen | tizen40 was computed. tizen60 was computed. |
| Xamarin.iOS | xamarinios was computed. |
| Xamarin.Mac | xamarinmac was computed. |
| Xamarin.TVOS | xamarintvos was computed. |
| Xamarin.WatchOS | xamarinwatchos was computed. |
-
.NETStandard 2.0
- MassTransit.Hangfire (>= 8.5.10)
- MassTransit.RabbitMQ (>= 8.5.10)
- Meshmakers.Common.Shared (>= 4.1.38)
- Microsoft.Extensions.DependencyInjection (>= 10.0.9)
- Microsoft.Extensions.Options (>= 10.0.9)
- Newtonsoft.Json (>= 13.0.4)
-
net10.0
- MassTransit.Hangfire (>= 8.5.10)
- MassTransit.RabbitMQ (>= 8.5.10)
- Meshmakers.Common.Shared (>= 4.1.38)
- Microsoft.Extensions.DependencyInjection (>= 10.0.9)
- Microsoft.Extensions.Options (>= 10.0.9)
- Newtonsoft.Json (>= 13.0.4)
NuGet packages (6)
Showing the top 5 NuGet packages that depend on Meshmakers.Octo.Common.DistributionEventHub:
| Package | Downloads |
|---|---|
|
Meshmakers.Octo.Services.Common
Shared Middleware implementations for ASP.NET web API |
|
|
Meshmakers.Octo.Sdk.Common
General classes for communication of tools, sockets and plugs of Octo Mesh |
|
|
Meshmakers.Octo.Common.DistributionEventHub.MongoDB
Package Description |
|
|
Meshmakers.Octo.Services.Contracts
Shared Middleware contracts for OctoMesh Services |
|
|
Meshmakers.Octo.Sdk.Pipeline
ETL pipeline + node framework for OctoMesh adapters. Carved out of Sdk.Common in Phase 3 of the YAML migration; previously Sdk.Common/EtlDataPipeline and Sdk.Common/Services. |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 3.4.25 | 0 | 6/29/2026 |
| 3.4.24 | 0 | 6/29/2026 |
| 3.4.22 | 27 | 6/28/2026 |
| 3.4.21 | 58 | 6/28/2026 |
| 3.4.20 | 50 | 6/26/2026 |
| 3.4.17 | 63 | 6/24/2026 |
| 3.4.16 | 332 | 6/18/2026 |
| 3.4.15 | 301 | 6/16/2026 |
| 3.4.14 | 308 | 6/15/2026 |
| 3.4.13 | 295 | 6/15/2026 |
| 3.4.12 | 312 | 6/15/2026 |
| 3.4.11 | 302 | 6/15/2026 |
| 3.4.10 | 208 | 6/13/2026 |
| 3.4.9 | 103 | 6/13/2026 |
| 3.4.7 | 113 | 6/12/2026 |
| 3.4.5 | 106 | 6/12/2026 |
| 3.4.4 | 107 | 6/12/2026 |
| 3.4.3 | 475 | 6/11/2026 |
| 3.4.2 | 263 | 6/11/2026 |
| 3.4.1 | 283 | 6/11/2026 |