Rebus.Redis
0.0.1
dotnet add package Rebus.Redis --version 0.0.1
NuGet\Install-Package Rebus.Redis -Version 0.0.1
<PackageReference Include="Rebus.Redis" Version="0.0.1" />
paket add Rebus.Redis --version 0.0.1
#r "nuget: Rebus.Redis, 0.0.1"
// Install Rebus.Redis as a Cake Addin
#addin nuget:?package=Rebus.Redis&version=0.0.1
// Install Rebus.Redis as a Cake Tool
#tool nuget:?package=Rebus.Redis&version=0.0.1
Rebus Redis
This library provides Redis support for Saga persistence, outbox, and async messaging.
It is designed specifically to implement scatter/gather commands, where a single logical command is sent to multiple services in parallel and data from one or more of those services is then returned to the caller.
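As a rough illustration of the scatter/gather shape itself, the sketch below uses plain .NET tasks only. `QueryService` and the service names are hypothetical stand-ins for requests sent over the bus; nothing here is part of Rebus.Redis.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a remote service; in practice each call would be
// a request sent over the bus and awaited.
static async Task<string> QueryService(string name, int delayMs)
{
    await Task.Delay(delayMs);   // simulate network and processing time
    return $"{name}: ok";
}

// Scatter: start every request without awaiting, so they run in parallel.
var pending = new List<Task<string>>
{
    QueryService("inventory", 50),
    QueryService("pricing", 20),
    QueryService("shipping", 80),
};

// Gather: wait for all replies and combine them for the caller.
// Task.WhenAll returns results in the order the tasks were supplied.
string[] replies = await Task.WhenAll(pending);
Console.WriteLine(string.Join(Environment.NewLine, replies));
```

The total wait is roughly that of the slowest service rather than the sum of all three, which is the point of sending the requests in parallel.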
There is an existing async library for Rebus that uses the normal Rebus transport to send a reply. However, it is marked experimental, and the reasoning given for this is sound: durable messages are not suited to the ephemeral state of an async request. A pending async await is ephemeral by nature, so using a persistent queue to send the reply is undesirable. Instead of the normal Rebus transport, this library uses Redis pub/sub to send the reply. Only currently subscribed listeners receive the reply, which makes it well suited to this purpose.
Using async you can make a call like this on the client:
var response = await bus.SendRequest<ReplyMessage>(request);
This sends the message to the server as normal and also adds a task to the Redis pub/sub subscription. On the server you can then send a simple reply like this:
await bus.ReplyAsync(replyMessage);
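To make the idea of a task attached to a subscription more concrete, here is a conceptual sketch of how a pending reply can be tracked in memory and completed when a reply arrives. It is not the library's implementation: `PendingReplies`, `Register`, and `Complete` are hypothetical names, and the pub/sub channel is replaced by direct method calls.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var replies = new PendingReplies();

// Client side: register the pending request before sending it.
var messageId = Guid.NewGuid().ToString();
Task<string> pending = replies.Register(messageId, TimeSpan.FromSeconds(15));

// Subscriber side: a reply arrives on the channel and completes the task.
replies.Complete(messageId, "pong");
Console.WriteLine(await pending);

// A reply with no registered listener is simply dropped.
Console.WriteLine(replies.Complete("unknown-id", "late reply"));

// Hypothetical in-memory registry of requests awaiting a reply.
sealed class PendingReplies
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pending = new();

    public Task<string> Register(string messageId, TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[messageId] = tcs;

        // Give up once the caller's timeout elapses; nothing is persisted.
        var cts = new CancellationTokenSource(timeout);
        cts.Token.Register(() =>
        {
            if (_pending.TryRemove(messageId, out var expired))
                expired.TrySetException(new TimeoutException($"No reply to {messageId}"));
        });

        // Release the timer as soon as the task finishes either way.
        tcs.Task.ContinueWith(_ => cts.Dispose(), TaskScheduler.Default);
        return tcs.Task;
    }

    // Returns false when nobody is waiting for this reply.
    public bool Complete(string messageId, string payload) =>
        _pending.TryRemove(messageId, out var tcs) && tcs.TrySetResult(payload);
}
```

Because the registry lives only in process memory, a reply that arrives after the caller has gone away has nowhere to land, which mirrors the behaviour of pub/sub described above.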
There are also some additional methods that allow flexibility in cases such as sagas, where the handler is not ready to reply until some further action is taken. Calling GetReplyContext in the context of a Redis async request returns a context that allows you to send a reply at a later time. This context is just an identifier, so it is safe to store and can be added to saga state, allowing a future message to reply to the original request.
var replyContext = messageContext.GetReplyContext();
await replyContext.ReplyAsync(replyMessage);
In addition, the timeout for the caller is sent along with the request so that the recipient of a message can determine how long the caller will be waiting for a response, which may be useful for cancelling a task or determining whether to send a response to the caller.
// in the client (the default timeout is 15 seconds if not specified)
var response = await bus.SendRequest<ReplyMessage>(request, timeout: TimeSpan.FromSeconds(30));
// on the handler
var timeout = messageContext.GetReplyTimeout();
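One way a handler might use that timeout is to bound its own work and skip the reply once the caller has stopped waiting. The sketch below is plain .NET and assumes the timeout is available as a `TimeSpan`; `TryBuildReply` is a hypothetical helper, not part of the library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs the work and returns null if the caller's
// timeout elapses before the work finishes.
static async Task<string?> TryBuildReply(
    TimeSpan callerTimeout, Func<CancellationToken, Task<string>> work)
{
    using var cts = new CancellationTokenSource(callerTimeout);
    try
    {
        return await work(cts.Token);
    }
    catch (OperationCanceledException)
    {
        return null;   // the caller has timed out; a reply would go unheard
    }
}

// Fast work completes inside the caller's window.
var fast = await TryBuildReply(TimeSpan.FromSeconds(1), async ct =>
{
    await Task.Delay(10, ct);
    return "done";
});

// Slow work is cancelled when the window closes.
var slow = await TryBuildReply(TimeSpan.FromMilliseconds(50), async ct =>
{
    await Task.Delay(5000, ct);
    return "done";
});

Console.WriteLine(fast ?? "(no reply sent)");
Console.WriteLine(slow ?? "(no reply sent)");
```

In a real handler the null case would simply skip the reply call, since no listener remains subscribed for it.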
Async Configuration
To configure async messaging, enable Redis and then enable async messaging on it. This can be done as follows:
Configure.With(activationHandler)
    .Options(o =>
    {
        o.SetBusName("main");
        o.EnableRedis("localhost:6379", r => r.EnableAsync());
    })
    // ...
By default, both client and server mode will be active. This means that a listener will be started to listen for replies from dispatched requests and that a step handler will be registered to redirect replies sent from a Redis request to the Redis publish channel. If you only want to use async messaging in one direction, you can disable the other mode as follows:
Configure.With(activationHandler)
    .Options(o =>
    {
        o.SetBusName("main");
        o.EnableRedis("localhost:6379", r => r.EnableAsync(AsyncMode.Client)); // or AsyncMode.Host
    })
    // ...
Typically only one service would use Redis async messaging, e.g. a client-facing service. If, however, you need to send replies via Redis from one service to another and each one has its own Redis instance, you can configure the replies to be routed based on the sender address. This can be done as follows:
Configure.With(activationHandler)
    .Options(o =>
    {
        o.SetBusName("main");
        o.EnableRedis("main-redis:6379", r => r.EnableAsync()
            .RouteRepliesTo("other-service", "other-redis:6379"));
    })
    // ...
Note that this affects only the reply routing; all other Redis components will use the main Redis connection configured when calling EnableRedis.
Saga and Subscription Storage and Outbox
This library also provides Redis implementations of saga storage and subscription storage, plus an outbox implementation modeled after the Postgres implementation in Rebus. The outbox is implemented using Redis streams, saga data is stored in Redis hashes, and subscriptions are stored in Redis lists.
Basic configuration for the saga storage and outbox is as follows:
using var activationHandler = new BuiltinHandlerActivator();
Configure.With(activationHandler)
    .Options(o =>
    {
        o.SetBusName("main");
        o.EnableRedis("localhost:6379", r => r.EnableAsync());
    })
    .Outbox(o => o.StoreInRedis())
    .Sagas(s => s.StoreInRedis())
    .Subscriptions(s => s.StoreInRedis());
This outbox only makes sense when the activity being performed is also stored in Redis, e.g. for sagas that use Redis storage. Internally the outbox is implemented with Redis streams, accessed through the StackExchange.Redis client. Because of the way that client multiplexes commands over a single connection, it does not support blocking stream reads, since a blocking read would stall every other Redis operation the application performs; only polling is available. To ensure messages are received with as little latency as possible, an additional Redis connection is created for each sender, dedicated to listening for outgoing messages and using the arbitrary execute command to perform a blocking read. This can be disabled by calling options.EnableBlockingRead(false); in the outbox configuration method.
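For readers unfamiliar with the technique, the following sketch shows roughly what a blocking stream read on a dedicated StackExchange.Redis connection can look like. It is an illustration under assumptions, not the library's code: the stream name `example-outbox`, the use of XREAD (rather than, say, a consumer-group read), the block time, and the reply parsing (which assumes the default RESP2 reply shape) are all chosen for the example. It needs a Redis server on localhost:6379 and loops until the process is stopped.

```csharp
using System;
using System.Threading.Tasks;
using StackExchange.Redis;

// Assumed values for illustration only.
const string streamKey = "example-outbox";   // hypothetical stream name
const int blockMs = 2000;                    // how long XREAD may block

// A dedicated connection, so the blocking read cannot stall commands
// multiplexed on the application's main connection.
var config = ConfigurationOptions.Parse("localhost:6379");
config.AsyncTimeout = blockMs + 3000;        // must exceed the block time
using var connection = await ConnectionMultiplexer.ConnectAsync(config);
var db = connection.GetDatabase();

var lastId = "$";                            // only entries added from now on
while (true)
{
    // There is no typed wrapper for a blocking XREAD, so the command is
    // issued through the general-purpose ExecuteAsync method.
    RedisResult result = await db.ExecuteAsync(
        "XREAD", "COUNT", 10, "BLOCK", blockMs, "STREAMS", streamKey, lastId);

    if (result.IsNull) continue;             // block time elapsed, no entries

    // Assumed RESP2 shape: [[key, [[id, [field, value, ...]], ...]]]
    var stream = (RedisResult[])((RedisResult[])result!)[0]!;
    foreach (RedisResult entry in (RedisResult[])stream[1]!)
    {
        var parts = (RedisResult[])entry!;
        lastId = (string)parts[0]!;          // resume after this entry next time
        Console.WriteLine($"entry {lastId}");
    }
}
```

Keeping this loop on its own connection is the design choice that matters: the same command on a shared multiplexer would hold up every other caller for the duration of the block.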
Product | Versions (compatible and additional computed target framework versions) |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
.NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
.NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen40 was computed. tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
- .NETStandard 2.0
  - Rebus (>= 8.1.0)
  - StackExchange.Redis (>= 2.7.10)
Version | Downloads | Last updated |
---|---|---|
0.0.1 | 200 | 2/8/2024 |
0.0.0-pre001 | 202 | 11/6/2023 |