FSharp.Control.TaskSeq
0.1.1
See the version list below for details.
dotnet add package FSharp.Control.TaskSeq --version 0.1.1
NuGet\Install-Package FSharp.Control.TaskSeq -Version 0.1.1
<PackageReference Include="FSharp.Control.TaskSeq" Version="0.1.1" />
paket add FSharp.Control.TaskSeq --version 0.1.1
#r "nuget: FSharp.Control.TaskSeq, 0.1.1"
// Install FSharp.Control.TaskSeq as a Cake Addin #addin nuget:?package=FSharp.Control.TaskSeq&version=0.1.1 // Install FSharp.Control.TaskSeq as a Cake Tool #tool nuget:?package=FSharp.Control.TaskSeq&version=0.1.1
TaskSeq
An implementation IAsyncEnumerable<'T>
as a taskSeq
CE for F# with accompanying TaskSeq
module.
The IAsyncEnumerable
interface was added to .NET in .NET Core 3.0
and is part of .NET Standard 2.1
. The main use-case was for iterative asynchronous enumeration over some resource. For instance, an event stream or a REST API interface with pagination, where each page is a MoveNextAsync
call on the IAsyncEnumerator<'T>
given by a call to GetAsyncEnumerator()
. It has been relatively challenging to work properly with this type and dealing with each step being asynchronous, and the enumerator implementing IAsyncDisposable
as well, which requires careful handling.
Implementation progress
The resumable state machine backing the taskSeq
CE is considered stable. While bugs are always possible, we will mostly focus on adding functionality there, like adding more useful overloads for yield
and let!
. Suggestions are welcome!
We are working hard on getting a full set of module functions on TaskSeq
that can be used with IAsyncEnumerable
sequences. Our guide is the set of F# Seq
functions in F# Core and, where applicable, the functions provided from AsyncSeq
. Each implemented function is documented through XML doc comments to provide the necessary context-sensitive help.
The following is the progress report:
TODO!
Futher reading IAsyncEnumerable
- A good C#-based introduction can be found in this blog.
- An MSDN article written shortly after it was introduced.
- Converting a
seq
to anIAsyncEnumerable
demo gist as an example, thoughTaskSeq
contains many more utility functions and uses a slightly different approach. - If you're looking for using
IAsyncEnumerable
withasync
and nottask
, the excellentAsyncSeq
library should be used. WhileTaskSeq
is intended to consumeasync
just liketask
does, it won't create anAsyncSeq
type (at least not yet). If you want classic Async and parallelism, you should get this library instead.
Futher reading on resumable state machines
- A state machine from a monadic perspective in F# can be found here, which works with the pre-F# 6.0 non-resumable internals.
- The original RFC for F# 6.0 on resumable state machines
- The original RFC for introducing
task
to F# 6.0. - A pre F# 6.0
TaskBuilder
that motivated thetask
CE later added to F# Core. - MSDN Documentation on
task
andasync
.
Further reading on computation expressions
- Docs on MSDN form a good summary and starting point.
- Arguably the best step-by-step tutorial to using and building computation expressions by Scott Wlaschin.
Building & testing
TLDR: just run build
. Or load the sln
file in Visual Studio or VS Code and compile.
Prerequisites
- .NET 6 or .NET 7 Preview
- F# 6.0 or 7.0 compiler
- To use
build.cmd
, thedotnet
command must be accessible from your path.
Just check-out this repo locally. Then, from the root of the repo, you can do:
Build the solution
build [build] [release|debug]
With no arguments, defaults to release
.
Run the tests
build test [release|debug]
With no arguments, defaults to release
. By default, all tests are output to the console. If you don't want that, you can use --logger console;verbosity=summary
.
Furthermore, no TRX file is generated and the --blame-xxx
flags aren't set.
Run the CI command
build ci [release|debug]
With no arguments, defaults to release
. This will run dotnet test
with the --blame-xxx
settings enabled to prevent hanging tests caused by
an xUnit runner bug.
There are no special CI environment variables that need to be set for running this locally.
Advanced
You can pass any additional options that are valid for dotnet test
and dotnet build
respectively. However,
these cannot be the very first argument, so you should either use build build --myadditionalOptions fizz buzz
, or
just specify the build-kind, i.e. this is fine:
build debug --verbosity detailed
build test --logger console;verbosity=summary
At this moment, additional options cannot have quotes in them.
Command modifiers, like release
and debug
, can be specified with -
or /
if you so prefer: dotnet build /release
.
Get help (duh!)
build help
For more info, see this PR: https://github.com/abelbraaksma/TaskSeq/pull/29.
In progress!!!
It's based on Don Symes taskSeq.fs
but expanded with useful utility functions and a few extra binding overloads.
Short-term feature planning
Not necessarily in order of importance:
- A minimal base set of useful functions and sensible CE overloads, like
map
,collect
,fold
,zip
. These functions will live in the moduleTaskSeq
. The CE will be calledtaskSeq
. - Packaging and publishing on Nuget
- Provide the same surface area of functions as
Seq
in F# Core - For each function, have a "normal" function, where the operator is non-async, and an async version. I.e.,
TaskSeq.map
andTaskSeq.mapAsync
, the difference being that themapper
function returns a#Task<'T>
in the second version. - Examples, documentation and tests
- Expand surface area based on user requests
- Improving the original code, adding benchmarks, and what have you.
Current set of TaskSeq
utility functions
The following is the current surface area of the TaskSeq
utility functions. This is just a dump of the signatures with doc comments
to be used as a quick ref.
module TaskSeq =
open System.Collections.Generic
open System.Threading.Tasks
open FSharp.Control.TaskSeqBuilders
/// Initialize an empty taskSeq.
val empty<'T> : taskSeq<'T>
/// <summary>
/// Returns <see cref="true" /> if the task sequence contains no elements, <see cref="false" /> otherwise.
/// </summary>
val isEmpty: taskSeq: taskSeq<'T> -> Task<bool>
/// Returns taskSeq as an array. This function is blocking until the sequence is exhausted and will properly dispose of the resources.
val toList: t: taskSeq<'T> -> 'T list
/// Returns taskSeq as an array. This function is blocking until the sequence is exhausted and will properly dispose of the resources.
val toArray: taskSeq: taskSeq<'T> -> 'T[]
/// Returns taskSeq as a seq, similar to Seq.cached. This function is blocking until the sequence is exhausted and will properly dispose of the resources.
val toSeqCached: taskSeq: taskSeq<'T> -> seq<'T>
/// Unwraps the taskSeq as a Task<array<_>>. This function is non-blocking.
val toArrayAsync: taskSeq: taskSeq<'T> -> Task<'T[]>
/// Unwraps the taskSeq as a Task<list<_>>. This function is non-blocking.
val toListAsync: taskSeq: taskSeq<'T> -> Task<'T list>
/// Unwraps the taskSeq as a Task<ResizeArray<_>>. This function is non-blocking.
val toResizeArrayAsync: taskSeq: taskSeq<'T> -> Task<ResizeArray<'T>>
/// Unwraps the taskSeq as a Task<IList<_>>. This function is non-blocking.
val toIListAsync: taskSeq: taskSeq<'T> -> Task<IList<'T>>
/// Unwraps the taskSeq as a Task<seq<_>>. This function is non-blocking,
/// exhausts the sequence and caches the results of the tasks in the sequence.
val toSeqCachedAsync: taskSeq: taskSeq<'T> -> Task<seq<'T>>
/// Create a taskSeq of an array.
val ofArray: array: 'T[] -> taskSeq<'T>
/// Create a taskSeq of a list.
val ofList: list: 'T list -> taskSeq<'T>
/// Create a taskSeq of a seq.
val ofSeq: sequence: seq<'T> -> taskSeq<'T>
/// Create a taskSeq of a ResizeArray, aka List.
val ofResizeArray: data: ResizeArray<'T> -> taskSeq<'T>
/// Create a taskSeq of a sequence of tasks, that may already have hot-started.
val ofTaskSeq: sequence: seq<#Task<'T>> -> taskSeq<'T>
/// Create a taskSeq of a list of tasks, that may already have hot-started.
val ofTaskList: list: #Task<'T> list -> taskSeq<'T>
/// Create a taskSeq of an array of tasks, that may already have hot-started.
val ofTaskArray: array: #Task<'T> array -> taskSeq<'T>
/// Create a taskSeq of a seq of async.
val ofAsyncSeq: sequence: seq<Async<'T>> -> taskSeq<'T>
/// Create a taskSeq of a list of async.
val ofAsyncList: list: Async<'T> list -> taskSeq<'T>
/// Create a taskSeq of an array of async.
val ofAsyncArray: array: Async<'T> array -> taskSeq<'T>
/// Iterates over the taskSeq applying the action function to each item. This function is non-blocking
/// exhausts the sequence as soon as the task is evaluated.
val iter: action: ('T -> unit) -> taskSeq: taskSeq<'T> -> Task<unit>
/// Iterates over the taskSeq applying the action function to each item. This function is non-blocking,
/// exhausts the sequence as soon as the task is evaluated.
val iteri: action: (int -> 'T -> unit) -> taskSeq: taskSeq<'T> -> Task<unit>
/// Iterates over the taskSeq applying the async action to each item. This function is non-blocking
/// exhausts the sequence as soon as the task is evaluated.
val iterAsync: action: ('T -> #Task<unit>) -> taskSeq: taskSeq<'T> -> Task<unit>
/// Iterates over the taskSeq, applying the async action to each item. This function is non-blocking,
/// exhausts the sequence as soon as the task is evaluated.
val iteriAsync: action: (int -> 'T -> #Task<unit>) -> taskSeq: taskSeq<'T> -> Task<unit>
/// Maps over the taskSeq, applying the mapper function to each item. This function is non-blocking.
val map: mapper: ('T -> 'U) -> taskSeq: taskSeq<'T> -> taskSeq<'U>
/// Maps over the taskSeq with an index, applying the mapper function to each item. This function is non-blocking.
val mapi: mapper: (int -> 'T -> 'U) -> taskSeq: taskSeq<'T> -> taskSeq<'U>
/// Maps over the taskSeq, applying the async mapper function to each item. This function is non-blocking.
val mapAsync: mapper: ('T -> #Task<'U>) -> taskSeq: taskSeq<'T> -> taskSeq<'U>
/// Maps over the taskSeq with an index, applying the async mapper function to each item. This function is non-blocking.
val mapiAsync: mapper: (int -> 'T -> #Task<'U>) -> taskSeq: taskSeq<'T> -> taskSeq<'U>
/// Applies the given function to the items in the taskSeq and concatenates all the results in order.
val collect: binder: ('T -> #taskSeq<'U>) -> taskSeq: taskSeq<'T> -> taskSeq<'U>
/// Applies the given function to the items in the taskSeq and concatenates all the results in order.
val collectSeq: binder: ('T -> #seq<'U>) -> taskSeq: taskSeq<'T> -> taskSeq<'U>
/// Applies the given async function to the items in the taskSeq and concatenates all the results in order.
val collectAsync: binder: ('T -> #Task<'TSeqU>) -> taskSeq: taskSeq<'T> -> taskSeq<'U> when 'TSeqU :> taskSeq<'U>
/// Applies the given async function to the items in the taskSeq and concatenates all the results in order.
val collectSeqAsync: binder: ('T -> #Task<'SeqU>) -> taskSeq: taskSeq<'T> -> taskSeq<'U> when 'SeqU :> seq<'U>
/// <summary>
/// Returns the first element of the <see cref="IAsyncEnumerable" />, or <see cref="None" /> if the sequence is empty.
/// </summary>
/// <exception cref="ArgumentException">Thrown when the sequence is empty.</exception>
val tryHead: taskSeq: taskSeq<'T> -> Task<'T option>
/// <summary>
/// Returns the first element of the <see cref="IAsyncEnumerable" />.
/// </summary>
/// <exception cref="ArgumentException">Thrown when the sequence is empty.</exception>
val head: taskSeq: taskSeq<'T> -> Task<'T>
/// <summary>
/// Returns the last element of the <see cref="IAsyncEnumerable" />, or <see cref="None" /> if the sequence is empty.
/// </summary>
/// <exception cref="ArgumentException">Thrown when the sequence is empty.</exception>
val tryLast: taskSeq: taskSeq<'T> -> Task<'T option>
/// <summary>
/// Returns the last element of the <see cref="IAsyncEnumerable" />.
/// </summary>
/// <exception cref="ArgumentException">Thrown when the sequence is empty.</exception>
val last: taskSeq: taskSeq<'T> -> Task<'T>
/// <summary>
/// Returns the nth element of the <see cref="IAsyncEnumerable" />, or <see cref="None" /> if the sequence
/// does not contain enough elements, or if <paramref name="index" /> is negative.
/// Parameter <paramref name="index" /> is zero-based, that is, the value 0 returns the first element.
/// </summary>
val tryItem: index: int -> taskSeq: taskSeq<'T> -> Task<'T option>
/// <summary>
/// Returns the nth element of the <see cref="IAsyncEnumerable" />, or <see cref="None" /> if the sequence
/// does not contain enough elements, or if <paramref name="index" /> is negative.
/// </summary>
/// <exception cref="ArgumentException">Thrown when the sequence has insufficient length or
/// <paramref name="index" /> is negative.</exception>
val item: index: int -> taskSeq: taskSeq<'T> -> Task<'T>
/// <summary>
/// Returns the only element of the task sequence, or <see cref="None" /> if the sequence is empty of
/// contains more than one element.
/// </summary>
val tryExactlyOne: source: taskSeq<'T> -> Task<'T option>
/// <summary>
/// Returns the only element of the task sequence.
/// </summary>
/// <exception cref="ArgumentException">Thrown when the input sequence does not contain precisely one element.</exception>
val exactlyOne: source: taskSeq<'T> -> Task<'T>
/// <summary>
/// Applies the given function <paramref name="chooser" /> to each element of the task sequence. Returns
/// a sequence comprised of the results "x" for each element where
/// the function returns <c>Some(x)</c>.
/// If <paramref name="chooser" /> is asynchronous, consider using <see cref="TaskSeq.chooseAsync" />.
/// </summary>
val choose: chooser: ('T -> 'U option) -> source: taskSeq<'T> -> taskSeq<'U>
/// <summary>
/// Applies the given asynchronous function <paramref name="chooser" /> to each element of the task sequence. Returns
/// a sequence comprised of the results "x" for each element where
/// the function returns <see cref="Some(x)" />.
/// If <paramref name="chooser" /> does not need to be asynchronous, consider using <see cref="TaskSeq.choose" />.
/// </summary>
val chooseAsync: chooser: ('T -> #Task<'U option>) -> source: taskSeq<'T> -> taskSeq<'U>
/// <summary>
/// Returns a new collection containing only the elements of the collection
/// for which the given <paramref name="predicate" /> function returns <see cref="true" />.
/// If <paramref name="predicate" /> is asynchronous, consider using <see cref="TaskSeq.filterAsync" />.
/// </summary>
val filter: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T>
/// <summary>
/// Returns a new collection containing only the elements of the collection
/// for which the given asynchronous function <paramref name="predicate" /> returns <see cref="true" />.
/// If <paramref name="predicate" /> does not need to be asynchronous, consider using <see cref="TaskSeq.filter" />.
/// </summary>
val filterAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> taskSeq<'T>
/// <summary>
/// Applies the given function <paramref name="chooser" /> to successive elements of the task sequence
/// in <paramref name="source" />, returning the first result where the function returns <see cref="Some(x)" />.
/// If <paramref name="chooser" /> is asynchronous, consider using <see cref="TaskSeq.tryPickAsync" />.
/// </summary>
val tryPick: chooser: ('T -> 'U option) -> source: taskSeq<'T> -> Task<'U option>
/// <summary>
/// Applies the given asynchronous function <paramref name="chooser" /> to successive elements of the task sequence
/// in <paramref name="source" />, returning the first result where the function returns <see cref="Some(x)" />.
/// If <paramref name="chooser" /> does not need to be asynchronous, consider using <see cref="TaskSeq.tryPick" />.
/// </summary>
val tryPickAsync: chooser: ('T -> #Task<'U option>) -> source: taskSeq<'T> -> Task<'U option>
/// <summary>
/// Returns the first element of the task sequence in <paramref name="source" /> for which the given function
/// <paramref name="predicate" /> returns <see cref="true" />. Returns <see cref="None" /> if no such element exists.
/// If <paramref name="predicate" /> is asynchronous, consider using <see cref="TaskSeq.tryFindAsync" />.
/// </summary>
val tryFind: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<'T option>
/// <summary>
/// Returns the first element of the task sequence in <paramref name="source" /> for which the given asynchronous function
/// <paramref name="predicate" /> returns <see cref="true" />. Returns <see cref="None" /> if no such element exists.
/// If <paramref name="predicate" /> does not need to be asynchronous, consider using <see cref="TaskSeq.tryFind" />.
/// </summary>
val tryFindAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<'T option>
/// <summary>
/// Applies the given function <paramref name="chooser" /> to successive elements of the task sequence
/// in <paramref name="source" />, returning the first result where the function returns <see cref="Some(x)" />.
/// If <paramref name="chooser" /> is asynchronous, consider using <see cref="TaskSeq.pickAsync" />.
/// <exception cref="KeyNotFoundException">Thrown when every item of the sequence
/// evaluates to <see cref="None" /> when the given function is applied.</exception>
/// </summary>
val pick: chooser: ('T -> 'U option) -> source: taskSeq<'T> -> Task<'U>
/// <summary>
/// Applies the given asynchronous function <paramref name="chooser" /> to successive elements of the task sequence
/// in <paramref name="source" />, returning the first result where the function returns <see cref="Some(x)" />.
/// If <paramref name="chooser" /> does not need to be asynchronous, consider using <see cref="TaskSeq.pick" />.
/// <exception cref="KeyNotFoundException">Thrown when every item of the sequence
/// evaluates to <see cref="None" /> when the given function is applied.</exception>
/// </summary>
val pickAsync: chooser: ('T -> #Task<'U option>) -> source: taskSeq<'T> -> Task<'U>
/// <summary>
/// Returns the first element of the task sequence in <paramref name="source" /> for which the given function
/// <paramref name="predicate" /> returns <see cref="true" />.
/// If <paramref name="predicate" /> is asynchronous, consider using <see cref="TaskSeq.findAsync" />.
/// </summary>
/// <exception cref="KeyNotFoundException">Thrown if no element returns <see cref="true" /> when
/// evaluated by the <paramref name="predicate" /> function.</exception>
val find: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<'T>
/// <summary>
/// Returns the first element of the task sequence in <paramref name="source" /> for which the given
/// asynchronous function <paramref name="predicate" /> returns <see cref="true" />.
/// If <paramref name="predicate" /> does not need to be asynchronous, consider using <see cref="TaskSeq.find" />.
/// </summary>
/// <exception cref="KeyNotFoundException">Thrown if no element returns <see cref="true" /> when
/// evaluated by the <paramref name="predicate" /> function.</exception>
val findAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<'T>
/// <summary>
/// Zips two task sequences, returning a taskSeq of the tuples of each sequence, in order. May raise ArgumentException
/// if the sequences are or unequal length.
/// </summary>
/// <exception cref="ArgumentException">The sequences have different lengths.</exception>
val zip: taskSeq1: taskSeq<'T> -> taskSeq2: taskSeq<'U> -> IAsyncEnumerable<'T * 'U>
/// <summary>
/// Applies the function <paramref name="folder" /> to each element in the task sequence,
/// threading an accumulator argument of type <paramref name="'State" /> through the computation.
/// If the accumulator function <paramref name="folder" /> is asynchronous, consider using <see cref="TaskSeq.foldAsync" />.
/// </summary>
val fold: folder: ('State -> 'T -> 'State) -> state: 'State -> taskSeq: taskSeq<'T> -> Task<'State>
/// <summary>
/// Applies the asynchronous function <paramref name="folder" /> to each element in the task sequence,
/// threading an accumulator argument of type <paramref name="'State" /> through the computation.
/// If the accumulator function <paramref name="folder" /> does not need to be asynchronous, consider using <see cref="TaskSeq.fold" />.
/// </summary>
val foldAsync: folder: ('State -> 'T -> #Task<'State>) -> state: 'State -> taskSeq: taskSeq<'T> -> Task<'State>
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. |
-
net6.0
- FSharp.Core (>= 6.0.5)
NuGet packages (12)
Showing the top 5 NuGet packages that depend on FSharp.Control.TaskSeq:
Package | Downloads |
---|---|
Propulsion
Efficient event streaming pipelines |
|
Equinox.CosmosStore
Efficient event sourced decisions and data |
|
Propulsion.Feed
Efficient event streaming pipelines |
|
Equinox.EventStore
Efficient event sourced decisions and data |
|
Equinox.DynamoStore
Efficient event sourced decisions and data |
GitHub repositories (1)
Showing the top 1 popular GitHub repositories that depend on FSharp.Control.TaskSeq:
Repository | Stars |
---|---|
J-Tech-Japan/Sekiban
Sekiban - an Opinionated Event Sourcing and CQRS Framework using C#. It can store data into Azure Cosmos DB, AWS Dynamo DB or Postgres
|