KafkaConsumerRetry 0.1.0
dotnet add package KafkaConsumerRetry --version 0.1.0
NuGet\Install-Package KafkaConsumerRetry -Version 0.1.0
<PackageReference Include="KafkaConsumerRetry" Version="0.1.0" />
paket add KafkaConsumerRetry --version 0.1.0
#r "nuget: KafkaConsumerRetry, 0.1.0"
// Install KafkaConsumerRetry as a Cake Addin
#addin nuget:?package=KafkaConsumerRetry&version=0.1.0
// Install KafkaConsumerRetry as a Cake Tool
#tool nuget:?package=KafkaConsumerRetry&version=0.1.0
Uber-style retry architecture
How it works
This library implements message retries using dedicated retry topics, with a dead-letter queue (DLQ) topic alongside them.
The original article text is available in the local ARTICLE.md or on the Uber Engineering blog.
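As a rough illustration of the retry-topic idea, the sketch below picks the topic a message would be republished to after its handler fails. `RetryRouting`, `NextTopic`, and the topic names are hypothetical and are not part of KafkaConsumerRetry; the sketch assumes one retry topic per attempt, followed by a dead-letter topic.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of KafkaConsumerRetry: decides where a message
// goes after its handler has failed.
public static class RetryRouting
{
    // retryTopics holds one topic per retry attempt, in order.
    // failedAttempts counts failures so far, including the one that just happened
    // (1 = the origin attempt failed, 2 = the first retry failed, and so on).
    public static string NextTopic(
        IReadOnlyList<string> retryTopics,
        string deadLetterTopic,
        int failedAttempts)
    {
        if (failedAttempts < 1)
            throw new ArgumentOutOfRangeException(nameof(failedAttempts));

        // While retry topics remain, move to the next one; otherwise give up
        // and send the message to the dead-letter topic.
        return failedAttempts <= retryTopics.Count
            ? retryTopics[failedAttempts - 1]
            : deadLetterTopic;
    }
}
```

Because a failed message leaves the origin topic immediately, later messages on the same partition are not held up behind it.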
Main considerations
- Retry spamming
- Worker starvation
- Too many threads
Retry Spamming
Worker Starvation
Usage
var retryServiceConfig = new KafkaRetryConfig {
    RetryAttempts = 3,
    RetryBaseTime = TimeSpan.FromSeconds(5),
    OriginCluster = new Dictionary<string, string> {
        ["group.id"] = "my-group-name",
        ["bootstrap.servers"] = "localhost:9092",
        ["client.id"] = "client-id",
        ["auto.offset.reset"] = "earliest",
        ["enable.auto.offset.store"] = "false", // Don't auto save the offset
        ["enable.auto.commit"] = "true" // Allow auto commit
    }
};
var topicNaming = _naming.GetTopicNaming(originalName, retryServiceConfig);
await _consumerRunner.RunConsumersAsync<TestingResultHandler>(retryServiceConfig, topicNaming, cancellationToken);
Deep dive
Topic Naming
Naming of the topic
Class | Usage |
---|---|
KafkaRetryConfig | Connection settings for the Origin and Retry Kafka Clusters. If no retry cluster is specified, then the origin will be used for the retry topics |
TopicNaming | Responsible for the naming of the retries and dlq |
PartitionManager | Controls the actions of the workers. More info below |
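To make the role of TopicNaming more concrete, here is a minimal sketch of one possible naming scheme. The `.retry.N` and `.dlq` suffixes, the `RetryTopicNames` record, and `ExampleTopicNaming` are assumptions made for illustration; the names the library actually generates may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical result type: the origin topic plus its derived retry and DLQ topics.
public sealed record RetryTopicNames(
    string Origin,
    IReadOnlyList<string> Retries,
    string DeadLetter);

// Hypothetical naming scheme, not the library's TopicNaming implementation.
public static class ExampleTopicNaming
{
    public static RetryTopicNames For(string origin, int retryAttempts)
    {
        if (string.IsNullOrWhiteSpace(origin))
            throw new ArgumentException("Origin topic name is required.", nameof(origin));
        if (retryAttempts < 0)
            throw new ArgumentOutOfRangeException(nameof(retryAttempts));

        // One retry topic per attempt: "<origin>.retry.0", "<origin>.retry.1", ...
        var retries = Enumerable.Range(0, retryAttempts)
            .Select(i => $"{origin}.retry.{i}")
            .ToArray();

        return new RetryTopicNames(origin, retries, $"{origin}.dlq");
    }
}
```

Deriving every name from the origin topic keeps the retry and DLQ topics easy to find and lets the same rule be applied to any number of origin topics.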
PartitionManager
PartitionMessageManager handles messages from the subscribed topics. Internal message queues are updated on Assigned, Revoked and Lost partition events.
Incoming messages are added to a partition's work queue until a threshold is reached. When the threshold is reached, Pause() is called on that topic's partition.
The Pause() call is not passed to the server; it is used by the internal Kafka library, librdkafka, to stop requesting messages from that topic's partition.
Resume() is called when the work queue is empty.
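The pause and resume behaviour described above can be sketched as a per-partition queue that signals backpressure through two callbacks. `PartitionWorkQueue` is a hypothetical type, not the library's implementation; in a real consumer the callbacks would presumably wrap Confluent.Kafka's `IConsumer.Pause` and `IConsumer.Resume` for the partition in question. The sketch is not thread-safe and assumes a single consumer loop.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: a work queue for one topic partition that asks the
// consumer to pause when it fills up and to resume once it has drained.
public sealed class PartitionWorkQueue<T>
{
    private readonly Queue<T> _queue = new();
    private readonly int _pauseThreshold;
    private readonly Action _pause;
    private readonly Action _resume;

    public PartitionWorkQueue(int pauseThreshold, Action pause, Action resume)
    {
        if (pauseThreshold < 1)
            throw new ArgumentOutOfRangeException(nameof(pauseThreshold));
        _pauseThreshold = pauseThreshold;
        _pause = pause ?? throw new ArgumentNullException(nameof(pause));
        _resume = resume ?? throw new ArgumentNullException(nameof(resume));
    }

    public int Count => _queue.Count;
    public bool IsPaused { get; private set; }

    public void Enqueue(T message)
    {
        _queue.Enqueue(message);
        // Pause once, when the threshold is first reached.
        if (!IsPaused && _queue.Count >= _pauseThreshold)
        {
            IsPaused = true;
            _pause();
        }
    }

    public bool TryDequeue(out T message)
    {
        if (!_queue.TryDequeue(out message))
            return false;
        // Resume only after the queue has fully drained.
        if (IsPaused && _queue.Count == 0)
        {
            IsPaused = false;
            _resume();
        }
        return true;
    }
}
```

Resuming only at zero, rather than just below the threshold, avoids rapid pause and resume cycles when the queue hovers around the limit.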
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
- net6.0
  - Confluent.Kafka (>= 1.9.3)
  - Microsoft.Extensions.DependencyInjection.Abstractions (>= 6.0.0)
  - Microsoft.Extensions.Logging.Abstractions (>= 6.0.3)