Open.ChannelExtensions
5.1.1
A newer version of this package is available. See the version list below for details.
dotnet add package Open.ChannelExtensions --version 5.1.1
NuGet\Install-Package Open.ChannelExtensions -Version 5.1.1
<PackageReference Include="Open.ChannelExtensions" Version="5.1.1" />
paket add Open.ChannelExtensions --version 5.1.1
#r "nuget: Open.ChannelExtensions, 5.1.1"
// Install Open.ChannelExtensions as a Cake Addin
#addin nuget:?package=Open.ChannelExtensions&version=5.1.1
// Install Open.ChannelExtensions as a Cake Tool
#tool nuget:?package=Open.ChannelExtensions&version=5.1.1
A set of extensions for optimizing/simplifying System.Threading.Channels usage.
Click here for detailed documentation.
Highlights
Read & Write
With optional concurrency levels.
- Reading all entries in a channel.
- Writing all entries from a source to a channel.
- Piping (consuming) all entries to a buffer (channel).
- .AsAsyncEnumerable() (IAsyncEnumerable) support for .NET Standard 2.1+ and .NET Core 3+
Special ChannelReader Operations
- Filter
- Transform
- Batch
- Join
Examples
Define an asynchronous pipeline that follows best practices using simple, expressive syntax:
await Channel
    .CreateBounded<T>(10)
    .SourceAsync(source /* IEnumerable<Task<T>> */)
    .PipeAsync(
        maxConcurrency: 2,
        capacity: 5,
        transform: asyncTransform01)
    .Pipe(transform02, /* capacity */ 3)
    .ReadAllAsync(async finalTransformedValue => {
        // Do something async with each final value.
    });

await source /* IEnumerable<T> */
    .ToChannel(boundedSize: 10, singleReader: true)
    .PipeAsync(asyncTransform01, /* capacity */ 5)
    .Pipe(
        maxConcurrency: 2,
        capacity: 3,
        transform: transform02)
    .ReadAll(finalTransformedValue => {
        // Do something with each final value.
    });
Reading (until the channel is closed)
Read each entry from the channel one by one
await channel.ReadAll(
    entry => { /* Processing Code */ });
await channel.ReadAll(
    (entry, index) => { /* Processing Code */ });
await channel.ReadAllAsync(
    async entry => { await /* Processing Code */ });
await channel.ReadAllAsync(
    async (entry, index) => { await /* Processing Code */ });
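For comparison, here is a minimal sketch of the kind of hand-written loop that reading until the channel is closed replaces. It uses only System.Threading.Channels and is not the library's implementation. The demo channel and the variable names are illustrative assumptions, and the top-level statements assume C# 9 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical demo channel, pre-filled and then closed.
var channel = Channel.CreateUnbounded<int>();
for (var i = 1; i <= 5; i++) channel.Writer.TryWrite(i);
channel.Writer.Complete(); // closing the channel lets the read loop finish

var seen = new List<int>();
long count = 0;

// WaitToReadAsync returns false once the channel is completed and drained.
while (await channel.Reader.WaitToReadAsync())
{
    // Drain everything that is currently buffered before waiting again.
    while (channel.Reader.TryRead(out var entry))
    {
        seen.Add(entry);
        count++;
    }
}

Console.WriteLine($"Read {count} entries: {string.Join(", ", seen)}");
// prints "Read 5 entries: 1, 2, 3, 4, 5"
```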
Read each entry from the channel concurrently
await channel.ReadAllConcurrently(
    maxConcurrency,
    entry => { /* Processing Code */ });
await channel.ReadAllConcurrentlyAsync(
    maxConcurrency,
    async entry => { await /* Processing Code */ });
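Concurrent reading can be pictured as several workers draining the same reader. The sketch below shows that idea with plain System.Threading.Channels. It is an assumption about the general technique, not the library's code, and `Worker`, `total`, and the demo data are hypothetical names.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

const int maxConcurrency = 3; // illustrative value

var channel = Channel.CreateUnbounded<int>();
for (var i = 1; i <= 100; i++) channel.Writer.TryWrite(i);
channel.Writer.Complete();

long total = 0;

// Each worker competes for entries; every entry is handled by exactly one worker.
async Task Worker()
{
    while (await channel.Reader.WaitToReadAsync())
    {
        while (channel.Reader.TryRead(out var entry))
        {
            // Shared state must be updated in a thread-safe way.
            Interlocked.Add(ref total, entry);
        }
    }
}

await Task.WhenAll(
    Enumerable.Range(0, maxConcurrency).Select(_ => Task.Run(() => Worker())));

Console.WriteLine(total); // prints 5050 (the sum of 1..100)
```

Because entries are handed out in whatever order workers become free, processing order is not guaranteed when more than one worker is used.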
Writing
If complete is true, the channel will be closed when the source is empty.
Dump a source enumeration into the channel
// source can be any IEnumerable<T>.
await channel.WriteAll(source, complete: true);
// source can be any IEnumerable<Task<T>> or IEnumerable<ValueTask<T>>.
await channel.WriteAllAsync(source, complete: true);
Synchronize reading from the source and process the results concurrently
// source can be any IEnumerable<Task<T>> or IEnumerable<ValueTask<T>>.
await channel.WriteAllConcurrentlyAsync(
    maxConcurrency, source, complete: true);
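As a rough illustration of what writing a whole source and then completing the channel involves, the following sketch does it by hand with System.Threading.Channels. It is not the library's implementation, and the bounded capacity, the sample data, and the variable names are assumptions chosen for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// A small bounded channel so the writer has to wait for the reader.
var channel = Channel.CreateBounded<string>(2);
var source = new[] { "a", "b", "c", "d" };

var writing = Task.Run(async () =>
{
    foreach (var entry in source)
    {
        // WriteAsync waits while the buffer is full (back-pressure).
        await channel.Writer.WriteAsync(entry);
    }
    // Close the channel once the source is exhausted,
    // which is the behavior described for complete: true.
    channel.Writer.Complete();
});

var received = new List<string>();
await foreach (var item in channel.Reader.ReadAllAsync())
{
    received.Add(item);
}
await writing;

Console.WriteLine(string.Join(", ", received)); // prints "a, b, c, d"
```

Without the call to Complete, the reading loop would wait indefinitely for more entries.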
Filter & Transform
// Filter and transform when reading.
channel.Reader
    .Filter(predicate) // .Where()
    .Transform(selector) // .Select()
    .ReadAllAsync(async value => {/*...*/});
Batching
values.Reader
    .Batch(10 /*batch size*/)
    .WithTimeout(1000) // Any non-empty batches are flushed every second.
    .ReadAllAsync(async batch => {/*...*/});
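To make the batching behavior concrete, here is a hand-rolled sketch that groups entries into lists of a fixed size and flushes the final partial batch when the channel closes. It uses only System.Threading.Channels, does not model the timeout-based flush, and is not the library's implementation. The batch size and sample data are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;

const int batchSize = 3; // illustrative value

var values = Channel.CreateUnbounded<int>();
for (var i = 1; i <= 8; i++) values.Writer.TryWrite(i);
values.Writer.Complete();

var batches = new List<List<int>>();
var current = new List<int>(batchSize);

await foreach (var value in values.Reader.ReadAllAsync())
{
    current.Add(value);
    if (current.Count == batchSize)
    {
        // A full batch is emitted and a fresh one is started.
        batches.Add(current);
        current = new List<int>(batchSize);
    }
}

// When the channel closes, any remaining partial batch is flushed.
if (current.Count > 0) batches.Add(current);

Console.WriteLine(string.Join(" | ", batches.Select(b => string.Join(",", b))));
// prints "1,2,3 | 4,5,6 | 7,8"
```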
Joining
batches.Reader
    .Join()
    .ReadAllAsync(async value => {/*...*/});
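Joining is the inverse of batching: a reader of collections is flattened back into a stream of single values. A minimal sketch of that idea with plain System.Threading.Channels follows. The channel of arrays and the variable names are assumptions for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical channel whose entries are themselves collections.
var batches = Channel.CreateUnbounded<int[]>();
batches.Writer.TryWrite(new[] { 1, 2, 3 });
batches.Writer.TryWrite(new[] { 4, 5 });
batches.Writer.Complete();

var flattened = new List<int>();
await foreach (var batch in batches.Reader.ReadAllAsync())
{
    // Each batch is unpacked in order, preserving the original sequence.
    foreach (var value in batch)
    {
        flattened.Add(value);
    }
}

Console.WriteLine(string.Join(", ", flattened)); // prints "1, 2, 3, 4, 5"
```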
Pipelining / Transforming
Transform and buffer entries
// Transform values in a source channel into a new unbounded channel.
var transformed = channel.PipeAsync(
    async value => /* transformation */);

// Transform values in a source channel into a new unbounded channel with a max concurrency of X.
const int X = 4;
var transformed = channel.PipeAsync(
    X, async value => /* transformation */);

// Transform values in a source channel into a new bounded channel with a capacity of N entries.
const int N = 5;
var transformed = channel.PipeAsync(
    async value => /* transformation */, N);

// Transform values in a source channel into a new bounded channel with a capacity of N entries and a max concurrency of X.
const int X = 4;
const int N = 5;
var transformed = channel.PipeAsync(
    X, async value => /* transformation */, N);

// or
transformed = channel.PipeAsync(
    maxConcurrency: X,
    capacity: N,
    transform: async value => /* transformation */);
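Conceptually, piping reads from one channel, applies a transform, and writes the results into a second channel that acts as a buffer. The sketch below shows that pattern with plain System.Threading.Channels for a single consumer and a synchronous transform. `PipeSketch` is a hypothetical helper written for this example, not a library method, and it leaves out concurrency and cancellation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: reads from 'input', transforms, and buffers into a bounded channel.
ChannelReader<TOut> PipeSketch<TIn, TOut>(
    ChannelReader<TIn> input, Func<TIn, TOut> selector, int bufferSize)
{
    var target = Channel.CreateBounded<TOut>(bufferSize);
    _ = Task.Run(async () =>
    {
        try
        {
            await foreach (var item in input.ReadAllAsync())
            {
                // Waits when the buffer is full, so a slow consumer slows the producer.
                await target.Writer.WriteAsync(selector(item));
            }
            target.Writer.TryComplete();
        }
        catch (Exception ex)
        {
            // Propagate failures to whoever reads the target channel.
            target.Writer.TryComplete(ex);
        }
    });
    return target.Reader;
}

var numbers = Channel.CreateUnbounded<int>();
for (var i = 1; i <= 5; i++) numbers.Writer.TryWrite(i);
numbers.Writer.Complete();

var squares = new List<int>();
await foreach (var value in PipeSketch(numbers.Reader, n => n * n, 2).ReadAllAsync())
{
    squares.Add(value);
}

Console.WriteLine(string.Join(", ", squares)); // prints "1, 4, 9, 16, 25"
```

Returning only the reader keeps the write side private to the pipeline stage, so downstream code can consume the results but cannot write into the buffer.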
Product | Versions |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
.NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.0 is compatible. netstandard2.1 is compatible. |
.NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen40 was computed. tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
- .NETStandard 2.0
  - System.Threading.Channels (>= 5.0.0)
- .NETStandard 2.1
  - System.Threading.Channels (>= 5.0.0)
NuGet packages (8)
Showing the top 5 NuGet packages that depend on Open.ChannelExtensions:
Package | Description |
---|---|
Indice.Common | Package Description |
Indice.Features.Identity.SignInLogs | Package Description |
Open.Database.Extensions.Channel | Database extensions for pipelining data through channels. Includes Open.Database.Extensions.Core. |
DataPipe.Core | Provides AWS support to the DataPipe platform. Reference this package if you need to read or write to S3 from a data pipe script |
ProjectoR.Core | A simple way of writing projections in .NET. |
GitHub repositories (3)
Showing the top 3 popular GitHub repositories that depend on Open.ChannelExtensions:
Repository | Description |
---|---|
oskardudycz/EventSourcing.NetCore | Examples and Tutorials of Event Sourcing in .NET |
mehdihadeli/food-delivery-microservices | 🍔 A practical and imaginary food delivery microservices, built with .Net 8, MassTransit, Domain-Driven Design, CQRS, Vertical Slice Architecture, Event-Driven Architecture, and the latest technologies. |
Excel-DNA/Samples | Various sample projects and snippets related to Excel-DNA |
Version | Downloads | Last updated |
---|---|---|
9.0.0 | 241 | 11/18/2024 |
8.6.0 | 359 | 11/13/2024 |
8.5.0 | 10,305 | 9/22/2024 |
8.4.3 | 31,072 | 7/8/2024 |
8.4.2 | 2,509 | 7/1/2024 |
8.4.1 | 1,071 | 6/26/2024 |
8.4.0 | 111 | 6/26/2024 |
8.3.0 | 34,714 | 5/7/2024 |
8.2.0 | 531 | 5/5/2024 |
8.1.0 | 138 | 5/5/2024 |
8.0.2 | 44,704 | 1/31/2024 |
8.0.1 | 1,033 | 1/26/2024 |
8.0.0 | 3,729 | 1/23/2024 |
7.3.0 | 101 | 9/22/2024 |
7.2.0 | 109 | 7/8/2024 |
7.1.0 | 113 | 6/26/2024 |
7.0.2 | 9,674 | 1/31/2024 |
7.0.1 | 271 | 1/26/2024 |
7.0.0 | 378 | 1/23/2024 |
6.6.0 | 113 | 9/22/2024 |
6.5.0 | 108 | 7/8/2024 |
6.4.0 | 110 | 6/26/2024 |
6.3.2 | 11,218 | 1/31/2024 |
6.3.1 | 1,061 | 1/26/2024 |
6.3.0 | 332 | 1/23/2024 |
6.2.2 | 360,788 | 10/5/2022 |
6.2.1 | 46,834 | 9/3/2022 |
6.2.0 | 45,456 | 7/13/2022 |
6.1.0 | 5,499 | 6/30/2022 |
6.0.3 | 39,285 | 5/12/2022 |
6.0.2 | 31,226 | 4/13/2022 |
6.0.1 | 18,136 | 3/1/2022 |
6.0.0 | 4,032 | 2/22/2022 |
5.2.1 | 9,569 | 2/22/2022 |
5.2.0 | 4,479 | 2/6/2022 |
5.1.3 | 64,546 | 10/12/2021 |
5.1.2 | 1,097 | 10/8/2021 |
5.1.1 | 757 | 10/8/2021 |
5.1.0 | 841 | 10/8/2021 |
5.0.0 | 44,009 | 6/16/2021 |
3.5.0 | 211,550 | 7/16/2020 |
3.4.0 | 5,513 | 6/3/2020 |
3.3.2 | 970 | 6/1/2020 |
3.3.1 | 1,006 | 5/14/2020 |
3.3.0 | 4,659 | 2/19/2020 |
2.6.0 | 970 | 10/20/2019 |