rio-command-pipeline 1.0.3

.NET CLI:
dotnet add package rio-command-pipeline --version 1.0.3

Package Manager Console (Visual Studio):
NuGet\Install-Package rio-command-pipeline -Version 1.0.3

PackageReference (copy this XML node into the project file):
<PackageReference Include="rio-command-pipeline" Version="1.0.3" />

Paket CLI:
paket add rio-command-pipeline --version 1.0.3

Script & Interactive (F# Interactive and Polyglot Notebooks):
#r "nuget: rio-command-pipeline, 1.0.3"

Cake:
// Install rio-command-pipeline as a Cake Addin
#addin nuget:?package=rio-command-pipeline&version=1.0.3

// Install rio-command-pipeline as a Cake Tool
#tool nuget:?package=rio-command-pipeline&version=1.0.3

Command Pipeline

A simple asynchronous EventHandler stream.

Overview

My goal for this project was to create a simple, efficient, and small API for handling events asynchronously in the .NET ecosystem. This project does not use Rx or .NET's built-in Observer system. Instead, the goal was to create an EventHandler with the following characteristics:

  • Returns a Task
  • Is asynchronous (awaitable)
  • Allows for various callbacks in the pipeline
  • Catches and propagates Exceptions
  • Is agnostic to the project type
  • Is lightweight
  • Does not use RX or TPL Dataflow

Features

  • Subscribe to various processes within a pipeline: OnStart, OnStartAsync, OnSignalAsync, OnEnd, OnEndAsync, OnFinally, OnFinallyAsync, OnErrorCaught and OnErrorThrown
  • API returns an awaitable Task when 'SignalAsync' is invoked
  • You can choose to skip running the entire pipeline process and simply invoke specific pipeline processes (such as a specific callback)
  • The auxiliary callbacks are completely optional, but allow the consumer to prep data, ensure an asynchronous method is in an appropriate state before firing, and handle exceptions.
  • Is application agnostic (.NET)

Built With

  • JetBrains Rider
  • Tested with WPF & Blazor WASM

Getting Started

Clone or fork this repository and add a reference to the Command Pipeline project. Alternatively, add it to your project as a NuGet package.

You can instantiate CommandPipelines as needed:

var pipeline = new CommandPipeline();

This project supports logging via Microsoft.Extensions.Logging.ILogger. The base constructor for CommandPipeline accepts an ILogger:

ILogger logger   = new Logger<CommandPipeline>(new LoggerFactory());  
var     pipeline = new CommandPipeline(logger);

Once a pipeline instance is created, access to the fluent API becomes available:
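As a minimal sketch of that fluent API, the snippet below chains two optional callbacks and then signals the pipeline. It uses only calls that appear elsewhere in this README (RegisterWork, RegisterOnStart, RegisterOnEnd, SignalAsync, PipelineObject.Empty). The FluentSketch class and its counters are hypothetical names added for illustration, and the sketch assumes the rio-command-pipeline package is referenced by the project.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Rio.CommandPipeline;

// Hypothetical example class; not part of the library.
public class FluentSketch {
    // Hypothetical counters, used only to make the callbacks observable.
    public int StartCount { get; private set; }
    public int WorkCount  { get; private set; }
    public int EndCount   { get; private set; }

    public async Task RunAsync() {
        var pipeline = new CommandPipeline();

        // Register the work that runs when the pipeline is signaled.
        pipeline.RegisterWork(DoWorkAsync);

        // Chain optional callbacks through the fluent API.
        pipeline.RegisterOnStart(OnStart)
                .RegisterOnEnd(OnEnd);

        // Signal the pipeline and await its completion.
        await pipeline.SignalAsync(PipelineObject.Empty);
    }

    Task DoWorkAsync(object? sender, PipelineObject? pipelineObject, CancellationToken token) {
        WorkCount++;
        return Task.CompletedTask;
    }

    void OnStart(PipelineObject? pipelineObject) => StartCount++;

    void OnEnd(PipelineObject? pipelineObject) => EndCount++;
}
```

The Usage section covers the remaining registration methods in more detail.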

Prerequisites and Dependencies

  • .NET 6
  • C# 10
  • Microsoft.Extensions.Logging 7.0.0

Please feel free to contact me with any issues or concerns regarding the dependencies listed above. We can work around most of them if needed.

Installation

  • Clone or fork this repository. Once done, add a reference to this library in your project
  • Download the latest dll and create a reference to it in your project
  • Install via NuGet

Usage

Standalone Instance(s)

using Rio.CommandPipeline;  
  
public class Test {  
    public async Task MyTestMethod() {  
       // Create a new command pipeline  
       var pipeline = new CommandPipeline();  
  
       // Register a command with the pipeline  
       // The method signature must be CommandPipeline.CommandPipelineDelegate:
       // delegate Task PipelineDelegate(object? o, PipelineObject? pObj, CancellationToken token)
       pipeline.RegisterWork(MyWorkMethodAsync);  
         
       // Register optional callbacks  
       // Read through the available fluent methods to see what you can do
       pipeline.RegisterOnStart(OnStart)
               .RegisterOnEnd(OnEnd);

       // The fluent API allows you to chain method calls
       // We can also register asynchronous callbacks
       // Registering asynchronous callbacks requires the method signature to match CommandPipeline.CommandPipelineDelegate
       pipeline.RegisterOnStartAsync(OnStartAsync)
               .RegisterOnEndAsync(OnEndAsync);
         
       // Start the pipeline  
       // A PipelineObject is analogous to EventArgs
       // Derive a class from PipelineObject to carry data through the pipeline
       // The derived class should hold whatever information or state the registered work needs;
       // you decide.
       await pipeline.SignalAsync(new PipelineObject());  
       // await pipeline.SignalAsync(PipelineObject.Empty);

       // You can rerun this pipeline as needed.  
       // If needed, unregister any work or callbacks
       pipeline.UnregisterOnStartAsync(OnStartAsync);  
       pipeline.UnregisterOnEnd(OnEnd);  
         
       // All fluent API methods allow for multiple registrations  
       pipeline.RegisterWork(MyWorkMethodAsync, MyWorkMethodAsync, MyWorkMethodAsync);  
       pipeline.RegisterOnStart(OnStart, OnStart, OnStart, OnStart);  
         
       // Errors will be caught and thrown
       // ErrorCaught is invoked first, followed by ErrorThrown
       // This lets you react when an error is caught and again when it is thrown
       pipeline.RegisterOnErrorCaught(OnErrorCaught);  
       pipeline.RegisterOnErrorThrown(OnErrorThrown);  
    }  
  
    async Task MyWorkMethodAsync(object? sender, PipelineObject? pipelineObject, CancellationToken token) {  
       // Do some work  
       await Task.Delay(5000, token);  
    }  
      
    void OnStart(PipelineObject? pipelineObject) {  
       // Do something when the pipeline starts  
    }  
      
    void OnEnd(PipelineObject? pipelineObject) {  
       // Do something when the pipeline ends  
    }  
      
    void OnErrorCaught(PipelineObject? pipelineObject, Exception exception) {  
       // Do something when an exception is caught  
    }  
      
    void OnErrorThrown(PipelineObject? pipelineObject, Exception exception) {  
       // Do something when an exception is thrown  
    }  
  
    Task OnStartAsync(object?  sender, PipelineObject? pipelineObject, CancellationToken token) {  
       // Do something when the pipeline starts asynchronously  
       return Task.CompletedTask;  
    }  
  
    Task OnEndAsync(object? sender, PipelineObject? pipelineObject, CancellationToken token) {  
       // Do something when the pipeline ends asynchronously  
       return Task.CompletedTask;  
    }  
}
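The comments in the example above suggest deriving from PipelineObject to carry state through the pipeline. A minimal sketch of that idea follows. OrderPipelineObject, OrderHandler, and the OrderId property are hypothetical names chosen for illustration, and the sketch assumes that the pipeline hands the work method the same object that was passed to SignalAsync.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Rio.CommandPipeline;

// Hypothetical payload type; the name and property are illustrative only.
public class OrderPipelineObject : PipelineObject {
    public int OrderId { get; init; }
}

// Hypothetical consumer; not part of the library.
public class OrderHandler {
    public int LastOrderId { get; private set; }

    public async Task RunAsync() {
        var pipeline = new CommandPipeline();
        pipeline.RegisterWork(ProcessOrderAsync);

        // Pass the derived object when signaling the pipeline.
        await pipeline.SignalAsync(new OrderPipelineObject { OrderId = 42 });
    }

    Task ProcessOrderAsync(object? sender, PipelineObject? pipelineObject, CancellationToken token) {
        // Assumption: the work method receives the object given to SignalAsync,
        // so a type pattern recovers the derived payload.
        if (pipelineObject is OrderPipelineObject order) {
            LastOrderId = order.OrderId;
        }
        return Task.CompletedTask;
    }
}
```

Matching on the type in the work method keeps the handler safe if the pipeline is ever signaled with a different PipelineObject, such as PipelineObject.Empty.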

Using the CommandPipelineBroker

using Rio.CommandPipeline;  
  
public class Test {  
    public async Task MyTestMethod() {  
       // Instantiate a new CommandPipelineBroker  
       // The constructor takes params string[] ids
       // These ids are used to register new instances of ICommandPipeline to an internal container.
       // The container key is the id.
       // You can register as many ids as you want.
       // With an instance of CommandPipelineBroker, you can access the ICommandPipeline instances via the indexer.
       var pipelineBroker1 = new CommandPipelineBroker("myIdOne", "myIdTwo");  
         
       // You also do not need to provide any ids  
       var pipelineBroker2 = new CommandPipelineBroker();  
  
       var pipelineOne = pipelineBroker1["myIdOne"];  
       var pipelineTwo = pipelineBroker1["myIdTwo"];  
         
       // You can register work and callbacks on the pipelines retrieved from the broker.
       // These pipeline instances function identically to the standalone example.
       pipelineOne.RegisterWork(MyWorkMethodAsync);  
       pipelineTwo.RegisterOnFinally(OnEnd);  
         
       // You can signal the pipeline to start by calling SignalAsync after retrieving an instance from the broker.  
       await pipelineOne.SignalAsync();  
       //await pipelineOne.SignalAsync(PipelineObject.Empty, new CancellationToken());

       // Or let the broker do it for you.  
       await pipelineBroker1.SignalAsync("myIdOne");  
       // await pipelineBroker1.SignalAsync("myIdOne", PipelineObject.Empty, new CancellationToken());  
  
       // You can also add new instances of ICommandPipeline to the broker under a new id.
       pipelineBroker1.Register("myNewPipeline", new CommandPipeline());  
    }  
  
    async Task MyWorkMethodAsync(object? sender, PipelineObject? pipelineObject, CancellationToken token) {  
       // Do some work  
       await Task.Delay(5000, token);  
    }  
  
    void OnEnd(PipelineObject? pipelineObject) {  
       // Do something when the pipeline ends  
    }  
}

Roadmap

There are currently no further features planned.

Contributing

Contributions are absolutely welcome. This is an open source project.

  1. Fork the repository
  2. Create a feature branch
     git checkout -b feature/your-feature-branch
  3. Commit changes on your feature branch
     git commit -m 'Summary feature'
  4. Push your changes to your branch
     git push origin feature/your-feature-branch
  5. Open a pull request to merge/incorporate your feature

License

Distributed under the MIT License.

Contact

GitHub

Acknowledgments and Credit

Compatible and additional computed target framework versions

.NET: net7.0 is compatible. The following were computed: net7.0-android, net7.0-ios, net7.0-maccatalyst, net7.0-macos, net7.0-tvos, net7.0-windows, net8.0, net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows.

Version  Downloads  Last updated
1.0.3    157        5/17/2024
1.0.0    104        5/16/2024

Official version 1 release