SimpleKafkaLibrary 2.0.0
dotnet add package SimpleKafkaLibrary --version 2.0.0
NuGet\Install-Package SimpleKafkaLibrary -Version 2.0.0
<PackageReference Include="SimpleKafkaLibrary" Version="2.0.0" />
<PackageVersion Include="SimpleKafkaLibrary" Version="2.0.0" />
<PackageReference Include="SimpleKafkaLibrary" />
paket add SimpleKafkaLibrary --version 2.0.0
#r "nuget: SimpleKafkaLibrary, 2.0.0"
#:package SimpleKafkaLibrary@2.0.0
#addin nuget:?package=SimpleKafkaLibrary&version=2.0.0
#tool nuget:?package=SimpleKafkaLibrary&version=2.0.0
Kafka Implementation
Put your configuration in appsettings.json.
Configuration:
"Kafka": {
"bootstrapservers": "localhost:9092",
"UseKafka": true,
"UseEncryptedData":false,
"EncryptedKey":"xxxxxx",
"FlushProducerInSeconds": 2,
"ConsumedInSeconds": 2,
"RequestReplyMinTimeoutMs": 100,
"RequestReplyMaxTimeoutMs": 2000,
"RequestReplyConsumedInMilliseconds": 100
}
Note: you can also use Confluent.Kafka configuration settings in this section. EncryptedKey is optional; it is only needed if you want to encrypt the data.
Producer Usage
Sample Producer DI
/// <summary>
/// Sample request handler that receives <see cref="IMessageProducers"/> through
/// constructor injection and publishes the newly created user to the "User" topic.
/// </summary>
public class CreateUserCommandHandler : IRequestHandler<CreateUserCommand, PayloadResponse<UsersDto>>
{
    private readonly IMessageProducers _messageProducers;

    /// <summary>Stores the injected producer for use in <see cref="Handle"/>.</summary>
    /// <param name="messageProducers">Producer abstraction supplied by the DI container.</param>
    public CreateUserCommandHandler(IMessageProducers messageProducers)
    {
        _messageProducers = messageProducers;
    }

    /// <summary>
    /// Handles the command and publishes the resulting user.
    /// </summary>
    /// <param name="request">The incoming create-user command.</param>
    /// <param name="cancellationToken">Token supplied by the caller.</param>
    /// <returns>A successful payload response wrapping the new user.</returns>
    public async Task<PayloadResponse<UsersDto>> Handle(CreateUserCommand request, CancellationToken cancellationToken)
    {
        // The elided code below is expected to produce `newUser`.
        ...
        // NOTE: both publishing styles are shown for illustration; in real code you would
        // normally pick one, otherwise the same message is published twice.

        // Fire-and-forget: publish without waiting for consumer processing
        await _messageProducers.WriteFireAndForget<UsersDto>("User", newUser).ConfigureAwait(false);

        // Returns true when Kafka accepts the message, false when Kafka is down or produce fails
        var published = await _messageProducers.ProduceAsync<UsersDto>("User", newUser).ConfigureAwait(false);

        // NOTE(review): _messageProvider is not declared in this sample; it is assumed to be
        // another injected dependency that was omitted for brevity.
        return ResponseStatus<UsersDto>.Create<PayloadResponse<UsersDto>>(ResponseCodes.SUCCESSFUL, _messageProvider.GetMessage(ResponseCodes.SUCCESSFUL), newUser);
    }
}
Request-Reply Producer Usage
Use ProduceWaitForFResponseAsync when you need to wait until the consumer processes the request and returns a response.
Priority request headers and fast polling are used so request-reply messages are consumed before regular messages.
Response wait time is configurable between RequestReplyMinTimeoutMs (100) and RequestReplyMaxTimeoutMs (2000).
// Uses default max timeout from config (2000ms)
var result = await _messageProducers.ProduceWaitForFResponseAsync<CreateUserRequest, CreateUserFeedback>(
"User",
request,
cancellationToken: cancellationToken).ConfigureAwait(false);
// result.Data carries the CreateUserFeedback sent back by the consumer's handler
if (result.IsSuccessful)
{
var feedback = result.Data;
}
// Same call, but with an explicit response timeout of 500ms passed via responseTimeoutMs
var fastResult = await _messageProducers.ProduceWaitForFResponseAsync<CreateUserRequest, CreateUserFeedback>(
"User",
request,
responseTimeoutMs: 500,
cancellationToken: cancellationToken).ConfigureAwait(false);
Consumer Usage
Sample Consumer class
Path: src\Core\SmartCleanArchitecture.Application\kafkaConsumer\Consumer.cs
/// <summary>
/// Sample consumer that uses the group id "test1" and writes every string
/// message received on "testTopic" to the console.
/// </summary>
public class Consumer : ConsumerBase
{
    /// <summary>Forwards the group id, configuration and admin client to <see cref="ConsumerBase"/>.</summary>
    public Consumer(KafkaConfig configuration, IMessageProducers producers, IMessageAdmin messageAdmin)
        : base("test1", configuration, messageAdmin)
    {
    }

    /// <summary>
    /// Calls ConsumeAsync for "testTopic" and then runs the base implementation.
    /// </summary>
    public override async Task Invoke()
    {
        await ConsumeAsync<string>("testTopic", message =>
        {
            // put your action here
            Console.WriteLine(message);
        });

        await base.Invoke();
    }
}
Request-Reply Consumer Usage
Use ConsumeRequestAsync when the producer calls ProduceWaitForFResponseAsync and expects a response.
The handler must return the feedback object that gets sent back to the producer.
Request / Response models
/// <summary>
/// Request model sent by the producer in the request-reply sample.
/// </summary>
public class CreateUserRequest
{
    /// <summary>Name of the user to create; defaults to an empty string.</summary>
    public string Name { get; set; } = "";

