nostify 2.9.2
nostify
Dirtball simple, easy to use, scalable event-driven microservices framework for .NET.
This framework is intended to simplify the implementation of the ES/CQRS, microservice, and materialized view patterns in a specific tech stack. It also assumes some basic familiarity with domain driven design.
When should I NOT use this?
The framework makes numerous assumptions to cut down on complexity. It has dependencies on Azure components, notably Cosmos. If you need to accommodate a wide range of possible technology stacks, or want lots of flexibility in how to implement, this may not be for you. If you are going to ignore the tech stack requirements there are other libraries you should look at.
When should I use this?
You should consider using this if you are using .Net and Azure and want to follow a strong set of guidelines to quickly and easily spin up services that can massively scale without spending tons of time architecting it yourself.
Current Status
Changes in 2.9
- More effective comparison in ApplyAndPersistAsync()
- Setting containerThroughput to 0 or less (convention should be -1) in WithCosmos() to leverage serverless RU provisioning
- Fixed bug with setting throughput
- Fixed bug with TryGetValue
Changes in 2.8
- Added BulkApplyAndPersistAsync() methods to facilitate bulk changes in Projection event handlers for events not from the base aggregate.
- ApplyAndPersistAsync<T>() methods now return Task<T> and, if awaited, return the updated object. This makes it easier to call InitAsync() on the projection if it requires external data.
- ApplyAndPersistAsync<T>() now uses CreateItemAsync() and PatchItemAsync() instead of UpsertItemAsync() in the background, depending on whether it is a create or not, for much better performance.
Changes in 2.7
- Added ability to configure using Gateway connection type. Set useGatewayConnection to true in NostifyFactory.WithCosmos()
- Adds retry to InitAllUninitialized()
Changes in 2.6
- 2.6.3 fixes bug in container creation
- Added feature for "verbose" start up which outputs more information from steps to console
- Added TryGetValue<T>() method to extract value from payload if it exists and not throw an error, but instead return false if not
- Added manual mapping capability to UpdateProperties() so you can now specify how to map property values from payload to Projection if the property names don't match up. Example below will set the ExampleProjection.exampleName property to the value of the payload.name property:
Dictionary<string, string> propertyPairs = new Dictionary<string, string>{ {"name", "exampleName"} };
this.UpdateProperties<ExampleProjection>(eventToApply.payload, propertyPairs, true);
Changes in 2.5
- Added new ExternalDataEvent.GetEventsAsync<T>() method to help reduce boilerplate code for Projection init
Changes in 2.4
- Much improved Kafka config
- Fixed issue with connecting to Kafka
- Auto create topics during start up
- NostifyFactory Build() is now generic, so it should be used as Build<T>() where T is the base aggregate for the service
Changes in 2.3
- HandleUndeliverableAsync() has the option to publish an Error event to Kafka
- Bulk methods have a batchSize option for looping through large lists of events, the ability to automatically retry on Cosmos 429 failure, and better error handling
Changes in 2.2
- Factory method for building Nostify singleton
- Bulk patching method
- Kafka producer injection
- Use 2.2.2; versions 2.2.0 and 2.2.1 have bugs
Changes in 2.1
- Way to programmatically create containers automatically on start up if needed (for facilitating local development)
- Some documentation updates
Changes in 2.0
- Projection initialization is vastly different/better (breaking change)
- Proper caching of references to CosmosClient and containers speeds up db actions significantly
- Basic validation for create commands
- Leveraging TTL to add delete all in a projection container rather than deleting and recreating container
- More unit tests
Getting Started
To run locally you will need to install some dependencies:
Azurite: npm install -g azurite
Azurite VS Code Extension: https://marketplace.visualstudio.com/items?itemName=Azurite.azurite
Docker Desktop: https://www.docker.com/products/docker-desktop/
Confluent CLI: https://docs.confluent.io/confluent-cli/current/install.html
- To run: confluent local kafka start
- To add topic: confluent local kafka topic create <Topic Name>
Cosmos Emulator: https://learn.microsoft.com/en-us/azure/cosmos-db/how-to-develop-emulator?tabs=windows%2Ccsharp&pivots=api-nosql
To install nostify and templates:
dotnet new install nostify
To spin up a nostify project:
dotnet new nostify -ag <Your_Aggregate_Name> -p <Port Number To Run on Locally>
dotnet restore
This will install the templates, create the default project based off your Aggregate, and install all the necessary libraries.
Architecture
The library is designed to be used in a microservice pattern (although not necessarily required) using an Azure Function App API and Cosmos as the event store. Kafka serves as the messaging backplane, and projections can be stored in Cosmos or Redis depending on query needs.
You should set up a Function App and Cosmos DB per Aggregate Microservice.
Projections that contain data from multiple Aggregates can be updated by Event Handlers from other microservices. Why would this happen? Well, say you have a Bank Account record. If we were using a relational database for a data store, we'd have to either run two queries or do a join to get the Bank Account and the name of the Account Manager. Using the CQRS model, we can "pre-render" a projection that contains both the account info and the account manager info without having to join tables together. This example is obviously very simple, but in a complex environment where you're joining together dozens of tables to create a DTO to send to the user interface and returning hundreds of thousands or millions of records, this type of architecture can dramatically improve BOTH system performance and throughput.
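To make the "pre-rendered" idea concrete, here is a minimal sketch in plain C#. It does not use nostify at all; BankAccountView, AccountManagerRenamed, and BankAccountViewUpdater are hypothetical names invented for illustration. The point is only that the manager's name is copied onto the stored view when the rename event arrives, so a later read needs no join.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event: an account manager changed their name.
public record AccountManagerRenamed(Guid ManagerId, string NewName);

// Hypothetical denormalized view: account data plus the manager's name.
public class BankAccountView
{
    public Guid Id { get; set; }
    public string AccountNumber { get; set; } = "";
    public Guid ManagerId { get; set; }
    public string ManagerName { get; set; } = "";
}

public static class BankAccountViewUpdater
{
    // Patches every stored view that references the renamed manager
    // and returns how many views were touched.
    public static int Apply(IEnumerable<BankAccountView> views, AccountManagerRenamed evt)
    {
        int updated = 0;
        foreach (var view in views)
        {
            if (view.ManagerId == evt.ManagerId)
            {
                view.ManagerName = evt.NewName;
                updated++;
            }
        }
        return updated;
    }
}
```

The write side does a little more work per event, and in exchange each query reads one already-complete document.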
Why????
When it comes to scaling there are two things to consider: speed and throughput. "Speed" means the quickness of an individual action, and "throughput" means the number of actions that can be performed concurrently. Using nostify addresses both of those concerns.
Speed really comes into play only on the query side for most applications. That's a large part of the concept behind the CQRS pattern. By separating the command side from the query side you essentially deconstruct the datastore that would traditionally be utilizing a RDBMS in order to create materialized views of various projections of the aggregate. Think of these views as "pre-rendered" views in a traditional relational database. In a traditional database a view simplifies queries but still runs the joins in real time when data is requested. By materializing the view, we denormalize the data and accept the increased complexity associated with keeping the data accurate in order to massively decrease the performance cost of querying that data. In addition, we gain flexibility by being able to appropriately resource each container, giving the containers that are queried the hardest more resources.
Throughput is the other half of the equation. If you were using physical architecture, you'd have an app server talking to a separate database server serving up your application. The app server has, say, 4 processors with 8 cores each, so there is a limitation on the number of concurrent tasks that can be performed. We can enhance throughput through proper coding, using parallel processing and non-blocking code, but at a certain point there is a physical limit to the number of things that can be happening at once. With nostify and the use of Azure Functions, this limitation is removed other than by cost. If 1000 queries hit at the same moment in time, 1000 instances of an Azure Function spin up to handle it. You're limited more by cost than physical hardware.
Concepts
Service
A service in nostify is a standalone mini-application that encapsulates a group of aggregates and projections and the logic for handling state changes to them. Generally speaking, a single service should encapsulate the logic for a single domain context if you're embracing the microservices pattern.
A service template will contain the directory structure, class files, and code to handle basic Create, Update, and Delete commands as well as some basic queries out of the box.
Create
dotnet new nostify -ag <Your_Aggregate_Name> -p <Port Number To Run on Locally>
Aggregate
An aggregate encapsulates the logic around the "C" or Command part of the CQRS pattern.
An Aggregate is a cluster of associated objects that we treat as a unit for the purpose of data changes. -Eric Evans
For example, a Purchase Requisition might have id, pr_number, and lineItems properties, where lineItems is a List<LineItem> type. The aggregate would be composed of multiple types of objects as well as simple types in this case, and the line items of the requisition would never be edited outside the context of their containing req.
In nostify all state changes coming from the UI are called a NostifyCommand and must be performed against an aggregate. This is done by composing an Event, which is written to the event store. This triggers the event getting pushed to the messaging system (Kafka) which the event handler subscribes to. Once triggered the event handler updates the current state container of the aggregate.
Rules
- All state changes must be applied to an aggregate, and not directly to a projection or to an entity within an aggregate.
- Aggregates may only refer to other aggregates by id value.
- An aggregate must implement the NostifyObject abstract class and the IAggregate interface.
- All state changes must be applied using the Apply() method, either directly or by using the ApplyAndPersist() method. The current state projection of an aggregate is simply the sum of all the events in the event store applied to a new instance of that aggregate object.
- In nostify only the changed properties should be included when creating an Event; best practice is to not send the entire aggregate.
- A service generally contains one or more aggregates. A read-only service, such as for BI purposes, might not. Domain boundaries should be respected when grouping aggregates together in a service. For example, grouping a WorkOrder aggregate in the same service as a WorkOrderStatus aggregate (which might be an aggregate if your application allows users to add and update them) might make sense, whereas putting a PurchaseRequest and a WorkOrder in the same service might not. This is an art not a science so do what makes sense to your application. It is theoretically possible to completely abandon the microservice concept and group an entire application into a single service, but probably not a good idea for scalability and maintainability.
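The rule that current state is the sum of all events applied to a new instance can be sketched as a simple fold. The code below is plain C# and deliberately avoids nostify types: SketchEvent, WorkOrderSketch, and the command name strings are hypothetical stand-ins, and the optional pointInTime parameter is an assumption showing how the same fold could stop at an earlier moment.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified stand-in for an event; not nostify's Event class.
public record SketchEvent(string Command, Guid AggregateId, DateTime Timestamp, Dictionary<string, object> Payload);

public class WorkOrderSketch
{
    public Guid Id { get; set; }
    public string Title { get; set; } = "";
    public bool IsDeleted { get; set; }

    // Every state change goes through Apply.
    public void Apply(SketchEvent e)
    {
        switch (e.Command)
        {
            case "Create_WorkOrder":
            case "Update_WorkOrder":
                Id = e.AggregateId;
                // Only properties present in the payload are changed.
                if (e.Payload.TryGetValue("title", out var title) && title is string s) Title = s;
                break;
            case "Delete_WorkOrder":
                IsDeleted = true;
                break;
        }
    }

    // Start from a new instance and apply events in time order,
    // optionally ignoring anything after pointInTime.
    public static WorkOrderSketch Rehydrate(IEnumerable<SketchEvent> events, DateTime? pointInTime = null)
    {
        var state = new WorkOrderSketch();
        var ordered = events
            .Where(ev => pointInTime == null || ev.Timestamp <= pointInTime.Value)
            .OrderBy(ev => ev.Timestamp);
        foreach (var ev in ordered)
        {
            state.Apply(ev);
        }
        return state;
    }
}
```

Because the update event in this sketch carries only the changed title, replaying the stream yields the same result as applying each change when it happened.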
Create
A "base" aggregate is created when you use the dotnet cli to create a new nostify service:
dotnet new nostify -ag <Your_Aggregate_Name> -p <Port Number To Run on Locally>
If you are adding a new aggregate to an existing service, run the below cli command from the Aggregates directory.
dotnet new nostifyAggregate -ag <Your_Aggregate_Name>
Projection
A projection encapsulates the logic surrounding the "Q" or Query part of the CQRS pattern.
The current state of an aggregate is the sum of all the events for that aggregate in the event store. However, querying the entire stream of events and applying them every time you want to send the current state of an aggregate to the user interface can be inefficient, even more so if you are trying to compose a DTO with properties from multiple aggregates across services.
The solution to this in nostify is the projection pattern. A projection defines a set of properties from one or more aggregates and then stores the current state of those properties as updated by the event stream. Every projection will have a "base" aggregate that will be the trigger to create a new persisted instance of the projection, and will define any necessary queries for getting data from external aggregates.
Rules
- No command is ever performed on a projection, they are updated by subscribing to and handling events on their component aggregates.
- A projection will have a "base" aggregate and the id property of the projection will match the id property of the base aggregate. The base aggregate is the aggregate whose create event is handled to create a new instance of the projection in the container.
- A projection must implement the ProjectionBaseClass<P,A> abstract class, where P is the concrete class of the projection and A is the "base" aggregate, and the IContainerName interface. Most will also need to implement IHasExternalData<P>.
- For projections with data coming from aggregates external to the "base" aggregate, the IHasExternalData.GetExternalDataEventsAsync() method must be implemented to handle getting events from external sources to apply to the projection. The results of this method are used by the ProjectionBaseClass to update either the external data of a new single projection instance, or many projections when the container is initialized or data is imported.
Create
A projection must be added to an existing service. Base aggregate must already exist. From the Projections directory:
dotnet new nostifyProjection -ag <Base_Aggregate_Name> --projectionName <Projection_Name>
Event
An Event captures a state change to the application. Generally, this is caused by the user issuing a command such as "save". When a command comes in from the front end to the endpoint, the HTTP trigger invokes a command handler function which validates the command, composes an Event, and persists it to the event store. Note that while a Command is always an Event, an Event is not necessarily always a Command. It is possible for an Event to originate elsewhere, say from an IoT device for example.
In a typical scenario, the Event is created in the command handler, and then saved to the event store:
Event pe = new Event(TestCommand.Create, newId, newTest);
await _nostify.PersistEventAsync(pe);
When a command becomes an Event the text name of the command becomes the topic name published to Kafka, so for this example the event handler, OnTestCreated, would subscribe to the Create_Test topic.
Command
A Command is an Event that comes from the user interface. All Aggregate classes should have a matching Command class where you must register all commands that the user may issue. This class must extend the NostifyCommand class. It will look like this by default:
public class TestCommand : NostifyCommand
{
    ///<summary>
    ///Base Create Command
    ///</summary>
    public static readonly TestCommand Create = new TestCommand("Create_Test", true);
    ///<summary>
    ///Base Update Command
    ///</summary>
    public static readonly TestCommand Update = new TestCommand("Update_Test");
    ///<summary>
    ///Base Delete Command
    ///</summary>
    public static readonly TestCommand Delete = new TestCommand("Delete_Test");

    public TestCommand(string name, bool isNew = false)
        : base(name, isNew)
    {
    }
}
The commands may then be handled in the Apply() method:
public override void Apply(Event eventToApply)
{
    if (eventToApply.command == _ReplaceMe_Command.Create || eventToApply.command == _ReplaceMe_Command.Update)
    {
        this.UpdateProperties<_ReplaceMe_>(eventToApply.payload);
    }
    else if (eventToApply.command == _ReplaceMe_Command.Delete)
    {
        this.isDeleted = true;
    }
}
Setup
The template will use dependency injection to add a singleton instance of the Nostify class and adds HttpClient by default. You may need to edit these to match your configuration:
public class Program
{
    private static void Main(string[] args)
    {
        var host = new HostBuilder()
            .ConfigureFunctionsWorkerDefaults()
            .ConfigureServices((context, services) =>
            {
                services.AddHttpClient();
                var config = context.Configuration;
                //Note: This is the api key for the cosmos emulator by default
                string apiKey = config.GetValue<string>("apiKey");
                string dbName = config.GetValue<string>("dbName");
                string endPoint = config.GetValue<string>("endPoint");
                string kafka = config.GetValue<string>("BrokerList");
                var nostify = NostifyFactory.WithCosmos(apiKey, dbName, endPoint)
                    .WithKafka(kafka)
                    .Build<MyAggregate>(); //Where MyAggregate is the base aggregate of the service
                services.AddSingleton<INostify>(nostify);
                services.AddLogging();
            })
            .Build();
        host.Run();
    }
}
By default, the template will contain the single Aggregate specified. In the Aggregates folder you will find Aggregate and AggregateCommand class files already stubbed out. The AggregateCommand base class contains default implementations for Create, Update, and Delete. The UpdateProperties<T>() method will update any properties of the Aggregate with the value of the Event payload with the same property name.
public class TestCommand : NostifyCommand
{
    ///<summary>
    ///Base Create Command
    ///</summary>
    public static readonly TestCommand Create = new TestCommand("Create_Test", true);
    ///<summary>
    ///Base Update Command
    ///</summary>
    public static readonly TestCommand Update = new TestCommand("Update_Test");
    ///<summary>
    ///Base Delete Command
    ///</summary>
    public static readonly TestCommand Delete = new TestCommand("Delete_Test");

    public TestCommand(string name, bool isNew = false)
        : base(name, isNew)
    {
    }
}

public class Test : NostifyObject, IAggregate
{
    public Test()
    {
    }

    public bool isDeleted { get; set; } = false;
    public static string aggregateType => "Test";

    public override void Apply(Event eventToApply)
    {
        if (eventToApply.command == TestCommand.Create || eventToApply.command == TestCommand.Update)
        {
            this.UpdateProperties<Test>(eventToApply.payload);
        }
        else if (eventToApply.command == TestCommand.Delete)
        {
            this.isDeleted = true;
        }
    }
}
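The name matching that UpdateProperties<T>() is described as performing can be pictured with a small reflection-based sketch. This is not nostify's implementation; PropertyCopySketch and TestSketch are hypothetical, the payload is modeled as a plain dictionary, and values whose type does not fit the target property are simply skipped.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

public static class PropertyCopySketch
{
    // Copies payload values onto same-named, writable public properties of target.
    // Payload keys with no matching property, and properties with no matching key,
    // are left alone.
    public static void UpdateFromPayload<T>(T target, IReadOnlyDictionary<string, object> payload) where T : class
    {
        foreach (PropertyInfo prop in typeof(T).GetProperties(BindingFlags.Public | BindingFlags.Instance))
        {
            if (!prop.CanWrite) continue;
            if (!payload.TryGetValue(prop.Name, out var value)) continue;
            // Only assign when the value is compatible with the property type.
            if (value != null && prop.PropertyType.IsInstanceOfType(value))
            {
                prop.SetValue(target, value);
            }
        }
    }
}

// Hypothetical aggregate-like class used only to exercise the sketch.
public class TestSketch
{
    public Guid id { get; set; }
    public string name { get; set; } = "";
}
```

Since only matching names are touched, a payload carrying just the changed properties leaves everything else on the object as it was.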
Basic Tasks
Initializing Current State Container
The template will include a basic method to create or recreate the current state container. It might become necessary to recreate a container if a bug was introduced that corrupted the data, for instance. For the current state of an Aggregate, it is simple to recreate the container from the event stream. You will find the function to do so under Admin.
Querying
There is a Queries folder to contain the queries for the Aggregate. Three basic queries are created when you spin up the template: Get Single (GET <AggregateType>/{aggregateId}), Get All (GET <AggregateType>; note: if this will return large amounts of data you may want to refactor the default query), and Rehydrate (GET Rehydrate<AggregateType>/{aggregateId}/{datetime?}), which returns the current state of the Aggregate directly from the event stream to the specified datetime.
To do your own query, simply add a new Azure Function per query, inject HttpClient and INostify, grab the container you want to query, and run a query with GetItemLinqQueryable<T>() using Linq syntax. Below is an example of the basic get single instance query included in the template generation.
public class GetTest
{
    private readonly HttpClient _client;
    private readonly INostify _nostify;

    public GetTest(HttpClient httpClient, INostify nostify)
    {
        this._client = httpClient;
        this._nostify = nostify;
    }

    [Function(nameof(GetTest))]
    public async Task<IActionResult> Run(
        [HttpTrigger("get", Route = "Test/{aggregateId:guid}")] HttpRequestData req,
        Guid aggregateId,
        ILogger log)
    {
        Container currentStateContainer = await _nostify.GetCurrentStateContainerAsync<Test>();
        Test retObj = await currentStateContainer
            .GetItemLinqQueryable<Test>()
            .Where(x => x.id == aggregateId)
            .FirstOrDefaultAsync();
        return new OkObjectResult(retObj);
    }
}
Create New Aggregate
One of the "out of the box" commands handled by nostify is the Create_Aggregate command. The template logic flow is:
- Create command is inbound via http post from user interface. Typically this would indicate the user saved a new record.
- Body of post must contain JSON with all the properties to set on create. The Apply() method in the aggregate will call UpdateProperties<T>() which will match any properties in the JSON to the aggregate and set them automatically. Any properties not in the JSON or properties in the JSON that do not match the aggregate will be ignored. As such it is only necessary to send the properties that you want to set over the wire.
- In a typical application there would be validation of the command occurring, validating say that all required properties are set. nostify does not dictate a validation pattern, use the one that makes the most sense for your application. There would probably be some kind of auth here as well for most apps.
- The command handler creates an Event and persists it to the event store. Note that the command registered must indicate a new record is being created by setting the isNew parameter to true: public static readonly TestCommand Create = new TestCommand("Create_Test", true);
- The event publisher function is triggered by an event being written to the event store and publishes the event to Kafka.
- The create handler that is subscribed to the event will be triggered and update the projection containing the current state for the aggregate.
Other projections added will need to contain their own logic for handling the create event. The naming convention for the create event handler is: On<AggregateName>Created. For example: "OnTestCreated".
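Since nostify does not dictate a validation pattern, the following is just one possible shape for the "all required properties are set" check mentioned in the flow above. It is a plain C# sketch: CreateValidationSketch and MissingRequired are hypothetical names, and the payload is modeled as a dictionary rather than whatever type your command handler actually receives.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class CreateValidationSketch
{
    // Returns the names of required properties that are absent or null in the payload.
    // An empty result means the create command can proceed to composing the Event.
    public static List<string> MissingRequired(IReadOnlyDictionary<string, object> payload, params string[] required)
    {
        return required
            .Where(name => !payload.TryGetValue(name, out var value) || value == null)
            .ToList();
    }
}
```

A command handler could return a 400 response listing the missing names and skip persisting the Event when the list is not empty.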
Update Aggregate
Updating the aggregate and its base current state projection is also handled by the templates. The template logic flow is:
- Update command comes in via http patch from user interface. Typically this would indicate a user saved an existing record.
- Body of the request must contain a JSON object with a property id in the default template.
- The default event handler for the current state container will query the container for the aggregate id, get the current state, apply the update, then save the update to the container. This is contained in the ApplyAndPersistAsync<T>() method.
- Body of patch must contain JSON with all the properties to update. The Apply() method in the aggregate will call UpdateProperties<T>() which will match any properties in the JSON to the aggregate and set them automatically. Any properties not in the JSON or properties in the JSON that do not match the aggregate will be ignored. As such it is only necessary to send the properties that you want to set over the wire.
- There may be more than one event to subscribe to in order to update an aggregate if you implement a more complex object. For instance, if you have a property of List<T> you may have another command that is issued from the UI that does an http PUT to replace objects in the list.
The naming convention for the function handling the update event is: On<AggregateName>Updated, for example: "OnTestUpdated".
Delete Aggregate
Deleting an aggregate by default does not actually remove it from the current state container, it simply sets the isDeleted property to true. Template logic flow is:
- Delete command comes in via http delete from user interface. This indicates the user chose to delete an existing record.
- Url must contain a guid that matches the id property of the aggregate instance to mark as deleted.
- The command handler creates an Event and persists it to the event store.
- The Event is published to Kafka and then the event handler function applies the Event to the current state, which sets the isDeleted property to true.
- Queries to the current state container should take into account that it may contain deleted aggregates.
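Because deletes are soft, a query usually needs an explicit filter on isDeleted. The sketch below shows that filter with ordinary LINQ over an IQueryable; TestRowSketch and TestQueriesSketch are hypothetical names, and the in-memory source stands in for whatever queryable your container exposes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical current-state document with the soft-delete flag.
public class TestRowSketch
{
    public Guid id { get; set; }
    public string name { get; set; } = "";
    public bool isDeleted { get; set; }
}

public static class TestQueriesSketch
{
    // Returns only the rows that have not been marked as deleted.
    public static List<TestRowSketch> GetActive(IQueryable<TestRowSketch> rows)
    {
        return rows
            .Where(r => !r.isDeleted)
            .OrderBy(r => r.name)
            .ToList();
    }
}
```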
Create New Projection Container
Note: for the following Projection examples we will be using the projection TestWithStatus as an example:
public class TestWithStatus : ProjectionBaseClass<TestWithStatus,Test>, IContainerName, IHasExternalData<TestWithStatus>
{
    public TestWithStatus()
    {
    }

    public static string containerName => "TestWithStatus";
    public bool isDeleted { get; set; }
    //Test properties
    public string testName { get; set; }
    public Guid? statusId { get; set; }
    public Guid? testTypeId { get; set; }
    //Status properties
    public string? statusName { get; set; }
    //Test Type properties
    public string? testType { get; set; }

    public override void Apply(Event eventToApply)
    {
        //Should update the command tree below to not use string matching
        if (eventToApply.command.name.Equals("Create_Test") || eventToApply.command.name.Equals("Update_Test") || eventToApply.command.name.Equals("Update_Status"))
        {
            this.UpdateProperties<TestWithStatus>(eventToApply.payload);
        }
        else if (eventToApply.command.name.Equals("Delete_Test"))
        {
            this.isDeleted = true;
        }
    }

    public class StatusName
    {
        public Guid id { get; set; }
        public string statusName { get; set; }
    }

    public async static Task<List<ExternalDataEvent>> GetExternalDataEventsAsync(List<TestWithStatus> projectionsToInit, INostify nostify, HttpClient httpClient = null, DateTime? pointInTime = null)
    {
        // If data exists within this service, even if a different container, use the container to get the data
        Container sameServiceEventStore = await nostify.GetEventStoreContainerAsync();
        //Use GetEventsAsync to get events from the same service, the selectors are a parameter list of the properties that are used to filter the events
        List<ExternalDataEvent> externalDataEvents = await ExternalDataEvent.GetEventsAsync<TestWithStatus>(sameServiceEventStore,
            projectionsToInit,
            p => p.testTypeId);

        //Get events from other services via http using the EventRequest endpoint
        //Assumption: the request body is a JSON list of the Status ids to fetch events for
        List<Guid> statusIds = projectionsToInit
            .Where(p => p.statusId.HasValue)
            .Select(p => p.statusId.Value)
            .Distinct()
            .ToList();
        HttpContent statiToGetEventsFor = new StringContent(JsonConvert.SerializeObject(statusIds), Encoding.UTF8, "application/json");
        var response = await httpClient.PostAsync("http://localhost:7071/api/EventRequest", statiToGetEventsFor);
        if (!response.IsSuccessStatusCode)
            throw new Exception("Something went awry");
        List<Event> statusEvents = JsonConvert.DeserializeObject<List<Event>>(await response.Content.ReadAsStringAsync());

        projectionsToInit.ForEach(p =>
        {
            //The external events belong to the Status aggregate, so match them on the projection's statusId
            List<Event> eventsForProjection = statusEvents.Where(e => e.aggregateRootId == p.statusId)
                .Select(e => new Event(e.command, e.aggregateRootId, e.payload, e.userId))
                .ToList();
            ExternalDataEvent ede = new ExternalDataEvent(p.id, eventsForProjection);
            externalDataEvents.Add(ede);
        });
        return externalDataEvents;
    }
}
Note the GetExternalDataEventsAsync() method. You must implement this such that it returns a List<ExternalDataEvent>. ExternalDataEvent contains a Guid property that points at the "base" aggregate id value, and a List<Event> property which holds all of the events external to the base aggregate that need to be applied to get the projection to the current state (or point in time state if desired).
Create New Projection
A Projection will have a "base" aggregate when it is defined. Projection create event handlers should subscribe to the create event of the base aggregate.
Adding a new instance of a projection requires implementing the Apply() method to handle all necessary events, and the GetExternalDataEventsAsync() method to get events external to the base aggregate when initializing new instances of the projection.
This method is called in the event handler function to update the projection with any existing external data, which is then applied and saved to the projection container along with the Event signifying the creation of the Test:
//Get projection container
Container projectionContainer = await _nostify.GetProjectionContainerAsync<TestWithStatus>();
//Create projection
TestWithStatus proj = new TestWithStatus();
//Apply create event of base aggregate
proj.Apply(testCreated);
//Get external data
Event externalData = await proj.SeedExternalDataAsync(_nostify, _httpClient);
//Update projection container
await projectionContainer.ApplyAndPersistAsync<TestWithStatus>(new List<Event>(){testCreated,externalData});
The naming convention of the event handler is: On<Base Aggregate Name>Created_For_<Projection Name>. For example: "OnTestCreated_For_TestWithStatus". The create event of the base aggregate is the only event to subscribe to in this case for most implementations.
Update Projection
Updating a Projection works the same as updating the current state projection of an aggregate, except you're more likely to be subscribing to multiple events and you may be subscribing to various events from multiple services.
For instance, with our TestWithStatus projection, we will need to subscribe to the update event for both Test and Status aggregates to capture and handle them in the Apply() method, see example above.
This means we will need two event handler functions, OnTestUpdated_For_TestWithStatus and OnStatusUpdated_For_TestWithStatus. Note the naming convention.
They will both take in their respective events and update the projection container. Using the ApplyAndPersistAsync() method for updates to the base aggregate will automatically query the projection using the id value and call Apply(), then save the updated projection back to the data store.
Delete Projection
For most projections it is appropriate to delete the item out of the container when the base aggregate is deleted (the isDeleted property is set to true).
The naming convention of the event handler is: On<Base Aggregate Name>Deleted_For_<Projection Name>. For example: "OnTestDeleted_For_TestWithStatus". The delete event of the base aggregate is the only event to subscribe to in this case for most implementations.
Event Handlers
Events are things that have already happened and need to be handled by the system. A Command, as discussed elsewhere in the documentation, becomes an Event after validation. As an example: the user clicks a button, the system issues an http POST to the command endpoint, it passes authentication and authorization, then the data in the body of the http call is validated, then the command code runs to write an Event to the Event Store with the pertinent payload data.
As such there should not be any validation to perform in an Event Handler, the "thing" has already happened. An Event Handler is there to process the Event and perform any necessary state updates such as updating the data stored in a Projection container. After the Event is stored, it is published to the event bus (Kafka being used that way in this case), and then the Event Handlers pick it up by subscribing to the event. Kafka makes sure each Event Handler receives each event it is subscribed to.
The Event should be applied to the object's (Aggregate/Projection) current state by calling the Apply() method. This method needs to take an Event and change the state of the object based on the Event. A "Create" Event might start with a new instance of the object and then update any properties of the object based off the Event's payload.
Helper Methods
There are a number of methods in the framework to help create Event Handlers. They remove the need to write a large amount of boilerplate code in most circumstances. There may be Events where the helpers don't really apply, or scenarios where you require more performance, but 99% of the time you should leverage these methods to simplify your code. The templates will create Event Handlers with these methods.
Create/Update From Single Event
A single "Create" or "Update" Event for an Aggregate or the base Aggregate of a Projection can be handled with a couple lines of code with these helper methods. For an Event Handler for an Aggregate subscribed to a Kafka topic you simply pull the Event out of the triggerEvent, make sure it's not null, then get the container you're going to update out of Cosmos and call the ApplyAndPersistAsync<T>([event]) method. These will get automatically created by the template.
public async Task Run([KafkaTrigger(<Trigger Details Here>)] NostifyKafkaTriggerEvent triggerEvent,
    ILogger log)
{
    Event? newEvent = triggerEvent.GetEvent();
    try
    {
        if (newEvent != null)
        {
            //Update aggregate current state projection
            Container currentStateContainer = await _nostify.GetCurrentStateContainerAsync<InventoryItem>();
            await currentStateContainer.ApplyAndPersistAsync<InventoryItem>(newEvent);
        }
    }
    catch (Exception e)
    {
        await _nostify.HandleUndeliverableAsync(nameof(OnInventoryItemCreated), e.Message, newEvent);
    }
}
The ApplyAndPersistAsync() method deduces the partition key of the object the Event is being applied to by pulling it from the Event, and then either creates a new instance of the object if needed or does a point read on the database using the id Guid property. This keeps RU consumption down and performance high. Once the object has been created or found, the Event is fed into the object's Apply([event]) method, which is specified by the developer for each object type. The results are then saved back into the database, either creating a new document or overwriting the existing one. In the example above, errors are handled by writing them to a separate undeliverable container.
The above example is for an Aggregate, which only needs to handle internal data updates, i.e. Events that update Aggregates carry all of the data needed to perform the state changes. This may not hold true for Projections, which may require getting additional data external to the Event.
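The read-or-create, apply, then save sequence described above can be sketched with an in-memory dictionary standing in for the database. This is only an illustration of the flow under stated assumptions: DemoEvent, DemoItem, and DemoContainer are hypothetical names invented here, and nothing in this block is the framework's or the Cosmos SDK's actual implementation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event carrying the target id, its partition key, and new data.
public record DemoEvent(Guid AggregateRootId, string PartitionKey, string NewName);

// Hypothetical object whose state is built up by applying events.
public class DemoItem
{
    public Guid id { get; set; }
    public string Name { get; set; } = "";

    public void Apply(DemoEvent e)
    {
        id = e.AggregateRootId;
        Name = e.NewName;
    }
}

// In-memory substitute for a container, keyed by (partition key, id).
public class DemoContainer
{
    private readonly Dictionary<(string, Guid), DemoItem> _docs = new();

    public int Count => _docs.Count;

    public DemoItem ApplyAndPersist(DemoEvent e)
    {
        // Both halves of the key come from the event itself.
        var key = (e.PartitionKey, e.AggregateRootId);

        // "Point read": fetch exactly one document by partition key and id,
        // or start from a new instance when none exists yet.
        _docs.TryGetValue(key, out var item);
        item ??= new DemoItem();

        // Feed the event into the object's own Apply method.
        item.Apply(e);

        // Save the result, creating or overwriting the stored document.
        _docs[key] = item;
        return item;
    }
}
```

Applying a second event with the same id and partition key overwrites the stored document rather than adding a new one, which mirrors the create-or-overwrite behavior described in the text.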
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. |
Dependencies (net8.0)
- Confluent.Kafka (>= 2.6.1)
- JsonDiffPatch.Net (>= 2.3.0)
- Microsoft.AspNetCore.Mvc (>= 2.2.0)
- Microsoft.Azure.Cosmos (>= 3.46.0)
- Microsoft.Azure.Functions.Worker (>= 2.0.0)
- Moq (>= 4.20.69)
- Newtonsoft.Json (>= 13.0.3)
- xunit (>= 2.9.2)
Version | Downloads | Last updated |
---|---|---|
2.9.2 | 785 | 2/25/2025 |
2.9.1 | 94 | 2/25/2025 |
2.9.0 | 879 | 2/10/2025 |
2.8.0 | 160 | 2/4/2025 |
2.7.0 | 123 | 1/22/2025 |
2.6.3 | 211 | 1/7/2025 |
2.6.2 | 108 | 1/7/2025 |
2.6.1 | 155 | 12/31/2024 |
2.6.0 | 96 | 12/30/2024 |
2.5.0 | 114 | 12/27/2024 |
2.4.2 | 132 | 12/16/2024 |
2.4.1 | 96 | 12/16/2024 |
2.4.0 | 109 | 12/16/2024 |
2.3.0 | 185 | 12/13/2024 |
2.2.3 | 100 | 12/13/2024 |
2.2.2.1 | 113 | 12/11/2024 |
2.2.2 | 102 | 12/11/2024 |
2.2.1 | 102 | 12/8/2024 |
2.2.0 | 100 | 12/8/2024 |
2.1.0.1 | 107 | 12/6/2024 |
2.1.0 | 117 | 12/5/2024 |
2.0.0.10-alpha | 79 | 12/2/2024 |
2.0.0.9-alpha | 377 | 11/26/2024 |
2.0.0.8-alpha | 92 | 11/22/2024 |
2.0.0.7-alpha | 100 | 11/21/2024 |
2.0.0.6-alpha | 87 | 11/20/2024 |
2.0.0.5-alpha | 76 | 11/20/2024 |
2.0.0.4-alpha | 80 | 11/20/2024 |
2.0.0.3-alpha | 85 | 11/20/2024 |
2.0.0.2-alpha | 81 | 11/20/2024 |
2.0.0.1-alpha | 74 | 11/19/2024 |
2.0.0 | 161 | 12/3/2024 |
2.0.0-alpha | 153 | 11/18/2024 |
1.20.2 | 286 | 10/2/2024 |
1.20.1 | 144 | 8/13/2024 |
1.20.0 | 134 | 8/13/2024 |
1.19.1 | 100 | 7/23/2024 |
1.19.0 | 96 | 7/23/2024 |
1.18.0 | 110 | 7/23/2024 |
1.17.0 | 100 | 7/22/2024 |
1.16.0 | 107 | 7/22/2024 |
1.15.0 | 111 | 7/19/2024 |
1.14.2.2 | 98 | 7/19/2024 |
1.14.2.1 | 163 | 7/19/2024 |
1.14.2 | 114 | 7/15/2024 |
1.14.1 | 115 | 7/15/2024 |
1.14.0.2 | 116 | 5/3/2024 |
1.14.0.1 | 90 | 5/3/2024 |
1.13.0 | 89 | 5/2/2024 |
1.12.4.4 | 103 | 5/1/2024 |
1.12.4.3 | 98 | 5/1/2024 |
1.12.4.2 | 141 | 4/29/2024 |
1.12.4 | 129 | 4/29/2024 |
1.12.3.1 | 137 | 3/22/2024 |
1.12.3 | 124 | 3/21/2024 |
1.12.2 | 122 | 3/21/2024 |
1.12.1 | 127 | 3/20/2024 |
1.12.0 | 121 | 3/20/2024 |
1.11.0.2 | 128 | 3/20/2024 |
1.11.0.1 | 129 | 3/20/2024 |
1.11.0 | 130 | 3/20/2024 |
1.10.0 | 137 | 2/29/2024 |
1.9.1.3 | 245 | 12/1/2023 |
1.9.1.2 | 159 | 11/30/2023 |
1.9.1.1 | 145 | 11/30/2023 |
1.9.0.9 | 153 | 11/29/2023 |
1.9.0.8 | 152 | 11/28/2023 |
1.9.0.7 | 154 | 11/28/2023 |
1.9.0.6 | 145 | 11/28/2023 |
1.9.0.5 | 139 | 11/28/2023 |
1.9.0.4 | 138 | 11/28/2023 |
1.9.0.3 | 148 | 11/28/2023 |
1.9.0.2 | 142 | 11/28/2023 |
1.9.0.1 | 137 | 11/27/2023 |
1.9.0 | 140 | 11/27/2023 |
1.8.9.6 | 134 | 11/21/2023 |
1.8.9.5 | 138 | 11/21/2023 |
1.8.9.4 | 130 | 11/10/2023 |
1.8.9.3 | 148 | 11/4/2023 |
1.8.9.2 | 143 | 11/4/2023 |
1.8.9.1 | 125 | 11/3/2023 |
1.8.9 | 120 | 11/3/2023 |
1.8.8.1 | 146 | 11/2/2023 |
1.8.8 | 144 | 11/2/2023 |
1.8.7.1 | 147 | 11/1/2023 |
1.8.7 | 149 | 10/27/2023 |
1.8.6 | 165 | 10/17/2023 |
1.8.5.3 | 146 | 10/16/2023 |
1.8.5.2 | 164 | 10/16/2023 |
1.8.5.1 | 155 | 10/6/2023 |
1.8.5 | 161 | 10/6/2023 |
1.8.4 | 152 | 10/6/2023 |
1.8.3 | 164 | 10/5/2023 |
1.8.2 | 161 | 10/5/2023 |
1.8.1 | 157 | 10/5/2023 |
1.8.0 | 295 | 2/18/2023 |
1.7.11 | 463 | 5/24/2022 |
1.7.10 | 433 | 5/24/2022 |
1.7.9 | 433 | 5/23/2022 |
1.7.8 | 444 | 5/19/2022 |
1.7.7 | 443 | 5/18/2022 |
1.7.6 | 442 | 5/18/2022 |
1.7.5 | 494 | 4/20/2022 |
1.7.4 | 458 | 4/20/2022 |
1.7.3 | 454 | 4/20/2022 |
1.7.2 | 473 | 4/19/2022 |
1.7.1 | 469 | 4/19/2022 |
1.7.0 | 454 | 3/24/2022 |
1.6.6 | 390 | 8/10/2021 |
1.6.5 | 364 | 8/10/2021 |
1.6.4 | 353 | 8/10/2021 |
1.6.3 | 414 | 6/29/2021 |
1.6.2 | 402 | 6/29/2021 |
1.6.1 | 492 | 6/25/2021 |
1.6.0 | 383 | 6/23/2021 |
1.5.10 | 380 | 6/23/2021 |
1.5.9 | 366 | 2/17/2021 |
1.5.8 | 374 | 2/17/2021 |
1.5.7 | 376 | 2/16/2021 |
1.5.6 | 359 | 2/15/2021 |
1.5.5 | 361 | 2/15/2021 |
1.5.3 | 380 | 2/15/2021 |
1.5.2 | 373 | 2/15/2021 |
1.5.1 | 357 | 2/15/2021 |
1.5.0 | 358 | 2/15/2021 |
1.4.9 | 367 | 2/15/2021 |
1.4.8 | 381 | 2/15/2021 |
1.4.7 | 381 | 2/15/2021 |
1.4.5 | 373 | 2/15/2021 |
1.4.4 | 385 | 2/15/2021 |
1.4.3 | 364 | 2/15/2021 |
1.4.2 | 368 | 2/15/2021 |
1.4.1 | 373 | 2/15/2021 |
1.4.0 | 404 | 2/13/2021 |
1.3.6 | 411 | 2/13/2021 |
1.3.3 | 388 | 2/10/2021 |
1.3.2 | 376 | 2/10/2021 |
1.3.1 | 441 | 2/9/2021 |
1.3.0 | 411 | 2/9/2021 |
1.2.16 | 393 | 2/8/2021 |
1.2.15 | 404 | 2/8/2021 |
1.2.14 | 401 | 2/8/2021 |
1.2.13 | 407 | 2/8/2021 |
1.2.12 | 397 | 1/29/2021 |
1.2.11 | 384 | 1/29/2021 |
1.2.10 | 353 | 1/29/2021 |
1.2.9 | 373 | 1/29/2021 |
1.2.8 | 421 | 1/28/2021 |
1.2.7 | 390 | 1/28/2021 |
1.2.6 | 396 | 1/28/2021 |
1.2.5 | 405 | 1/28/2021 |
1.2.4 | 426 | 1/27/2021 |
1.2.3 | 429 | 1/27/2021 |
1.2.2 | 420 | 1/27/2021 |
1.2.1 | 397 | 1/25/2021 |
1.2.0 | 376 | 1/25/2021 |
1.1.10 | 411 | 1/23/2021 |
1.1.9 | 434 | 1/23/2021 |
1.1.8 | 424 | 1/22/2021 |
1.1.7 | 404 | 1/22/2021 |
1.1.6 | 423 | 1/22/2021 |
1.1.5 | 414 | 1/22/2021 |
1.1.4 | 418 | 1/22/2021 |
1.1.3 | 454 | 1/22/2021 |
1.1.2 | 440 | 1/22/2021 |
1.1.1 | 449 | 1/21/2021 |
1.0.10 | 436 | 1/21/2021 |
1.0.9 | 451 | 11/9/2020 |
1.0.8 | 450 | 11/9/2020 |
1.0.7 | 479 | 11/9/2020 |
1.0.6 | 470 | 11/9/2020 |
1.0.5 | 451 | 11/9/2020 |
1.0.4 | 552 | 11/6/2020 |
1.0.3 | 405 | 11/6/2020 |
1.0.2 | 432 | 11/6/2020 |
1.0.1 | 480 | 11/6/2020 |
1.0.0 | 495 | 11/5/2020 |