Shuttle.Core.Threading 21.0.1-beta

This is a prerelease version of Shuttle.Core.Threading.

Shuttle.Core.Threading

Provides various classes and interfaces to facilitate thread-based processing using task-based asynchronous patterns.

Installation

dotnet add package Shuttle.Core.Threading --version 21.0.1-beta

Overview

This library lets you create thread pools that continuously execute processor implementations. Each processor thread runs in a loop: it performs work and, when no work is available, falls back to a configurable idle strategy. The library relies on dependency injection, and each thread pool is identified by its own service key, so multiple pools can run side by side.

Core Components

IProcessor

Implement this interface to define the work that will be executed by processor threads:

public interface IProcessor
{
    ValueTask<bool> ExecuteAsync(CancellationToken cancellationToken = default);
}

The return value indicates whether work was performed (true) or not (false), which is used by the idle strategy to determine thread behavior.
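
As a rough illustration of the true/false contract, the sketch below drives a processor for a fixed number of iterations and counts working versus idle passes. QueueDrainingProcessor and ProcessorLoopSketch are hypothetical names, IProcessor is redeclared locally so the snippet compiles without the package, and the loop is a simplification rather than the library's actual thread loop.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Local copy of the interface shown above so the sketch compiles on its own.
public interface IProcessor
{
    ValueTask<bool> ExecuteAsync(CancellationToken cancellationToken = default);
}

// Hypothetical processor: handles one queued item per call.
public sealed class QueueDrainingProcessor : IProcessor
{
    private readonly Queue<string> _items;

    public QueueDrainingProcessor(IEnumerable<string> items) => _items = new Queue<string>(items);

    public ValueTask<bool> ExecuteAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // true when an item was handled, false when the queue was empty.
        return ValueTask.FromResult(_items.TryDequeue(out _));
    }
}

// Hypothetical driver standing in for a processor thread (assumption, not the library's loop).
public static class ProcessorLoopSketch
{
    public static async Task<(int Worked, int Idle)> RunAsync(
        IProcessor processor, int iterations, CancellationToken cancellationToken = default)
    {
        var worked = 0;
        var idle = 0;

        for (var i = 0; i < iterations; i++)
        {
            if (await processor.ExecuteAsync(cancellationToken))
            {
                worked++;
            }
            else
            {
                idle++; // a real pool would hand over to its idle strategy here
            }
        }

        return (worked, idle);
    }
}
```

Running RunAsync with two queued items and five iterations gives two working passes and three idle ones.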

IProcessorContext

Inject this interface into your processor implementation to get context about the current execution:

public interface IProcessorContext
{
    string ServiceKey { get; }
    int ManagedThreadId { get; }
}

ProcessorThreadPool

Manages a pool of processor threads that execute your IProcessor implementation:

public class ProcessorThreadPool(
    string serviceKey,
    int threadCount,
    IServiceScopeFactory serviceScopeFactory,
    ThreadingOptions threadingOptions,
    IProcessorIdleStrategy processorIdleStrategy
) : IProcessorThreadPool

Parameters:

  • serviceKey: Identifier used for keyed service resolution and configuration
  • threadCount: Number of processor threads in the pool (must be > 0)
  • serviceScopeFactory: Factory for creating service scopes for each processor execution
  • threadingOptions: Configuration options including events and timeouts
  • processorIdleStrategy: Strategy for handling idle periods when no work is performed

Configuration

Service Registration

Register threading services in your dependency injection container:

services.AddThreading(builder =>
{
    builder.ConfigureThreading(options =>
    {
        options.JoinTimeout = TimeSpan.FromSeconds(30);
    });
    
    builder.ConfigureProcessorIdle("my-processor", options =>
    {
        options.Durations = new List<TimeSpan>
        {
            TimeSpan.FromMilliseconds(100),
            TimeSpan.FromMilliseconds(500),
            TimeSpan.FromSeconds(1)
        };
    });
});

// Register your processor implementation with a service key
services.AddKeyedScoped<IProcessor, MyProcessor>("my-processor");

ThreadingOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| JoinTimeout | TimeSpan | 00:00:15 | Duration to wait for processor threads to stop gracefully |
| ProcessorThreadCreated | AsyncEvent | - | Raised when a processor thread is created |
| ProcessorThreadActive | AsyncEvent | - | Raised when a processor thread becomes active |
| ProcessorThreadStarting | AsyncEvent | - | Raised when a processor thread is starting |
| ProcessorThreadStopping | AsyncEvent | - | Raised when a processor thread is stopping |
| ProcessorThreadStopped | AsyncEvent | - | Raised when a processor thread has stopped |
| ProcessorThreadOperationCanceled | AsyncEvent | - | Raised when a processor operation is canceled |
| ProcessorExecuting | AsyncEvent | - | Raised before processor execution |
| ProcessorExecuted | AsyncEvent | - | Raised after processor execution |
| ProcessorException | AsyncEvent | - | Raised when a processor throws an exception |

ProcessorIdleOptions

Configures the idle strategy behavior when processors return false (no work performed):

public class ProcessorIdleOptions
{
    public List<TimeSpan> Durations { get; set; } = [];
}

The Durations list defines progressive wait times. When no work is performed, the thread waits for increasing durations from this list before retrying.
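
One plausible reading of "progressive wait times" is sketched below. ProgressiveIdleSketch is a hypothetical helper, not a type from the package. It assumes that each idle pass moves one step further through the list, that the last duration repeats once the list is exhausted, and that any pass that performs work resets the progression. The library's DefaultProcessorIdleStrategy may differ on any of these points.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper illustrating a progressive idle wait (assumed behavior).
public sealed class ProgressiveIdleSketch
{
    private readonly IReadOnlyList<TimeSpan> _durations;
    private int _index;

    public ProgressiveIdleSketch(IReadOnlyList<TimeSpan> durations)
    {
        if (durations == null || durations.Count == 0)
        {
            throw new ArgumentException("At least one duration is required.", nameof(durations));
        }

        _durations = durations;
    }

    // Returns how long to wait after a single ExecuteAsync call.
    public TimeSpan Next(bool workPerformed)
    {
        if (workPerformed)
        {
            _index = 0;           // assumption: work resets the progression
            return TimeSpan.Zero; // no wait while work is available
        }

        var wait = _durations[_index];

        // Assumption: stay on the final duration once the list is exhausted.
        if (_index < _durations.Count - 1)
        {
            _index++;
        }

        return wait;
    }
}
```

With durations of 100 ms, 500 ms, and 1 s, four consecutive idle passes would wait 100 ms, 500 ms, 1 s, and 1 s under these assumptions. A pass that performs work resets the next idle wait to 100 ms.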

Usage Example

1. Implement IProcessor

// IMyWorkQueue and WorkItem are application-specific types that you supply.
public class MyProcessor : IProcessor
{
    private readonly IProcessorContext _context;
    private readonly ILogger<MyProcessor> _logger;
    private readonly IMyWorkQueue _workQueue;

    public MyProcessor(
        IProcessorContext context,
        ILogger<MyProcessor> logger,
        IMyWorkQueue workQueue)
    {
        _context = context;
        _logger = logger;
        _workQueue = workQueue;
    }

    public async ValueTask<bool> ExecuteAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Processing on thread {ThreadId} with service key {ServiceKey}",
            _context.ManagedThreadId,
            _context.ServiceKey);

        var workItem = await _workQueue.DequeueAsync(cancellationToken);

        if (workItem == null)
        {
            return false; // No work available
        }

        await ProcessWorkItemAsync(workItem, cancellationToken);
        return true; // Work was performed
    }

    private async Task ProcessWorkItemAsync(WorkItem item, CancellationToken cancellationToken)
    {
        // Process the work item
        await Task.Delay(100, cancellationToken);
    }
}

2. Register Services

var builder = Host.CreateApplicationBuilder(args);

builder.Services.AddThreading(threadingBuilder =>
{
    threadingBuilder.ConfigureThreading(options =>
    {
        options.JoinTimeout = TimeSpan.FromSeconds(30);
        
        // Subscribe to events (the parameter is named e so it does not shadow the program's args)
        options.ProcessorException.Subscribe(async e =>
        {
            Console.WriteLine($"Processor exception: {e.Exception.Message}");
        });
    });

    threadingBuilder.ConfigureProcessorIdle("my-processor", options =>
    {
        options.Durations = new List<TimeSpan>
        {
            TimeSpan.FromMilliseconds(100),
            TimeSpan.FromMilliseconds(500),
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(5)
        };
    });
});

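builder.Services.AddKeyedScoped<IProcessor, MyProcessor>("my-processor");
builder.Services.AddSingleton<IMyWorkQueue, MyWorkQueue>();

// Register the hosted service from step 3 so the thread pool starts with the host
builder.Services.AddHostedService<MyHostedService>();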

var host = builder.Build();
await host.RunAsync();

3. Create and Manage Thread Pool

public class MyHostedService : IHostedService
{
    private readonly IServiceScopeFactory _serviceScopeFactory;
    private readonly ThreadingOptions _threadingOptions;
    private readonly IProcessorIdleStrategy _processorIdleStrategy;
    private ProcessorThreadPool? _threadPool;

    public MyHostedService(
        IServiceScopeFactory serviceScopeFactory,
        IOptions<ThreadingOptions> threadingOptions,
        IProcessorIdleStrategy processorIdleStrategy)
    {
        _serviceScopeFactory = serviceScopeFactory;
        _threadingOptions = threadingOptions.Value;
        _processorIdleStrategy = processorIdleStrategy;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _threadPool = new ProcessorThreadPool(
            "my-processor",
            threadCount: 5,
            _serviceScopeFactory,
            _threadingOptions,
            _processorIdleStrategy);

        await _threadPool.StartAsync(cancellationToken);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_threadPool != null)
        {
            await _threadPool.StopAsync(cancellationToken);
            await _threadPool.DisposeAsync();
        }
    }
}

Idle Strategies

IProcessorIdleStrategy

Controls thread behavior when processors return false (no work performed):

public interface IProcessorIdleStrategy
{
    Task SignalAsync(string serviceKey, bool workPerformed, CancellationToken cancellationToken = default);
}

Built-in Implementations:

  • DefaultProcessorIdleStrategy: Uses progressive wait durations from ProcessorIdleOptions. When no work is performed, waits for increasing durations before retrying.
  • NullProcessorIdleStrategy: No idle behavior; threads immediately retry without waiting.
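
For cases where neither built-in strategy fits, a custom implementation of the interface is one option. The sketch below pauses for a single fixed delay whenever no work was performed. FixedDelayIdleStrategy is a hypothetical name, IProcessorIdleStrategy is redeclared locally so the snippet compiles without the package, and how a custom strategy would be registered with the container is not covered here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Local copy of the interface shown above so the sketch compiles on its own.
public interface IProcessorIdleStrategy
{
    Task SignalAsync(string serviceKey, bool workPerformed, CancellationToken cancellationToken = default);
}

// Hypothetical strategy: one fixed pause per idle signal, regardless of service key.
public sealed class FixedDelayIdleStrategy : IProcessorIdleStrategy
{
    private readonly TimeSpan _delay;
    private int _idleSignals;

    public FixedDelayIdleStrategy(TimeSpan delay) => _delay = delay;

    // Number of idle signals seen so far; safe to read from any thread.
    public int IdleSignals => Volatile.Read(ref _idleSignals);

    public async Task SignalAsync(string serviceKey, bool workPerformed, CancellationToken cancellationToken = default)
    {
        if (workPerformed)
        {
            return; // work was done, so retry immediately
        }

        Interlocked.Increment(ref _idleSignals);

        // Task.Delay observes the token, so a stopping pool is not held up by the pause.
        await Task.Delay(_delay, cancellationToken);
    }
}
```

Because several processor threads may signal the same strategy instance, the counter uses Interlocked rather than a plain increment.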

Event Handling

Subscribe to events to monitor and react to processor lifecycle and execution:

services.AddThreading(builder =>
{
    builder.ConfigureThreading(options =>
    {
        options.ProcessorExecuting.Subscribe(async args =>
        {
            Console.WriteLine($"Executing processor on thread {args.ManagedThreadId}");
        });

        options.ProcessorExecuted.Subscribe(async args =>
        {
            Console.WriteLine($"Executed processor. Work performed: {args.WorkPerformed}");
        });

        options.ProcessorException.Subscribe(async args =>
        {
            Console.WriteLine($"Exception on thread {args.ManagedThreadId}: {args.Exception.Message}");
        });
    });
});

Advanced Scenarios

Multiple Thread Pools

You can create multiple thread pools with different service keys and configurations:

// Register multiple processors
services.AddKeyedScoped<IProcessor, HighPriorityProcessor>("high-priority");
services.AddKeyedScoped<IProcessor, LowPriorityProcessor>("low-priority");

// Configure different idle durations per service key (builder is the threading builder passed to AddThreading)
builder.ConfigureProcessorIdle("high-priority", options =>
{
    options.Durations = new List<TimeSpan> { TimeSpan.FromMilliseconds(50) };
});

builder.ConfigureProcessorIdle("low-priority", options =>
{
    options.Durations = new List<TimeSpan> { TimeSpan.FromSeconds(5) };
});

// Create separate thread pools
var highPriorityPool = new ProcessorThreadPool("high-priority", 10, ...);
var lowPriorityPool = new ProcessorThreadPool("low-priority", 2, ...);

Thread Safety

  • ProcessorThreadPool uses internal locking (SemaphoreSlim) for thread-safe start/stop operations
  • Each processor execution runs in its own service scope
  • ProcessorContext is scoped per execution and accessed via ProcessorContextAccessor
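
The first point can be pictured with a small SemaphoreSlim-guarded lifecycle. GuardedLifecycleSketch is a hypothetical class that shows the general pattern only. It is not the source of ProcessorThreadPool, whose internals may differ.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical class showing a SemaphoreSlim-guarded start/stop (assumed pattern).
public sealed class GuardedLifecycleSketch : IDisposable
{
    private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
    private bool _started;

    // How many times the start logic actually ran.
    public int StartCount { get; private set; }

    public async Task StartAsync(CancellationToken cancellationToken = default)
    {
        await _lock.WaitAsync(cancellationToken);

        try
        {
            if (_started)
            {
                return; // already running, so a second start is a no-op
            }

            _started = true;
            StartCount++;
        }
        finally
        {
            _lock.Release();
        }
    }

    public async Task StopAsync(CancellationToken cancellationToken = default)
    {
        await _lock.WaitAsync(cancellationToken);

        try
        {
            _started = false; // stopping twice is harmless
        }
        finally
        {
            _lock.Release();
        }
    }

    public void Dispose() => _lock.Dispose();
}
```

SemaphoreSlim is used instead of a lock statement because it can be awaited, so callers are not blocked while another start or stop is in progress.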

Compatible target frameworks

.NET: net10.0 is compatible. The platform-specific variants net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, and net10.0-windows were computed.

NuGet packages (8)

Showing the top 5 NuGet packages that depend on Shuttle.Core.Threading:

  • Shuttle.Esb: Contains the core Shuttle.Esb assembly that should always be referenced when building Shuttle.Esb solutions.
  • Shuttle.Recall: Event sourcing mechanism.
  • Shuttle.Core.ServiceHost: Turns your console application into a Windows service.
  • Shuttle.Core.Data.CallContext: IDatabaseConnectionCache implementation for use in async/await scenarios.
  • Shuttle.Core.Data.ThreadDatabaseContextScope: Provides a mechanism to create a new database context scope per processor thread.

GitHub repositories

This package is not used by any popular GitHub repositories.

| Version | Downloads | Last Updated |
| --- | --- | --- |
| 21.0.1-beta | 95 | 2/7/2026 |
| 21.0.0-alpha | 113 | 1/18/2026 |
| 20.0.0 | 4,569 | 2/2/2025 |
| 13.1.0 | 2,655 | 8/5/2024 |
| 13.0.0 | 4,784 | 4/30/2024 |
| 12.0.1 | 17,319 | 12/1/2022 |
| 12.0.0 | 29,637 | 9/4/2022 |
| 11.1.2 | 1,254 | 4/9/2022 |
| 11.1.1 | 42,917 | 1/30/2021 |
| 11.0.2 | 14,447 | 1/17/2021 |
| 11.0.1 | 2,932 | 11/27/2020 |
| 11.0.0 | 43,120 | 6/21/2019 |
| 10.1.0 | 1,422 | 4/27/2019 |
| 10.0.2 | 81,354 | 7/4/2018 |
| 10.0.1 | 2,032 | 7/2/2018 |
| 10.0.0 | 24,608 | 1/3/2018 |