Meshmakers.Octo.Common.DistributionEventHub.MongoDB
3.4.22
A newer version of this package is available. See the version list below for details.
- .NET CLI: dotnet add package Meshmakers.Octo.Common.DistributionEventHub.MongoDB --version 3.4.22
- Package Manager: NuGet\Install-Package Meshmakers.Octo.Common.DistributionEventHub.MongoDB -Version 3.4.22
- PackageReference: <PackageReference Include="Meshmakers.Octo.Common.DistributionEventHub.MongoDB" Version="3.4.22" />
- Central Package Management (Directory.Packages.props): <PackageVersion Include="Meshmakers.Octo.Common.DistributionEventHub.MongoDB" Version="3.4.22" />
- Central Package Management (project file): <PackageReference Include="Meshmakers.Octo.Common.DistributionEventHub.MongoDB" />
- Paket CLI: paket add Meshmakers.Octo.Common.DistributionEventHub.MongoDB --version 3.4.22
- Script & Interactive: #r "nuget: Meshmakers.Octo.Common.DistributionEventHub.MongoDB, 3.4.22"
- File-based apps: #:package Meshmakers.Octo.Common.DistributionEventHub.MongoDB@3.4.22
- Cake Addin: #addin nuget:?package=Meshmakers.Octo.Common.DistributionEventHub.MongoDB&version=3.4.22
- Cake Tool: #tool nuget:?package=Meshmakers.Octo.Common.DistributionEventHub.MongoDB&version=3.4.22
DistributionEventHub
A distributed event system for OctoMesh services built on MassTransit and RabbitMQ. It provides three messaging patterns (broadcast events, routed events, and request/response commands) plus MongoDB-based file caching.
Architecture Overview
The DistributionEventHub is built on MassTransit with RabbitMQ as the transport layer and implements a service-wide event system with four main functions.
Core Features
1. Broadcast Events
Implementation: BroadcastEventConsumerDefinition
- Pattern: Fanout exchange with service-specific queues
- Queue Schema: octo::service::{ServiceName}
- Behavior: Every service receives a copy of the event
- Use Cases: Tenant reload, global configuration changes
// Configuration
config.AddBroadcastEventConsumer<TenantReloadConsumer, TenantReloadEvent>();
// Publishing
await eventHub.PublishAsync(new TenantReloadEvent());
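The fanout behavior described above can be pictured with a small in-memory sketch. It does not use MassTransit or RabbitMQ; the `Fanout` helper and the service names are hypothetical and exist only to show that each service queue gets its own copy of a published event.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of a fanout exchange: every bound service
// queue receives its own copy of the published event.
static Dictionary<string, List<string>> Fanout(IEnumerable<string> serviceNames, string evt)
{
    // Queue names follow the octo::service::{ServiceName} schema from the text.
    return serviceNames.ToDictionary(
        name => $"octo::service::{name}",
        _ => new List<string> { evt });
}

// Two illustrative services, one published event.
var queues = Fanout(new[] { "identity", "reporting" }, "TenantReloadEvent");
foreach (var (queue, messages) in queues)
{
    Console.WriteLine($"{queue}: {string.Join(", ", messages)}");
}
```

Both queues end up holding the same event, which is why broadcast suits notifications such as a tenant reload that every service must react to.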
2. Routed Events (Fire & Forget)
Implementation: RoutedEventConsumerDefinition
- Pattern: Direct exchange with message type as routing key
- Queue Schema: {MessageType.FullName} or explicit address
- Load Balancing: Automatic round-robin with multiple consumer instances
- Use Cases: Asynchronous commands without response
// Configuration
config.AddRoutedEventConsumer<DataProcessConsumer, ProcessDataEvent>("data-processor");
// Sending
await eventHub.SendAsync(new Uri("queue:data-processor"), processEvent);
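In contrast to broadcast, a routed event is handled by exactly one consumer instance. The sketch below is an idealized in-memory model of the round-robin load balancing mentioned above; the `RoundRobin` helper and instance names are hypothetical, and real dispatch order also depends on broker settings such as prefetch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of competing consumers on one queue: message i goes to
// consumer i mod N, so each message is handled by exactly one instance.
static List<string> RoundRobin(IReadOnlyList<string> consumers, int messageCount)
{
    var assignments = new List<string>();
    for (var i = 0; i < messageCount; i++)
    {
        assignments.Add(consumers[i % consumers.Count]);
    }
    return assignments;
}

var handledBy = RoundRobin(new[] { "instance-1", "instance-2" }, 4);
Console.WriteLine(string.Join(", ", handledBy));
// prints: instance-1, instance-2, instance-1, instance-2
```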
3. CommandClient (Request/Response)
Implementation: CommandClient<TRequest> & RoutedCommandClient<TRequest>
- Pattern: RPC over temporary reply queues
- Transport: Exchange-based with correlation IDs
- Features: Timeout handling, retry mechanism, exception wrapping
- Variants:
  - CommandClient: Pre-configured for specific commands
  - RoutedCommandClient: Dynamic target addressing
// Standard CommandClient
config.AddCommandClient<GetUserCommand>("user-service", TimeSpan.FromSeconds(30));
var response = await commandClient.GetResponse<UserResponse>(getUserCmd);
// Routed CommandClient
var response = await routedClient.GetResponse<UserResponse>("user-service", getUserCmd);
4. MongoDB File Caching
Implementation: DistributedCacheService
- Storage: MongoDB GridFS via IRepositoryClient
- Features: TTL support, content-type aware, tenant isolation
- Repository Resolution: Via ITenantResolver (repository per tenant)
// Upload with TTL
var cacheKey = await cacheService.CreateStreamAsync(
tenantId, fileStream, "application/pdf", "report.pdf", TimeSpan.FromHours(24));
// Download
var cachedFile = await cacheService.GetCacheStreamByIdAsync(tenantId, cacheKey);
Technical Details
Exception Handling
- DistributionTimeoutException: Request timeouts
- DistributedOperationFailedException: General errors
- Automatic exception wrapping for better diagnostics
Consumer Integration
- DistributedConsumer<TConsumer, TMessage>: Wrapper for business logic
- IDistributedContext: Access to MassTransit context
- Scoped DI integration for all consumers
Configuration
- Fluent API via DistributionEventHubConfiguration
- Service discovery via UniqueServiceAddress
- Optional Hangfire scheduling for time-based messages
Message Routing
- Broadcast: exchange:{MessageType} → octo::service::{ServiceName}
- Routed: queue:{DestinationAddress} with round-robin
- Commands: exchange:{CommandName}?temporary=true for RPC
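To make the three address shapes above concrete, here is a minimal sketch that builds them from plain strings. The helper names are hypothetical and not part of the package; they only mirror the patterns listed in this section.

```csharp
using System;

// Hypothetical helpers that mirror the address shapes listed above.
static string BroadcastExchange(string messageType) => $"exchange:{messageType}";
static string ServiceQueue(string serviceName) => $"octo::service::{serviceName}";
static Uri RoutedQueue(string destination) => new Uri($"queue:{destination}");
static string CommandExchange(string commandName) => $"exchange:{commandName}?temporary=true";

// Broadcast: the exchange fans out to one queue per service.
Console.WriteLine($"{BroadcastExchange("TenantReloadEvent")} -> {ServiceQueue("my-service")}");

// Routed: the same URI form that SendAsync takes in the examples above.
Console.WriteLine(RoutedQueue("data-processor").Scheme);

// Commands: a temporary exchange used for request/response.
Console.WriteLine(CommandExchange("GetUserCommand"));
```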
Getting Started
Basic Setup
services.AddDistributionEventHub(config =>
{
    config.UniqueServiceAddress = "my-service";

    // Optional: Configure instance prefix for multi-instance deployments
    // config.InstancePrefix = "prod"; // Isolates this instance from others

    // Add consumers
    config.AddBroadcastEventConsumer<MyBroadcastConsumer, MyBroadcastEvent>();
    config.AddRoutedEventConsumer<MyRoutedConsumer, MyRoutedEvent>("my-queue");
    config.AddCommandConsumer<MyCommandConsumer, MyCommand>("my-command");

    // Add clients
    config.AddCommandClient<MyCommand>("target-service");
    config.AddRoutedCommandClient<MyRoutedCommand>();
});
Multi-Instance Configuration
For deployments where multiple OctoMesh instances need to share the same message broker while maintaining complete isolation, configure the InstancePrefix. There are two ways to set this:
Option 1: Configuration-based (Recommended)
Set the instance prefix in your appsettings.json:
{
  "DistributionEventHub": {
    "BrokerHost": "localhost",
    "BrokerPort": 5672,
    "InstancePrefix": "production"
  }
}

services.Configure<DistributionEventHubOptions>(options =>
    hostContext.Configuration.GetSection("DistributionEventHub").Bind(options));
services.AddDistributionEventHub(config =>
{
    config.UniqueServiceAddress = "my-service";
    // InstancePrefix is automatically applied from DistributionEventHubOptions
});
Option 2: Code-based Configuration
services.AddDistributionEventHub(config =>
{
    config.UniqueServiceAddress = "my-service";
    config.InstancePrefix = "production"; // or "dev", "staging", "tenant-a", etc.
    // All queue and exchange names will be prefixed with "production:"
    // Example: "octo::service::my-service" becomes "production:octo::service::my-service"
});
Instance Isolation Benefits:
- Complete Separation: Different environments (prod/staging/dev) can use the same RabbitMQ cluster
- Tenant Isolation: Multi-tenant deployments with dedicated message spaces
- Blue/Green Deployments: Run multiple versions simultaneously without interference
- Testing Isolation: Integration tests won't interfere with production traffic
Priority: Code-based configuration takes precedence over options-based configuration.
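The prefixing and precedence rules above can be summarized in a short sketch. `ResolvePrefix` and `ApplyPrefix` are hypothetical helpers, not the package's API; they assume a prefix is simply prepended as "{prefix}:" and that an unset code-based prefix falls back to the options-based one.

```csharp
#nullable enable
using System;

// Hypothetical helper: the code-based prefix wins over the options-based one.
static string? ResolvePrefix(string? codePrefix, string? optionsPrefix) =>
    !string.IsNullOrEmpty(codePrefix) ? codePrefix : optionsPrefix;

// Hypothetical helper: prepend "{prefix}:" when a prefix is set, else keep the name.
static string ApplyPrefix(string? prefix, string name) =>
    string.IsNullOrEmpty(prefix) ? name : $"{prefix}:{name}";

// Code sets "production" while appsettings.json says "staging": code wins.
var effective = ResolvePrefix("production", "staging");
Console.WriteLine(ApplyPrefix(effective, "octo::service::my-service"));
// prints: production:octo::service::my-service
```

Under these assumptions, two instances with different prefixes never share a queue or exchange name, which is what keeps them isolated on a shared broker.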
Usage
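// Inject services via the constructor
public class MyService
{
    private readonly IDistributionEventHubService _eventHub;
    private readonly ICommandClient<MyCommand> _commandClient;
    private readonly IDistributedCacheService _cacheService;

    public MyService(
        IDistributionEventHubService eventHub,
        ICommandClient<MyCommand> commandClient,
        IDistributedCacheService cacheService)
    {
        _eventHub = eventHub;
        _commandClient = commandClient;
        _cacheService = cacheService;
    }

    // RunAsync is an illustrative method name; the tenantId type is assumed to be string.
    public async Task RunAsync(string tenantId, Stream stream)
    {
        // Send broadcast event
        await _eventHub.PublishAsync(new MyBroadcastEvent());
        // Send routed event
        await _eventHub.SendAsync(new Uri("queue:target"), new MyRoutedEvent());
        // Execute command
        var result = await _commandClient.GetResponse<MyResponse>(new MyCommand());
        // Cache file
        var cacheKey = await _cacheService.CreateStreamAsync(
            tenantId, stream, "application/json", "data.json", TimeSpan.FromMinutes(30));
    }
}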
Dependencies
- MassTransit: Message transport abstraction
- RabbitMQ: Message broker
- MongoDB: File caching storage
- Hangfire (optional): Message scheduling
Architecture Benefits
The implementation abstracts complex MassTransit configuration and provides type-safe APIs for all event patterns. It cleanly separates fire-and-forget events from request/response patterns while offering a content-type-aware caching system with automatic cleanup mechanisms.
| Product | Compatible and additional computed target framework versions |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net10.0
  - Meshmakers.Octo.Common.DistributionEventHub (>= 3.4.22)
  - MongoDB.Driver (>= 3.9.0)
  - SharpCompress (>= 1.0.0)
  - Snappier (>= 1.3.1)
| Version | Downloads | Last Updated |
|---|---|---|
| 3.4.28 | 0 | 6/29/2026 |
| 3.4.27 | 0 | 6/29/2026 |
| 3.4.25 | 0 | 6/29/2026 |
| 3.4.24 | 30 | 6/29/2026 |
| 3.4.22 | 42 | 6/28/2026 |
| 3.4.21 | 40 | 6/28/2026 |
| 3.4.20 | 50 | 6/26/2026 |
| 3.4.17 | 99 | 6/24/2026 |
| 3.4.16 | 98 | 6/18/2026 |
| 3.4.15 | 98 | 6/16/2026 |
| 3.4.14 | 97 | 6/15/2026 |
| 3.4.13 | 93 | 6/15/2026 |
| 3.4.12 | 97 | 6/15/2026 |
| 3.4.11 | 101 | 6/15/2026 |
| 3.4.10 | 100 | 6/13/2026 |
| 3.4.9 | 97 | 6/13/2026 |
| 3.4.7 | 107 | 6/12/2026 |
| 3.4.5 | 101 | 6/12/2026 |
| 3.4.4 | 98 | 6/12/2026 |
| 3.4.3 | 107 | 6/11/2026 |