SimpleKafkaLibrary 2.0.0

dotnet add package SimpleKafkaLibrary --version 2.0.0
                    
NuGet\Install-Package SimpleKafkaLibrary -Version 2.0.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="SimpleKafkaLibrary" Version="2.0.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="SimpleKafkaLibrary" Version="2.0.0" />
                    
Directory.Packages.props
<PackageReference Include="SimpleKafkaLibrary" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add SimpleKafkaLibrary --version 2.0.0
                    
#r "nuget: SimpleKafkaLibrary, 2.0.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package SimpleKafkaLibrary@2.0.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=SimpleKafkaLibrary&version=2.0.0
                    
Install as a Cake Addin
#tool nuget:?package=SimpleKafkaLibrary&version=2.0.0
                    
Install as a Cake Tool

Kafka Implementation

in appsettings.json put your configuration

Configuration:

"Kafka": {
    "bootstrapservers": "localhost:9092",
    "UseKafka": true,
    "UseEncryptedData":false,
    "EncryptedKey":"xxxxxx" 
    "FlushProducerInSeconds": 2,
    "ConsumedInSeconds": 2,
    "RequestReplyMinTimeoutMs": 100,
    "RequestReplyMaxTimeoutMs": 2000,
    "RequestReplyConsumedInMilliseconds": 100
}

Note that you can use Confluent.Kafka configuration and EncryptedKey is optional if you want to encrypt the data

Producer Usage Sample Producer DI

 public class CreateUserCommandHandler : IRequestHandler<CreateUserCommand, PayloadResponse<UsersDto>>
 {
    private readonly IMessageProducers _messageProducers;

    public CreateUserCommandHandler(IMessageProducers messageProducers)
    {
       _messageProducers = messageProducers
    }

    public async Task<PayloadResponse<UsersDto>> Handle(CreateUserCommand request, CancellationToken cancellationToken)
        {
            ...

            // Fire-and-forget: publish without waiting for consumer processing
            await _messageProducers.WriteFireAndForget<UsersDto>("User", newUser).ConfigureAwait(false);

            // Returns true when Kafka accepts the message, false when Kafka is down or produce fails
            var published = await _messageProducers.ProduceAsync<UsersDto>("User", newUser).ConfigureAwait(false);

            return ResponseStatus<UsersDto>.Create<PayloadResponse<UsersDto>>(ResponseCodes.SUCCESSFUL, _messageProvider.GetMessage(ResponseCodes.SUCCESSFUL), newUser);
        }

 }

Request-Reply Producer Usage

Use ProduceWaitForFResponseAsync when you need to wait until the consumer processes the request and returns a response. Priority request headers and fast polling are used so request-reply messages are consumed before regular messages. Response wait time is configurable between RequestReplyMinTimeoutMs (100) and RequestReplyMaxTimeoutMs (2000).

// Uses default max timeout from config (2000ms)
var result = await _messageProducers.ProduceWaitForFResponseAsync<CreateUserRequest, CreateUserFeedback>(
    "User",
    request,
    cancellationToken: cancellationToken).ConfigureAwait(false);

if (result.IsSuccessful)
{
    var feedback = result.Data;
}

var fastResult = await _messageProducers.ProduceWaitForFResponseAsync<CreateUserRequest, CreateUserFeedback>(
    "User",
    request,
    responseTimeoutMs: 500,
    cancellationToken: cancellationToken).ConfigureAwait(false);

Consumer Usage

Sample Consumer class

Path: src\Core\SmartCleanArchitecture.Application\kafkaConsumer\Consumer.cs

    public class Consumer : ConsumerBase
    {

        public Consumer(KafkaConfig configuration, IMessageProducers producers, IMessageAdmin messageAdmin) : base("test1", configuration, messageAdmin)
        {

        }

        public override async Task Invoke()
        {

            await ConsumeAsync<string>("testTopic", (value) =>
            {  
                // put your action here
                Console.WriteLine(value);
            });

            await base.Invoke();
        }
    }