    /// <summary>Email of the user to create; defaults to an empty string.</summary>
    public string Email { get; set; } = "";
}
/// <summary>
/// Response model returned by the consumer in the request-reply sample.
/// </summary>
public class CreateUserFeedback
{
    /// <summary>Whether the request was processed successfully.</summary>
    public bool Success { get; set; }

    /// <summary>Human-readable outcome message; defaults to an empty string.</summary>
    public string Message { get; set; } = "";

    /// <summary>Identifier of the created user; left at its default when not set.</summary>
    public Guid UserId { get; set; }
}
Consumer that returns feedback
Path: src\Core\SmartCleanArchitecture.Application\kafkaConsumer\UserConsumer.cs
/// <summary>
/// Sample request-reply consumer: handles <see cref="CreateUserRequest"/> messages on the
/// "User" topic and returns a <see cref="CreateUserFeedback"/> for each one.
/// </summary>
public class UserConsumer : ConsumerBase
{
    /// <summary>Passes the group id "user-group", the configuration and the admin client to the base class.</summary>
    public UserConsumer(KafkaConfig configuration, IMessageProducers producers, IMessageAdmin messageAdmin)
        : base("user-group", configuration, messageAdmin)
    {
    }

    /// <summary>
    /// Calls ConsumeRequestAsync with the request handler and then runs the base implementation.
    /// </summary>
    public override async Task Invoke()
    {
        await ConsumeRequestAsync<CreateUserRequest, CreateUserFeedback>("User", incoming =>
        {
            // process the request
            if (!string.IsNullOrWhiteSpace(incoming.Email))
            {
                var newUserId = Guid.NewGuid();

                // save user, send email, etc.

                // return feedback to the producer
                return new CreateUserFeedback
                {
                    Success = true,
                    Message = "User created successfully.",
                    UserId = newUserId
                };
            }

            // A missing or blank email is rejected without creating a user id.
            return new CreateUserFeedback
            {
                Success = false,
                Message = "Email is required."
            };
        });

        await base.Invoke();
    }
}
Producer waiting for feedback
// Send the request and wait for the CreateUserFeedback returned by the consumer.
var result = await _messageProducers.ProduceWaitForFResponseAsync<CreateUserRequest, CreateUserFeedback>(
    "User",
    new CreateUserRequest
    {
        Name = "John Doe",
        Email = "john@example.com"
    },
    // Pass the token by name so it cannot bind to the optional responseTimeoutMs parameter.
    cancellationToken: cancellationToken).ConfigureAwait(false);

if (result.IsSuccessful && result.Data is not null)
{
    Console.WriteLine($"User created: {result.Data.UserId}");
}
else
{
    Console.WriteLine("Request failed or Kafka is unavailable.");
}
Note:
- You can create multiple Consumer classes and configure them in the configuration file.
- In the Consumer sample, the groupId is test1.
- In the Consumer sample, the topic to be consumed is testTopic.
When using multiple ConsumeAsync calls in the Invoke method of a ConsumerBase subclass:
/// <summary>
/// Sample consumer that uses the group id "test1" and consumes three topics at once,
/// writing every string message to the console.
/// </summary>
public class Consumer : ConsumerBase
{
    /// <summary>Forwards the group id, configuration and admin client to <see cref="ConsumerBase"/>.</summary>
    public Consumer(KafkaConfig configuration, IMessageProducers producers, IMessageAdmin messageAdmin)
        : base("test1", configuration, messageAdmin)
    {
    }

    /// <summary>
    /// Starts one ConsumeAsync call per topic, awaits all of them together,
    /// and then runs the base implementation.
    /// </summary>
    public override async Task Invoke()
    {
        // Each ConsumeAsync call is started before any of them is awaited.
        Task[] topicConsumers =
        {
            ConsumeAsync<string>("testTopic", message =>
            {
                Console.WriteLine(message);
            }),
            ConsumeAsync<string>("testTopic2", message =>
            {
                Console.WriteLine(message);
            }),
            ConsumeAsync<string>("testTopic3", message =>
            {
                Console.WriteLine(message);
            }),
        };

        await Task.WhenAll(topicConsumers);
        await base.Invoke();
    }
}
// Register the Kafka services with a single consumer type, passing the configuration object directly.
services.AddKafkaServices<Consumer>(kafkaConfig);
OR
// Register the Kafka services using the builder callback.
services.AddKafkaServices(cfg =>
{
// Read the settings from the "Kafka" configuration section.
cfg.Configure(configuration.GetSection("Kafka"));
// Register each consumer class explicitly by type.
cfg.RegisterConsumer<Consumer>();
cfg.RegisterConsumer<Consumer2>();
});
OR
// Register the Kafka services using the builder callback.
services.AddKafkaServices(cfg =>
{
// Read the settings from the "Kafka" configuration section.
cfg.Configure(configuration.GetSection("Kafka"));
// Pass the executing assembly instead of individual consumer types
// (presumably registers the consumers found in that assembly — confirm against the library).
cfg.RegisterConsumer(Assembly.GetExecutingAssembly());
});
License
This project is licensed with the MIT license.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- Confluent.Kafka (>= 2.4.0)
- Microsoft.Extensions.Configuration (>= 8.0.0)
- Microsoft.Extensions.Configuration.Binder (>= 8.0.1)
- Microsoft.Extensions.DependencyInjection (>= 8.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 8.0.0)
- Microsoft.Extensions.Options (>= 8.0.2)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.