nostify 2.9.2


nostify

Dirtball simple, easy to use, scalable event driven microservices framework for .NET.

This framework is intended to simplify the implementation of the ES/CQRS, microservice, and materialized view patterns in a specific tech stack. It also assumes some basic familiarity with domain driven design.

When should I NOT use this?

The framework makes numerous assumptions to cut down on complexity. It has dependencies on Azure components, notably Cosmos. If you need to accommodate a wide range of possible technology stacks, or want lots of flexibility in how to implement, this may not be for you. If you are going to ignore the tech stack requirements there are other libraries you should look at.

When should I use this?

You should consider using this if you are using .NET and Azure and want to follow a strong set of guidelines to quickly and easily spin up services that can massively scale without spending tons of time architecting it yourself.

Current Status

  • Changes in 2.9

    • More effective comparison in ApplyAndPersistAsync()
    • Setting containerThroughput to 0 or less (convention should be -1) in WithCosmos() leverages serverless RU provisioning
    • Fixed bug with setting throughput
    • Fixed bug with TryGetValue
  • Changes in 2.8

    • Added BulkApplyAndPersistAsync() methods to facilitate bulk changes in Projection event handlers for events not from the base aggregate.
    • ApplyAndPersistAsync<T>() methods now return Task<T> and if awaited return the updated object. This makes it easier to call InitAsync() on the projection if it requires external data.
    • ApplyAndPersistAsync<T>() now uses CreateItemAsync() and PatchItemAsync() instead of UpsertItemAsync() in the background, depending on whether it's a create or not, for much better performance
  • Changes in 2.7

    • Added ability to configure using Gateway connection type. Set useGatewayConnection to true in NostifyFactory.WithCosmos()
    • Adds retry to InitAllUninitialized()
  • Changes in 2.6

    • 2.6.3 fixes bug in container creation
    • Added feature for "verbose" start up which outputs more information from steps to console
    • Added TryGetValue<T>() method to extract value from payload if it exists and not throw an error, but instead return false if not
    • Added manual mapping capability to UpdateProperties() so you can now specify how to map property values from payload to Projection if the property names don't match up. The example below will set the ExampleProjection.exampleName property to the value of the payload.name property:
      Dictionary<string, string> propertyPairs = new Dictionary<string, string>{
          {"name", "exampleName"}
      };
      this.UpdateProperties<ExampleProjection>(eventToApply.payload, propertyPairs, true);
    
  • Changes in 2.5

    • Added new ExternalDataEvent.GetEventsAsync<T>() method to help reduce boilerplate code for Projection init
  • Changes in 2.4

    • Much improved Kafka config
    • Fixed issue with connecting to Kafka
    • Auto create topics during start up
    • NostifyFactory Build() is now a generic, so should be used Build<T>() where T is the base aggregate for the service
  • Changes in 2.3

    • HandleUndeliverableAsync() has the option to publish an Error event to Kafka
    • Bulk methods have batchSize option for looping through large lists of events, ability to automatically retry on Cosmos 429 failure, and better error handling
  • Changes in 2.2

    • Factory method for building Nostify singleton
    • Bulk patching method
    • Kafka producer injection
    • Use 2.2.2; versions 2.2.0 and 2.2.1 have bugs
  • Changes in 2.1

    • Way to programmatically create containers automatically on start up if needed (for facilitating local development)
    • Some documentation updates
  • Changes in 2.0

    • Projection initialization is vastly different/better (breaking change)
    • Proper caching of references to CosmosClient and containers speeds up db actions significantly
    • Basic validation for create commands
    • Leveraging TTL to add delete all in a projection container rather than deleting and recreating container
    • More unit tests

Getting Started

To run locally you will need to install some dependencies:

To install nostify and templates:

dotnet new install nostify

To spin up a nostify project:

dotnet new nostify -ag <Your_Aggregate_Name> -p <Port Number To Run on Locally>

dotnet restore

This will install the templates, create the default project based off your Aggregate, and install all the necessary libraries.

Architecture

The library is designed to be used in a microservice pattern (although not necessarily required) using an Azure Function App API and Cosmos as the event store. Kafka serves as the messaging backplane, and projections can be stored in Cosmos or Redis depending on query needs.

You should set up a Function App and Cosmos DB per Aggregate Microservice.


Projections that contain data from multiple Aggregates can be updated by Event Handlers from other microservices. Why would this happen? Well, say you have a Bank Account record. If we were using a relational database for a data store we'd have to either run two queries or do a join to get the Bank Account and the name of the Account Manager. Using the CQRS model, we can "pre-render" a projection that contains both the account info and the account manager info without having to join tables together. This example is obviously very simple, but in a complex environment where you're joining together dozens of tables to create a DTO to send to the user interface and returning hundreds of thousands or millions of records, this type of architecture can dramatically improve BOTH system performance and throughput.
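The Bank Account example can be sketched in plain C#, without any nostify types. Everything below (BankAccount, AccountManager, BankAccountView, BankAccountProjector) is a hypothetical name chosen for illustration. The point is only that the read model already holds the manager's name, so reading it needs no join, and a rename in the other service patches the stored copies.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration; none of these are part of nostify.
public record AccountManager(Guid Id, string Name);
public record BankAccount(Guid Id, string AccountNumber, Guid ManagerId);

// Denormalized read model: everything the UI needs lives in one document.
public class BankAccountView
{
    public Guid Id { get; set; }
    public string AccountNumber { get; set; } = "";
    public Guid ManagerId { get; set; }
    public string ManagerName { get; set; } = "";
}

public static class BankAccountProjector
{
    // Build the view once, when the account is created.
    public static BankAccountView OnAccountCreated(BankAccount account, AccountManager manager) =>
        new BankAccountView
        {
            Id = account.Id,
            AccountNumber = account.AccountNumber,
            ManagerId = manager.Id,
            ManagerName = manager.Name
        };

    // When a manager is renamed elsewhere, patch every view that references them.
    public static void OnManagerRenamed(IEnumerable<BankAccountView> views, Guid managerId, string newName)
    {
        foreach (var view in views)
        {
            if (view.ManagerId == managerId) view.ManagerName = newName;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var manager = new AccountManager(Guid.NewGuid(), "Pat");
        var account = new BankAccount(Guid.NewGuid(), "ACC-001", manager.Id);
        var views = new List<BankAccountView> { BankAccountProjector.OnAccountCreated(account, manager) };

        BankAccountProjector.OnManagerRenamed(views, manager.Id, "Patricia");
        Console.WriteLine($"{views[0].AccountNumber} managed by {views[0].ManagerName}");
        // prints "ACC-001 managed by Patricia"
    }
}
```

The cost of this approach is the extra write in OnManagerRenamed; the benefit is that the query side reads a single document.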


Why????

When it comes to scaling there are two things to consider: speed and throughput. "Speed" means the quickness of an individual action, and "throughput" means the number of actions that can be performed concurrently. Using nostify addresses both of those concerns.

Speed really comes into play only on the query side for most applications. That's a large part of the concept behind the CQRS pattern. By separating the command side from the query side you essentially deconstruct the datastore that would traditionally be an RDBMS in order to create materialized views of various projections of the aggregate. Think of these as "pre-rendered" versions of the views in a traditional relational database. In a traditional database a view simplifies queries but still runs the joins in real time when data is requested. By materializing the view, we denormalize the data and accept the increased complexity associated with keeping the data accurate in order to massively decrease the performance cost of querying that data. In addition, we gain flexibility by being able to resource each container appropriately, giving the most heavily queried containers more resources.

Throughput is the other half of the equation. If you were using physical architecture, you'd have an app server talking to a separate database server serving up your application. Say the app server has 4 processors with 8 cores each, so there is a limitation on the number of concurrent tasks that can be performed. We can enhance throughput through proper coding, using parallel processing and non-blocking code, but at a certain point there is a physical limit to the number of things that can be happening at once. With nostify and the use of Azure Functions, this limitation is removed. If 1000 queries hit at the same moment in time, 1000 instances of an Azure Function spin up to handle it. You're limited more by cost than by physical hardware.

Concepts

Service

A service in nostify is a standalone mini-application that encapsulates a group of aggregates and projections and the logic for handling state changes to them. Generally speaking, a single service should encapsulate the logic for a single domain context if you're embracing the microservices pattern.

A service template will contain the directory structure, class files, and code to handle basic Create, Update, and Delete commands as well as some basic queries out of the box.

Create

dotnet new nostify -ag <Your_Aggregate_Name> -p <Port Number To Run on Locally>

Aggregate

An aggregate encapsulates the logic around the "C" or Command part of the CQRS pattern.

An Aggregate is a cluster of associated objects that we treat as a unit for the purpose of data changes. -Eric Evans

For example, a Purchase Requisition might have id, pr_number, and lineItems properties, where lineItems is a List<LineItem> type. The aggregate would be composed of multiple types of objects as well as simple types in this case, and the line items of the requisition would never be edited outside the context of their containing req.

In nostify every state change coming from the UI is called a NostifyCommand and must be performed against an aggregate. This is done by composing an Event, which is written to the event store. This triggers the event getting pushed to the messaging system (Kafka), which the event handler subscribes to. Once triggered, the event handler updates the current state container of the aggregate.

Rules
  • All state changes must be applied to an aggregate, and not directly to a projection or to an entity within an aggregate.
  • Aggregates may only refer to other aggregates by id value.
  • An aggregate must implement the NostifyObject abstract class and the IAggregate interface.
  • All state changes must be applied using the Apply() method, either directly or by using the ApplyAndPersistAsync() method. The current state projection of an aggregate is simply the sum of all the events in the event store applied to a new instance of that aggregate object.
  • In nostify only the changed properties should be included when creating an Event; best practice is to not send the entire aggregate.
  • A service generally contains one or more aggregates. A read-only service, such as one for BI purposes, might not. Domain boundaries should be respected when grouping aggregates together in a service. For example, grouping a WorkOrder aggregate in the same service as a WorkOrderStatus aggregate (which might be an aggregate if your application allows users to add and update them) might make sense, whereas putting a PurchaseRequest and a WorkOrder in the same service might not. This is an art, not a science, so do what makes sense for your application. It is theoretically possible to completely abandon the microservice concept and group an entire application into a single service, but that is probably not a good idea for scalability and maintainability.
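Two of the rules above, "current state is the sum of all events" and "send only the changed properties", can be illustrated with a minimal sketch. It deliberately avoids nostify's own types: SketchEvent, WorkOrder, and Rehydrate are hypothetical stand-ins, and the payload is a plain dictionary rather than whatever nostify uses internally.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for an event; nostify's real Event type carries more than this.
public record SketchEvent(string Command, Guid AggregateRootId, Dictionary<string, object> Payload);

public class WorkOrder
{
    public Guid Id { get; set; }
    public string Title { get; set; } = "";
    public string Status { get; set; } = "";
    public bool IsDeleted { get; set; }

    // Apply one event: only properties present in the payload change.
    public void Apply(SketchEvent e)
    {
        if (e.Command == "Delete_WorkOrder")
        {
            IsDeleted = true;
            return;
        }
        Id = e.AggregateRootId;
        if (e.Payload.TryGetValue("title", out var title)) Title = (string)title;
        if (e.Payload.TryGetValue("status", out var status)) Status = (string)status;
    }

    // Current state = a new instance with every event applied in order.
    public static WorkOrder Rehydrate(IEnumerable<SketchEvent> events)
    {
        var state = new WorkOrder();
        foreach (var e in events) state.Apply(e);
        return state;
    }
}

public static class Program
{
    public static void Main()
    {
        var id = Guid.NewGuid();
        var events = new List<SketchEvent>
        {
            new SketchEvent("Create_WorkOrder", id,
                new Dictionary<string, object> { ["title"] = "Fix pump", ["status"] = "Open" }),
            // The update carries only the changed property, not the whole aggregate.
            new SketchEvent("Update_WorkOrder", id,
                new Dictionary<string, object> { ["status"] = "Closed" })
        };

        var current = WorkOrder.Rehydrate(events);
        Console.WriteLine($"{current.Title}: {current.Status}"); // prints "Fix pump: Closed"
    }
}
```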
Create

A "base" aggregate is created when you use the dotnet cli to create a new nostify service:

dotnet new nostify -ag <Your_Aggregate_Name> -p <Port Number To Run on Locally>

If you are adding a new aggregate to an existing service, run the below cli command from the Aggregates directory.

dotnet new nostifyAggregate -ag <Your_Aggregate_Name>

Projection

A projection encapsulates the logic surrounding the "Q" or Query part of the CQRS pattern.

The current state of an aggregate is the sum of all the events for that aggregate in the event store. However, querying the entire stream of events and applying them every time you want to send the current state of an aggregate to the user interface can be inefficient, even more so if you are trying to compose a DTO with properties from multiple aggregates across services.

The solution to this in nostify is the projection pattern. A projection defines a set of properties from one or more aggregates and then stores the current state of those properties as updated by the event stream. Every projection will have a "base" aggregate that will be the trigger to create a new persisted instance of the projection, and will define any necessary queries for getting data from external aggregates.

Rules
  • No command is ever performed on a projection. Projections are updated by subscribing to and handling events on their component aggregates.
  • A projection will have a "base" aggregate, and the id property of the projection will match the id property of the base aggregate. The base aggregate is the aggregate whose create event is handled to create a new instance of the projection in the container.
  • A projection must implement the ProjectionBaseClass<P,A> abstract class, where P is the concrete class of the projection and A is the "base" aggregate, as well as the IContainerName interface. Most will also need to implement IHasExternalData<P>.
  • For projections with data coming from aggregates external to the "base" aggregate, the IHasExternalData.GetExternalDataEventsAsync() method must be implemented to handle getting events from external sources to apply to the projection. The results of this method are used by the ProjectionBaseClass to update either the external data of a new single projection instance, or many projections when the container is initialized or data is imported.
Create

A projection must be added to an existing service. Base aggregate must already exist. From the Projections directory:

dotnet new nostifyProjection -ag <Base_Aggregate_Name> --projectionName <Projection_Name>

Event

An Event captures a state change to the application. Generally, this is caused by the user issuing a command such as "save". When a command comes in from the front end to the endpoint, the HTTP trigger invokes a command handler function which validates the command, composes an Event, and persists it to the event store. Note that while a Command is always an Event, an Event is not necessarily always a Command. It is possible for an Event to originate elsewhere, say from an IoT device for example.

In a typical scenario, the Event is created in the command handler, and then saved to the event store:

Event pe = new Event(TestCommand.Create, newId, newTest);
await _nostify.PersistEventAsync(pe);

When a command becomes an Event, the text name of the command becomes the topic name published to Kafka, so for this example the event handler, OnTestCreated, would subscribe to the Create_Test topic.

Command

A Command is an Event that comes from the user interface. All Aggregate classes should have a matching Command class where you must register all commands that the user may issue. This class must extend the NostifyCommand class. It will look like this by default:

public class TestCommand : NostifyCommand
{
  ///<summary>
  ///Base Create Command
  ///</summary>
  public static readonly TestCommand Create = new TestCommand("Create_Test", true);
  ///<summary>
  ///Base Update Command
  ///</summary>
  public static readonly TestCommand Update = new TestCommand("Update_Test");
  ///<summary>
  ///Base Delete Command
  ///</summary>
  public static readonly TestCommand Delete = new TestCommand("Delete_Test");


  public TestCommand(string name, bool isNew = false)
  : base(name, isNew)
  {

  }
}

The commands may then be handled in the Apply() method:

public override void Apply(Event eventToApply)
{
    if (eventToApply.command == _ReplaceMe_Command.Create || eventToApply.command == _ReplaceMe_Command.Update)
    {
        this.UpdateProperties<_ReplaceMe_>(eventToApply.payload);
    }
    else if (eventToApply.command == _ReplaceMe_Command.Delete)
    {
        this.isDeleted = true;
    }
}

Setup

The template will use dependency injection to add a singleton instance of the Nostify class and adds HttpClient by default. You may need to edit these to match your configuration:


public class Program
{

  private static void Main(string[] args)
  {
    var host = new HostBuilder()
    .ConfigureFunctionsWorkerDefaults()
    .ConfigureServices((context, services) =>
    {
      services.AddHttpClient();

      var config = context.Configuration;

      //Note: This is the api key for the cosmos emulator by default
      string apiKey = config.GetValue<string>("apiKey");
      string dbName = config.GetValue<string>("dbName");
      string endPoint = config.GetValue<string>("endPoint");
      string kafka = config.GetValue<string>("BrokerList");

      var nostify = NostifyFactory.WithCosmos(apiKey, dbName, endPoint)
                          .WithKafka(kafka)
                          .Build<MyAggregate>(); //Where T is the base aggregate of the service

      services.AddSingleton<INostify>(nostify);

      services.AddLogging();
    })
    .Build();

    host.Run();
  }
}

By default, the template will contain the single Aggregate specified. In the Aggregates folder you will find Aggregate and AggregateCommand class files already stubbed out. The AggregateCommand base class contains default implementations for Create, Update, and Delete. The UpdateProperties<T>() method will update any properties of the Aggregate with the value of the Event payload with the same property name.


public class TestCommand : NostifyCommand
{
  ///<summary>
  ///Base Create Command
  ///</summary>
  public static readonly TestCommand Create = new TestCommand("Create_Test", true);
  ///<summary>
  ///Base Update Command
  ///</summary>
  public static readonly TestCommand Update = new TestCommand("Update_Test");
  ///<summary>
  ///Base Delete Command
  ///</summary>
  public static readonly TestCommand Delete = new TestCommand("Delete_Test");

  public TestCommand(string name, bool isNew = false)
  : base(name, isNew)
  {
  }

}

public class Test : NostifyObject, IAggregate
{

  public Test()
  {
  }

  public bool isDeleted { get; set; } = false;

  public static string aggregateType => "Test";


  public override void Apply(Event eventToApply)
  {
    if (eventToApply.command == TestCommand.Create || eventToApply.command == TestCommand.Update)
    {
      this.UpdateProperties<Test>(eventToApply.payload);
    }
    else if (eventToApply.command == TestCommand.Delete)
    {
      this.isDeleted = true;
    }
  }
}
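To make the name-matching behavior attributed to UpdateProperties<T>() more concrete, here is a minimal reflection-based sketch. It is not nostify's implementation: PropertyUpdater, UpdateFromPayload, and the dictionary-shaped payload are assumptions made for illustration. It copies payload values onto same-named properties, ignores anything that does not match, and accepts an optional payload-name to property-name map like the propertyPairs dictionary in the 2.6 notes.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Reflection;

public static class PropertyUpdater
{
    // Copies payload values onto writable public properties of target.
    // propertyPairs optionally maps a payload name to a differently named target property.
    public static void UpdateFromPayload<T>(T target, IDictionary<string, object> payload,
        IDictionary<string, string>? propertyPairs = null) where T : class
    {
        foreach (var (payloadName, value) in payload)
        {
            string targetName = propertyPairs != null && propertyPairs.TryGetValue(payloadName, out var mapped)
                ? mapped
                : payloadName;

            PropertyInfo? prop = typeof(T).GetProperty(targetName, BindingFlags.Public | BindingFlags.Instance);

            // Skip entries with no matching writable property, and null or type-incompatible values.
            if (prop == null || !prop.CanWrite) continue;
            if (!prop.PropertyType.IsInstanceOfType(value)) continue;

            prop.SetValue(target, value);
        }
    }
}

public class ExampleProjection
{
    public string exampleName { get; set; } = "";
    public int count { get; set; }
}

public static class Program
{
    public static void Main()
    {
        var proj = new ExampleProjection();
        var payload = new Dictionary<string, object>
        {
            ["name"] = "Widget",      // mapped to exampleName below
            ["count"] = 3,            // matches by name
            ["unknown"] = "ignored"   // no matching property, so it is skipped
        };
        var pairs = new Dictionary<string, string> { ["name"] = "exampleName" };

        PropertyUpdater.UpdateFromPayload(proj, payload, pairs);
        Console.WriteLine($"{proj.exampleName}, {proj.count}"); // prints "Widget, 3"
    }
}
```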

  

Basic Tasks

Initializing Current State Container

The template will include a basic method to create or recreate the current state container. It might become necessary to recreate a container if a bug was introduced that corrupted the data, for instance. For the current state of an Aggregate, it is simple to recreate the container from the event stream. You will find the function to do so under Admin.

Querying

There is a Queries folder to contain the queries for the Aggregate. Three basic queries are created when you spin up the template: Get Single GET <AggregateType>/{aggregateId}, Get All GET <AggregateType> (note: if this will return large amounts of data you may want to refactor the default query), and Rehydrate GET Rehydrate<AggregateType>/{aggregateId}/{datetime?}, which returns the state of the Aggregate directly from the event stream up to the specified datetime.
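The idea behind the Rehydrate query, replaying the event stream only up to a given datetime, can be sketched as follows. This is an illustration under assumptions, not the template's code: TimedEvent and Rehydrator are hypothetical, and the sketch assumes each event carries a timestamp that the replay can filter and order on.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event carrying a timestamp and a single "name" change.
public record TimedEvent(DateTime Timestamp, string Name);

public static class Rehydrator
{
    // Replays events in timestamp order, ignoring any after the optional cutoff.
    public static string RehydrateName(IEnumerable<TimedEvent> events, DateTime? pointInTime = null)
    {
        var toReplay = events
            .Where(ev => pointInTime == null || ev.Timestamp <= pointInTime)
            .OrderBy(ev => ev.Timestamp);

        string name = "";
        foreach (var e in toReplay)
        {
            name = e.Name; // the latest event within the cutoff wins
        }
        return name;
    }
}

public static class Program
{
    public static void Main()
    {
        var events = new List<TimedEvent>
        {
            new TimedEvent(new DateTime(2024, 1, 1), "Draft"),
            new TimedEvent(new DateTime(2024, 2, 1), "Final")
        };

        Console.WriteLine(Rehydrator.RehydrateName(events));                             // prints "Final"
        Console.WriteLine(Rehydrator.RehydrateName(events, new DateTime(2024, 1, 15)));  // prints "Draft"
    }
}
```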

To do your own query, simply add a new Azure Function per query, inject HttpClient and INostify, grab the container you want to query, and run a query with GetItemLinqQueryable<T>() using Linq syntax. Below is an example of the basic get single instance query included in the template generation.

public class GetTest
{
  private readonly HttpClient _client;
  private readonly INostify _nostify;

  public GetTest(HttpClient httpClient, INostify nostify)
  {
    this._client = httpClient;
    this._nostify = nostify;
  }

  [Function(nameof(GetTest))]
  public async Task<IActionResult> Run(
  [HttpTrigger("get", Route = "Test/{aggregateId:guid}")] HttpRequestData req,
  Guid aggregateId,
  ILogger log)
  {
    Container currentStateContainer = await _nostify.GetCurrentStateContainerAsync<Test>();

    Test retObj = await currentStateContainer
                          .GetItemLinqQueryable<Test>()
                          .Where(x => x.id == aggregateId)
                          .FirstOrDefaultAsync();

    return new OkObjectResult(retObj);
  }
}

Create New Aggregate

One of the "out of the box" commands handled by nostify is the Create_Aggregate command. The template logic flow is:

  • Create command is inbound via http post from user interface. Typically this would indicate the user saved a new record.
  • Body of post must contain JSON with all the properties to set on create. The Apply() method in the aggregate will call UpdateProperties<T>() which will match any properties in the JSON to the aggregate and set them automatically. Any properties not in the JSON or properties in the JSON that do not match the aggregate will be ignored. As such it is only necessary to send the properties that you want to set over the wire.
  • In a typical application there would be validation of the command occurring, validating say that all required properties are set. nostify does not dictate a validation pattern; use the one that makes the most sense for your application. There would probably be some kind of auth here as well for most apps.
  • The command handler creates an Event and persists it to the event store. Note that the command registered must indicate a new record is being created by setting the isNew parameter to true: public static readonly TestCommand Create = new TestCommand("Create_Test", true);
  • The event publisher function is triggered by an event being written to the event store and publishes the event to Kafka.
  • The create handler that is subscribed to the event will be triggered and update the projection containing the current state for the aggregate.

Other projections added will need to contain their own logic for handling the create event. The naming convention for the create event handler is: On<AggregateName>Created. For example: "OnTestCreated".

Update Aggregate

Updating the aggregate and its base current state projection is also handled by the templates. The template logic flow is:

  • Update command comes in via http patch from user interface. Typically this would indicate a user saved an existing record.
  • Body of the request must contain a JSON object with a property id in the default template.
  • The default event handler for the current state container will query the container for the aggregate id, get the current state, apply the update, then save the update to the container. This is contained in the ApplyAndPersistAsync<T>() method.
  • Body of patch must contain JSON with all the properties to update. The Apply() method in the aggregate will call UpdateProperties<T>() which will match any properties in the JSON to the aggregate and set them automatically. Any properties not in the JSON or properties in the JSON that do not match the aggregate will be ignored. As such it is only necessary to send the properties that you want to set over the wire.
  • There may be more than one event to subscribe to in order to update an aggregate if you implement a more complex object. For instance, if you have a property of List<T> you may have another command that is issued from the UI that does an http PUT to replace objects in the list.

The naming convention for the function handling the update event is: On<AggregateName>Updated. For example: "OnTestUpdated".

Delete Aggregate

Deleting an aggregate by default does not actually remove it from the current state container; it simply sets the isDeleted property to true. Template logic flow is:

  • Delete command comes in via http delete from user interface. This indicates the user chose to delete an existing record.
  • Url must contain a guid that matches the id property of the aggregate instance to mark as deleted.
  • The command handler creates an Event and persists it to the event store.
  • The Event is published to Kafka and then the event handler function applies the Event to the current state, which sets the isDeleted property to true.
  • Queries to the current state container should take into account that it may contain deleted aggregates.
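Because deletes are soft, a query that should only return live records needs to filter on isDeleted. A minimal sketch, using an in-memory list in place of the Cosmos container (TestItem and TestQueries are hypothetical names):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for an aggregate's current state document.
public class TestItem
{
    public Guid id { get; set; }
    public string name { get; set; } = "";
    public bool isDeleted { get; set; }
}

public static class TestQueries
{
    // Returns only items that have not been soft-deleted.
    public static List<TestItem> GetActive(IQueryable<TestItem> items) =>
        items.Where(x => !x.isDeleted).ToList();
}

public static class Program
{
    public static void Main()
    {
        var container = new List<TestItem>
        {
            new TestItem { id = Guid.NewGuid(), name = "kept" },
            new TestItem { id = Guid.NewGuid(), name = "removed", isDeleted = true }
        };

        foreach (var item in TestQueries.GetActive(container.AsQueryable()))
        {
            Console.WriteLine(item.name); // prints "kept"
        }
    }
}
```

Against a real container, the same Where(x => !x.isDeleted) clause would presumably be added to a GetItemLinqQueryable<T>() chain like the one in the GetTest query; that combination is not shown in the source, so treat it as a suggestion.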

Create New Projection Container

Note: the following Projection examples use the TestWithStatus projection:

public class TestWithStatus : ProjectionBaseClass<TestWithStatus,Test>, IContainerName, IHasExternalData<TestWithStatus>
{
  public TestWithStatus()
  {

  }

  public static string containerName => "TestWithStatus";

  public bool isDeleted { get; set; }

  //Test properties
  public string testName { get; set; }
  public Guid? statusId { get; set; }
  public Guid? testTypeId { get; set; }

  //Status properties
  public string? statusName { get; set; }

  //Test Type properties
  public string? testType { get; set; }


  public override void Apply(Event eventToApply)
  {
      //Should update the command tree below to not use string matching
      if (eventToApply.command.name.Equals("Create_Test") || eventToApply.command.name.Equals("Update_Test") || eventToApply.command.name.Equals("Update_Status"))
      {
          this.UpdateProperties<TestWithStatus>(eventToApply.payload);
      }
      else if (eventToApply.command.name.Equals("Delete_Test"))
      {
          this.isDeleted = true;
      }
  }

  public class StatusName
  {
      public Guid id { get; set; }
      public string statusName { get; set; }
  }

  public async static Task<List<ExternalDataEvent>> GetExternalDataEventsAsync(List<TestWithStatus> projectionsToInit, INostify nostify, HttpClient httpClient = null, DateTime? pointInTime = null)
  {
    // If data exists within this service, even if a different container, use the container to get the data
    Container sameServiceEventStore = await nostify.GetEventStoreContainerAsync();
    
    //Use GetEventsAsync to get events from the same service, the selectors are a parameter list of the properties that are used to filter the events
    List<ExternalDataEvent> externalDataEvents = await ExternalDataEvent.GetEventsAsync<TestWithStatus>(sameServiceEventStore, 
        projectionsToInit, 
        p => p.testTypeId);

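    //Get events from other services via http using the EventRequest endpoint
    //Assumption: the endpoint accepts a JSON list of the aggregate ids to return events for
    //(requires System.Linq and System.Text)
    List<Guid> statiToGetEventsFor = projectionsToInit
        .Where(p => p.statusId.HasValue)
        .Select(p => p.statusId.Value)
        .Distinct()
        .ToList();
    var requestBody = new StringContent(JsonConvert.SerializeObject(statiToGetEventsFor), Encoding.UTF8, "application/json");
    var response = await httpClient.PostAsync("http://localhost:7071/api/EventRequest", requestBody);
    if (!response.IsSuccessStatusCode)
      throw new Exception("Something went awry");

    List<Event> events = JsonConvert.DeserializeObject<List<Event>>(await response.Content.ReadAsStringAsync());

    projectionsToInit.ForEach(p =>{
        //Assumption: Status events are keyed by the status id, so match on the projection's statusId
        var statusEvents = events.Where(e => e.aggregateRootId == p.statusId)
          .Select(e => new Event(e.command, e.aggregateRootId, e.payload, e.userId))
          .ToList<Event>();
        ExternalDataEvent ede = new ExternalDataEvent(p.id, statusEvents);
        externalDataEvents.Add(ede);
    });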

    return externalDataEvents;
  }

}

Note the GetExternalDataEventsAsync() method. You must implement this such that it returns a List<ExternalDataEvent>. ExternalDataEvent contains a Guid property that points at the "base" aggregate id value, and a List<Event> property which holds all of the events external to the base aggregate that need to be applied to get the projection to the current state (or point in time state if desired).
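The shape described above, one entry per projection holding the base aggregate id plus the external events to apply, can be sketched with stand-in types. SketchEvent, SketchExternalData, SketchProjection, and ExternalDataMatcher are all hypothetical; the real ExternalDataEvent and Event types belong to nostify and are not reproduced here. The sketch assumes external events are keyed by the id of the external aggregate (here, a status id).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-ins for illustration only.
public record SketchEvent(Guid AggregateRootId, string Command);
public record SketchExternalData(Guid ProjectionId, List<SketchEvent> Events);
public record SketchProjection(Guid Id, Guid? StatusId);

public static class ExternalDataMatcher
{
    // Pairs each projection with the external events for the status it references.
    public static List<SketchExternalData> Match(
        IEnumerable<SketchProjection> projections, IEnumerable<SketchEvent> externalEvents)
    {
        // Index external events by the aggregate they belong to.
        ILookup<Guid, SketchEvent> byAggregate = externalEvents.ToLookup(e => e.AggregateRootId);

        return projections
            .Where(p => p.StatusId.HasValue)
            // A lookup returns an empty sequence for ids with no events.
            .Select(p => new SketchExternalData(p.Id, byAggregate[p.StatusId.Value].ToList()))
            .ToList();
    }
}

public static class Program
{
    public static void Main()
    {
        var statusId = Guid.NewGuid();
        var projections = new List<SketchProjection>
        {
            new SketchProjection(Guid.NewGuid(), statusId),
            new SketchProjection(Guid.NewGuid(), null) // no status yet, so no external data
        };
        var statusEvents = new List<SketchEvent>
        {
            new SketchEvent(statusId, "Create_Status"),
            new SketchEvent(statusId, "Update_Status")
        };

        var result = ExternalDataMatcher.Match(projections, statusEvents);
        Console.WriteLine($"{result.Count} projection(s), {result[0].Events.Count} event(s)");
        // prints "1 projection(s), 2 event(s)"
    }
}
```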

Create New Projection

A Projection will have a "base" aggregate when it is defined. Projection create event handlers should subscribe to the create event of the base aggregate.

Adding a new instance of a projection requires implementing the Apply() method to handle all necessary events, and the GetExternalDataEventsAsync() method to get events external to the base aggregate when initializing new instances of the projection.

This method is called in the event handler function to update the projection with any existing external data; the result is then applied and saved to the projection container along with the Event signifying the creation of the Test:

  //Get projection container
  Container projectionContainer = await _nostify.GetProjectionContainerAsync<TestWithStatus>();

  //Create projection
  TestWithStatus proj = new TestWithStatus();
  //Apply create event of base aggregate
  proj.Apply(testCreated);
  //Get external data
  Event externalData = await proj.SeedExternalDataAsync(_nostify, _httpClient);
  //Update projection container
  await projectionContainer.ApplyAndPersistAsync<TestWithStatus>(new List<Event>(){testCreated,externalData});

The naming convention of the event handler is: On<Base Aggregate Name>Created_For_<Projection Name>. For example: "OnTestCreated_For_TestWithStatus". The create event of the base aggregate is the only event to subscribe to in this case for most implementations.

Update Projection

Updating a Projection works the same as updating the current state projection of an aggregate, except you're more likely to be subscribing to multiple events and you may be subscribing to various events from multiple services.

For instance, with our TestWithStatus projection, we will need to subscribe to the update event for both the Test and Status aggregates to capture and handle them in the Apply() method (see the TestWithStatus class above).

This means we will need two event handler functions, OnTestUpdated_For_TestWithStatus and OnStatusUpdated_For_TestWithStatus. Note the naming convention.

They will both take in their respective events and update the projection container. Using the ApplyAndPersistAsync() method for updates to the base aggregate will automatically query the projection using the id value and call Apply() then save the updated projection back to the data store.
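The difference between the two handlers can be sketched with an in-memory dictionary standing in for the projection container. The names below (TestWithStatusSketch, TestWithStatusHandlers) are hypothetical and no nostify API is used. An update to the base aggregate touches exactly one projection, found by id, while an update to the external Status aggregate may touch every projection that references that status.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Cut-down stand-in for the TestWithStatus projection.
public class TestWithStatusSketch
{
    public Guid id { get; set; }
    public string testName { get; set; } = "";
    public Guid? statusId { get; set; }
    public string statusName { get; set; } = "";
}

public static class TestWithStatusHandlers
{
    // Base aggregate update: exactly one projection, looked up by id.
    public static void OnTestUpdated(Dictionary<Guid, TestWithStatusSketch> container, Guid testId, string newTestName)
    {
        if (container.TryGetValue(testId, out var proj)) proj.testName = newTestName;
    }

    // External aggregate update: every projection referencing the status. Returns the count updated.
    public static int OnStatusUpdated(Dictionary<Guid, TestWithStatusSketch> container, Guid statusId, string newStatusName)
    {
        int updated = 0;
        foreach (var proj in container.Values.Where(p => p.statusId == statusId))
        {
            proj.statusName = newStatusName;
            updated++;
        }
        return updated;
    }
}

public static class Program
{
    public static void Main()
    {
        var statusId = Guid.NewGuid();
        var a = new TestWithStatusSketch { id = Guid.NewGuid(), testName = "A", statusId = statusId };
        var b = new TestWithStatusSketch { id = Guid.NewGuid(), testName = "B", statusId = statusId };
        var container = new Dictionary<Guid, TestWithStatusSketch> { [a.id] = a, [b.id] = b };

        TestWithStatusHandlers.OnTestUpdated(container, a.id, "A renamed");
        int count = TestWithStatusHandlers.OnStatusUpdated(container, statusId, "Passed");

        Console.WriteLine($"{a.testName} / {a.statusName}, {count} updated");
        // prints "A renamed / Passed, 2 updated"
    }
}
```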

Delete Projection

For most projections it is appropriate to delete the item out of the container when the base aggregate is deleted (the isDeleted property is set to true).

The naming convention of the event handler is: On<Base Aggregate Name>Deleted_For_<Projection Name>. For example: "OnTestDeleted_For_TestWithStatus". The delete event of the base aggregate is the only event to subscribe to in this case for most implementations.

Event Handlers

Events are things that have already happened and need to be handled by the system. A Command, as discussed elsewhere in the documentation, becomes an Event after validation. As an example: the user clicks a button, the system issues an http POST to the command endpoint, it passes authentication and authorization, then the data in the body of the http call is validated, then the command code runs to write an Event to the Event Store with the pertinent payload data.

As such there should not be any validation to perform in an Event Handler; the "thing" has already happened. An Event Handler is there to process the Event and perform any necessary state updates, such as updating the data stored in a Projection container. After the Event is stored, it is published to the event bus (Kafka in this case), and then the Event Handlers pick it up by subscribing to the event. Kafka makes sure each Event Handler receives each event it is subscribed to.

The Event should be applied to the object's (Aggregate/Projection) current state by calling the Apply() method. This method needs to take an Event and change the state of the object based on the Event. A "Create" Event might start with a new instance of the object and then update any properties of the object based off the Event's payload.
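As a rough illustration of what an Apply() method does, here is a minimal, self-contained sketch. SketchEvent, InventoryItemSketch, the event names, and the payload keys are all assumptions made for this example; they stand in for the framework's real types and are not the nostify API. Only the idea comes from the text above: take an Event and change the object's state based on its payload.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the framework's Event type (not the nostify API).
public record SketchEvent(string Name, Guid AggregateRootId, Dictionary<string, object> Payload);

// Hypothetical aggregate used only for this sketch.
public class InventoryItemSketch
{
    public Guid id { get; set; }
    public string Name { get; set; } = "";
    public int Quantity { get; set; }
    public bool isDeleted { get; set; }

    // Changes this object's state based on an event that has already happened.
    // No validation happens here; that was done before the event was stored.
    public void Apply(SketchEvent e)
    {
        if (e.Name == "Delete_InventoryItem")
        {
            isDeleted = true;
            return;
        }

        // Create and Update: copy whichever properties the payload carries.
        id = e.AggregateRootId;
        if (e.Payload.TryGetValue("Name", out var name) && name is string s) Name = s;
        if (e.Payload.TryGetValue("Quantity", out var qty) && qty is int q) Quantity = q;
    }
}

public static class ApplyDemo
{
    // Applies a "Create" event to a new instance, then an "Update" event.
    public static InventoryItemSketch Run()
    {
        var itemId = Guid.NewGuid();
        var item = new InventoryItemSketch();
        item.Apply(new SketchEvent("Create_InventoryItem", itemId,
            new Dictionary<string, object> { ["Name"] = "Widget", ["Quantity"] = 5 }));
        item.Apply(new SketchEvent("Update_InventoryItem", itemId,
            new Dictionary<string, object> { ["Quantity"] = 3 }));
        return item;
    }
}
```

After ApplyDemo.Run(), the item keeps the Name set by the "Create" event ("Widget") and carries the Quantity from the later "Update" event (3), because the update payload only contained Quantity.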

Helper Methods

The framework includes a number of methods to help create Event Handlers. In most circumstances they eliminate a large amount of boilerplate code. There may be Events where the helpers don't really apply, or scenarios where you require more performance, but 99% of the time you should leverage these methods to simplify your code. The templates will create Event Handlers that use these methods.

Create/Update From Single Event

A single "Create" or "Update" Event for an Aggregate, or for the base Aggregate of a Projection, can be handled with a couple of lines of code using these helper methods. In an Event Handler for an Aggregate subscribed to a Kafka topic, you simply pull the Event out of the triggerEvent, make sure it's not null, get the container you're going to update out of Cosmos, and call the ApplyAndPersistAsync<T>([event]) method. These handlers are created automatically by the template.

public async Task Run([KafkaTrigger(<Trigger Details Here>)] NostifyKafkaTriggerEvent triggerEvent,
    ILogger log)
{
    // Pull the Event out of the Kafka trigger payload
    Event? newEvent = triggerEvent.GetEvent();
    try
    {
        if (newEvent != null)
        {
            // Update aggregate current state projection
            Container currentStateContainer = await _nostify.GetCurrentStateContainerAsync<InventoryItem>();
            await currentStateContainer.ApplyAndPersistAsync<InventoryItem>(newEvent);
        }
    }
    catch (Exception e)
    {
        // Write the failed event to the undeliverable container
        await _nostify.HandleUndeliverableAsync(nameof(OnInventoryItemCreated), e.Message, newEvent);
    }
}

The ApplyAndPersistAsync() method deduces the partition key of the object the Event is being applied to by pulling it from the Event. It then either creates a new instance of the object if needed or does a point read on the database using the id Guid property. This keeps RU consumption down and performance high. Once the object has been created or found, the Event is fed into the object's Apply([event]) method, which the developer specifies for each object type. The results are then saved back into the database, either creating a new document or overwriting the existing one.

In the example above, errors are handled by writing them off into a separate undeliverable container. The example is for an Aggregate, which only needs to handle internal data updates; that is, Events that update Aggregates carry all of the data needed to perform the state change. This may not hold true for Projections, which may require additional data from outside the Event.
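The read, apply, and save sequence described for ApplyAndPersistAsync() can be pictured with an in-memory stand-in. This is only a sketch under stated assumptions: FlowEvent, IApplyable, StockItem, InMemoryContainer, and ApplyAndPersistSketch are hypothetical names, a dictionary keyed by id replaces the Cosmos container, and partition keys and async I/O are left out. It is not how nostify implements the method.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event type for this sketch (not the nostify Event class).
public record FlowEvent(Guid AggregateRootId, string Name, int Quantity);

// Hypothetical contract: anything that has an id and can apply an event.
public interface IApplyable
{
    Guid id { get; set; }
    void Apply(FlowEvent e);
}

public class StockItem : IApplyable
{
    public Guid id { get; set; }
    public int Quantity { get; set; }

    public void Apply(FlowEvent e)
    {
        id = e.AggregateRootId;
        Quantity = e.Quantity;
    }
}

// In-memory stand-in for a document container, keyed by id.
public class InMemoryContainer<T> where T : IApplyable, new()
{
    private readonly Dictionary<Guid, T> _docs = new();

    public int Count => _docs.Count;

    public T ApplyAndPersistSketch(FlowEvent e)
    {
        // "Point read" by id; start from a new instance if nothing is stored yet.
        if (!_docs.TryGetValue(e.AggregateRootId, out var current))
        {
            current = new T();
        }

        // Feed the event into the object's own Apply() method.
        current.Apply(e);

        // Save the result: creates a new document or overwrites the existing one.
        _docs[e.AggregateRootId] = current;
        return current;
    }
}

public static class FlowDemo
{
    // Applies a create event and then an update event for the same id.
    public static (int Count, int Quantity) Run()
    {
        var container = new InMemoryContainer<StockItem>();
        var itemId = Guid.NewGuid();
        container.ApplyAndPersistSketch(new FlowEvent(itemId, "Create", 5));
        var item = container.ApplyAndPersistSketch(new FlowEvent(itemId, "Update", 3));
        return (container.Count, item.Quantity);
    }
}
```

FlowDemo.Run() returns (1, 3): both events target the same id, so the second call overwrites the document created by the first instead of adding another one.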