Request-Reply Consumer Usage

Use ConsumeRequestAsync when the producer calls ProduceWaitForFResponseAsync and expects a response. The handler must return the feedback object that gets sent back to the producer.

Request / Response models

public class CreateUserRequest
{
    public string Name { get; set; } = string.Empty;
    public string Email { get; set; } = string.Empty;
}

public class CreateUserFeedback
{
    public bool Success { get; set; }
    public string Message { get; set; } = string.Empty;
    public Guid UserId { get; set; }
}

Consumer that returns feedback

Path: src\Core\SmartCleanArchitecture.Application\kafkaConsumer\UserConsumer.cs

public class UserConsumer : ConsumerBase
{
    public UserConsumer(KafkaConfig configuration, IMessageProducers producers, IMessageAdmin messageAdmin)
        : base("user-group", configuration, messageAdmin)
    {
    }

    public override async Task Invoke()
    {
        await ConsumeRequestAsync<CreateUserRequest, CreateUserFeedback>("User", request =>
        {
            // process the request
            if (string.IsNullOrWhiteSpace(request.Email))
            {
                return new CreateUserFeedback
                {
                    Success = false,
                    Message = "Email is required."
                };
            }

            var userId = Guid.NewGuid();

            // save user, send email, etc.

            // return feedback to the producer
            return new CreateUserFeedback
            {
                Success = true,
                Message = "User created successfully.",
                UserId = userId
            };
        });

        await base.Invoke();
    }
}

Producer waiting for feedback

var result = await _messageProducers.ProduceWaitForFResponseAsync<CreateUserRequest, CreateUserFeedback>(
    "User",
    new CreateUserRequest
    {
        Name = "John Doe",
        Email = "john@example.com"
    },
    cancellationToken).ConfigureAwait(false);

if (result.IsSuccessful && result.Data is not null)
{
    Console.WriteLine($"User created: {result.Data.UserId}");
}
else
{
    Console.WriteLine("Request failed or Kafka is unavailable.");
}

Note: - You can create multiple Consumer classes and configure them in the configuration file. - The groupId is test1. - The topic to be consumed is testTopic.

In case of using multiple ConsumeAsync in a Invoke of ConsumerBase class

    public class Consumer : ConsumerBase
    {

        public Consumer(KafkaConfig configuration, IMessageProducers producers, IMessageAdmin messageAdmin) : base("test1", configuration, messageAdmin)
        {

        }

        public override async Task Invoke()
        {

             var task1 =  this.ConsumeAsync<string>("testTopic", (value) =>
             {
                Console.WriteLine(value);

             });


            var task2 = this.ConsumeAsync<string>("testTopic2", (value) =>
            {
                Console.WriteLine(value);

            });

            var task3 = ConsumeAsync<string>("testTopic3", (value) =>
            {
                Console.WriteLine(value);

            });

            await Task.WhenAll(task1, task2, task3);

            await base.Invoke();
        }
    }
services.AddKafkaServices<Consumer>(kafkaConfig);

OR


services.AddKafkaServices(cfg =>
{
    cfg.Configure(configuration.GetSection("Kafka"));
    cfg.RegisterConsumer<Consumer>();
    cfg.RegisterConsumer<Consumer2>(); 
});

OR



services.AddKafkaServices(cfg =>
{
    cfg.Configure(configuration.GetSection("Kafka"));

    cfg.RegisterConsumer(Assembly.GetExecutingAssembly());

});

License

This project is licensed with the MIT license.

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.0.0 50 6/22/2026
1.0.7 811 5/15/2025
1.0.6 1,080 2/25/2025
1.0.5 331 1/20/2025
1.0.4 201 12/27/2024
1.0.3 187 12/17/2024
1.0.2 1,856 11/27/2024
1.0.1 205 6/22/2024
1.0.0 192 6/22/2024