Reactive.AzureStorage.Table
1.0.0
dotnet add package Reactive.AzureStorage.Table --version 1.0.0
NuGet\Install-Package Reactive.AzureStorage.Table -Version 1.0.0
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Reactive.AzureStorage.Table" Version="1.0.0" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add Reactive.AzureStorage.Table --version 1.0.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: Reactive.AzureStorage.Table, 1.0.0"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install Reactive.AzureStorage.Table as a Cake Addin #addin nuget:?package=Reactive.AzureStorage.Table&version=1.0.0 // Install Reactive.AzureStorage.Table as a Cake Tool #tool nuget:?package=Reactive.AzureStorage.Table&version=1.0.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
Reactive.AzurePackages
A reactive interface for Azure storage SDK - a natural programming model for asynchronous data streams.
Features
Azure Storage
Table
While working with azure storage, its throughput has to be taken into consideration and APIs need to be designed to enable and encourage better utilization of threads. The current throughput limitation on azure table storage is upto 2000 entities per second for single table partition with 1KiB entities - [AzureStorageScalabilityandPerformanceTargets] (https://docs.microsoft.com/en-us/azure/storage/common/storage-scalability-targets). This will further be affected by the partitioning node server distribution, network bandwidth, message size and the table query.
All these factors results into non deterministic nature of the table operation and the timeline. Azure provides an indicator/tracking pointer on the table operation as continuous-token along with intermediate results. As long as it is having a valid pointer (from where next operation - say a 'read' begins), the operation is not yet completed. With this approach, the operation results will be provided as packets/chunks over a period of time. As the each chunk requires a network round trip and should move linearly on the line connected by the token pointers, processing a chunk soon after it arrives improves performance and uses CPU cores optimally. Providing table-operation-result chunk as a reactive stream will build the scaffolding structure for smooth flow of the processing pipeline.
Example
public async Task BulkReadTest()
{
var communicator = new Communicator(_accountName, _accountKey);
var tableQuery = new TableQuery<CustomerEntity>()
.Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.GreaterThan, "Partition"));
var results = await communicator.ReadAsync("USA", tableQuery) // Process the table operation continuously on a separate thread
.SelectMany(e => GetCustomer(e)) // Process the intermediate results on a separate thread soon after it becomes available
.ToArray();
}
private async Task<Customer> GetCustomer(CustomerEntity e)
{
await Task.Delay(5); // simulated delay
return new Customer { CustomerId = e.CustomerId, FirstName = e.FirstName, LastName = e.LastName };
}
The reactive stream can further be extended all the way up, like
public IObservable<Customer> GetGoldCustomers()
{
var communicator = new Communicator(_accountName, _accountKey);
var tableQuery = new TableQuery<CustomerEntity>()
.Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.GreaterThan, "Partition"));
return communicator.ReadAsync("USA", tableQuery)
.SelectMany(e => GetCustomer(e))
.SelectMany(c => IsValidCustomerService(c))
.Where(tple => tple.Item2 == true)
.SelectMany(tple => IsGold(tple.Item1))
.Where(tple => tple.Item2 == true)
.Select(tple => tple.Item1);
}
private async Task<(Customer,bool)> IsValidCustomerService(Customer c) => await Task.Delay(5).ContinueWith(_ => (c,true));
private async Task<(Customer, bool)> IsGold(Customer c) => await Task.Delay(5).ContinueWith(_ => (c, true));
This stream based programming model helps join query based deletion of entities like
public async Task Delete()
{
var communicator = new Communicator(_accountName, _accountKey);
var tableQuery = new TableQuery<CustomerEntity>()
.Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.GreaterThan, "DateRange"));
var results = await communicator.ReadAsync("USA", tableQuery)
.Buffer(100)
.SelectMany(ets => communicator.BatchDeleteAsync("Table", ets.ToArray()))
.ToArray();
}
Important Points
As this is considered as the tight skin on top of the WindowsAzure.Storage
sdk, it doesn't take additional responsibilities on input data. It is a pass through layer and provides reactive api to the caller. Additional customized layers can be
added as decorators to this.
- Validation of parameters like null or empty table name are delegated to sdk
- Validation of common partition key and batch size of 100 is delegated to sdk.
- Utilities are provided to serialize and deserialzie business entities to/from table entities using
TableEntity
class. Test project contains examples of its usage - Using
DataFlow
blocks with reactive streams results in more modular models
License
This software is open source
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
.NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
.NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen40 was computed. tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
.NETStandard 2.0
- System.Reactive (>= 3.1.1)
- WindowsAzure.Storage (>= 9.2.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
1.0.0 | 25,456 | 5/27/2018 |