Class JsonMultiStreamLoader<TRecord>

Namespace
Wolfgang.Etl.Json
Assembly
Wolfgang.Etl.Json.dll

Loads items of type TRecord into multiple streams, writing one JSON object per stream.

public sealed class JsonMultiStreamLoader<TRecord> : LoaderBase<TRecord, JsonReport>, ILoadWithProgressAndCancellationAsync<TRecord, JsonReport>, ILoadWithProgressAsync<TRecord, JsonReport>, ILoadWithCancellationAsync<TRecord>, ILoadAsync<TRecord> where TRecord : notnull

Type Parameters

TRecord

The type of items to load. Must be a non-nullable type (notnull constraint).

Inheritance
LoaderBase<TRecord, JsonReport>
JsonMultiStreamLoader<TRecord>
Implements
ILoadWithProgressAndCancellationAsync<TRecord, JsonReport>
ILoadWithProgressAsync<TRecord, JsonReport>
ILoadWithCancellationAsync<TRecord>
ILoadAsync<TRecord>
Inherited Members
LoaderBase<TRecord, JsonReport>.ReportingInterval
LoaderBase<TRecord, JsonReport>.CurrentItemCount
LoaderBase<TRecord, JsonReport>.CurrentSkippedItemCount
LoaderBase<TRecord, JsonReport>.MaximumItemCount
LoaderBase<TRecord, JsonReport>.SkipItemCount

Examples

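// The "output" directory must already exist; File.Create does not create it.
var loader = new JsonMultiStreamLoader<Person>
(
    person => File.Create($"output/{person.Id}.json"),
    logger
);
// Each Person in items is written to its own file, named after its Id.
await loader.LoadAsync(items, cancellationToken);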

Remarks

For each item in the input sequence, the loader calls the factory function to obtain a Stream, serializes the item to that stream as a single JSON object, and then disposes the stream. The factory receives the item being written, so the destination can be chosen from the item's properties (e.g., generating file names from record fields).
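The per-item flow described above can be sketched with System.Text.Json alone. This is a minimal illustration, not the library's implementation: PerItemStreamSketch and WriteEachAsync are hypothetical names, and the sketch ignores progress reporting, logging, and the item-count limits inherited from LoaderBase.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of Wolfgang.Etl.Json.
public static class PerItemStreamSketch
{
    // Writes each item to its own stream and returns the number of items written.
    public static async Task<int> WriteEachAsync<T>(
        IAsyncEnumerable<T> items,
        Func<T, Stream> streamFactory,
        CancellationToken token = default) where T : notnull
    {
        var written = 0;
        await foreach (var item in items.WithCancellation(token))
        {
            // The factory picks the destination based on the item itself.
            // 'await using' disposes the stream at the end of this iteration.
            await using var stream = streamFactory(item);

            // One JSON value per stream.
            await JsonSerializer.SerializeAsync(stream, item, cancellationToken: token);
            written++;
        }
        return written;
    }
}
```

Because the stream is disposed by the writer, a factory in this pattern should return a fresh stream on every call rather than a shared one.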

Constructors

JsonMultiStreamLoader(Func<TRecord, Stream>)

Initializes a new instance of the JsonMultiStreamLoader<TRecord> class.

public JsonMultiStreamLoader(Func<TRecord, Stream> streamFactory)

Parameters

streamFactory Func<TRecord, Stream>

A factory function that receives the item to be written and returns a Stream to write it to. The loader will dispose the stream after writing.

Exceptions

ArgumentNullException

Thrown when streamFactory is null.

JsonMultiStreamLoader(Func<TRecord, Stream>, ILogger<JsonMultiStreamLoader<TRecord>>)

Initializes a new instance of the JsonMultiStreamLoader<TRecord> class with diagnostic logging.

public JsonMultiStreamLoader(Func<TRecord, Stream> streamFactory, ILogger<JsonMultiStreamLoader<TRecord>> logger)

Parameters

streamFactory Func<TRecord, Stream>

A factory function that receives the item to be written and returns a Stream to write it to. The loader will dispose the stream after writing.

logger ILogger<JsonMultiStreamLoader<TRecord>>

The logger instance for diagnostic output.

Exceptions

ArgumentNullException

Thrown when streamFactory or logger is null.

JsonMultiStreamLoader(Func<TRecord, Stream>, JsonSerializerOptions, ILogger<JsonMultiStreamLoader<TRecord>>?)

Initializes a new instance of the JsonMultiStreamLoader<TRecord> class with custom serialization options.

public JsonMultiStreamLoader(Func<TRecord, Stream> streamFactory, JsonSerializerOptions options, ILogger<JsonMultiStreamLoader<TRecord>>? logger = null)

Parameters

streamFactory Func<TRecord, Stream>

A factory function that receives the item to be written and returns a Stream to write it to. The loader will dispose the stream after writing.

options JsonSerializerOptions

The JSON serializer options to use for serialization.

logger ILogger<JsonMultiStreamLoader<TRecord>>?

An optional logger instance for diagnostic output.

Exceptions

ArgumentNullException

Thrown when streamFactory or options is null.
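A possible use of this overload is sketched below. It assumes the Wolfgang.Etl.Json package is referenced; Person, OptionsLoaderSketch, and the outputDirectory parameter are hypothetical names used only for illustration. The logger argument is omitted, relying on its documented default of null.

```csharp
using System.IO;
using System.Text.Json;
using Wolfgang.Etl.Json;

// Hypothetical record type used only for illustration.
public sealed record Person(int Id, string Name);

public static class OptionsLoaderSketch
{
    public static JsonMultiStreamLoader<Person> Create(string outputDirectory)
    {
        // Standard System.Text.Json settings: camelCase property names, indented output.
        var options = new JsonSerializerOptions
        {
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
            WriteIndented = true
        };

        // File.Create fails if the directory is missing, so create it up front.
        Directory.CreateDirectory(outputDirectory);

        return new JsonMultiStreamLoader<Person>(
            person => File.Create(Path.Combine(outputDirectory, $"{person.Id}.json")),
            options);
    }
}
```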

JsonMultiStreamLoader(Func<TRecord, Stream>, JsonTypeInfo<TRecord>, ILogger<JsonMultiStreamLoader<TRecord>>?)

Initializes a new instance of the JsonMultiStreamLoader<TRecord> class with source-generated type metadata for AOT-friendly, reflection-free serialization.

public JsonMultiStreamLoader(Func<TRecord, Stream> streamFactory, JsonTypeInfo<TRecord> typeInfo, ILogger<JsonMultiStreamLoader<TRecord>>? logger = null)

Parameters

streamFactory Func<TRecord, Stream>

A factory function that receives the item to be written and returns a Stream to write it to. The loader will dispose the stream after writing.

typeInfo JsonTypeInfo<TRecord>

The source-generated type metadata for TRecord.

logger ILogger<JsonMultiStreamLoader<TRecord>>?

An optional logger instance for diagnostic output.

Exceptions

ArgumentNullException

Thrown when streamFactory or typeInfo is null.
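The typeInfo argument would typically come from a System.Text.Json source-generated context. The sketch below assumes the Wolfgang.Etl.Json package is referenced and a target framework where the System.Text.Json source generator is available; Person, PersonJsonContext, and TypeInfoLoaderSketch are hypothetical names used only for illustration.

```csharp
using System.IO;
using System.Text.Json.Serialization;
using Wolfgang.Etl.Json;

// Hypothetical record type used only for illustration.
public sealed record Person(int Id, string Name);

// The System.Text.Json source generator completes this partial class at compile time.
[JsonSerializable(typeof(Person))]
public partial class PersonJsonContext : JsonSerializerContext
{
}

public static class TypeInfoLoaderSketch
{
    public static JsonMultiStreamLoader<Person> Create(string outputDirectory)
    {
        // File.Create fails if the directory is missing, so create it up front.
        Directory.CreateDirectory(outputDirectory);

        // PersonJsonContext.Default.Person is the generated JsonTypeInfo<Person>.
        return new JsonMultiStreamLoader<Person>(
            person => File.Create(Path.Combine(outputDirectory, $"{person.Id}.json")),
            PersonJsonContext.Default.Person);
    }
}
```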

Methods

CreateProgressReport()

Creates a progress report of type JsonReport, the TProgress type argument this loader supplies to LoaderBase<TRecord, JsonReport>. The base class calls this method so that each loader can return a progress report specific to its loading process.

protected override JsonReport CreateProgressReport()

Returns

JsonReport

A progress report of type JsonReport.

CreateProgressTimer(IProgress<JsonReport>)

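Creates the Wolfgang.Etl.Abstractions.IProgressTimer used to drive progress callbacks. LoaderBase<TRecord, JsonReport> exposes this method so that loaders can substitute a custom timer (for example, a custom implementation that allows manual control in unit tests). Because JsonMultiStreamLoader<TRecord> is sealed, this override cannot be overridden further.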

protected override IProgressTimer CreateProgressTimer(IProgress<JsonReport> progress)

Parameters

progress IProgress<JsonReport>

The progress sink that will receive callbacks.

Returns

IProgressTimer

A started Wolfgang.Etl.Abstractions.IProgressTimer instance.

LoadWorkerAsync(IAsyncEnumerable<TRecord>, CancellationToken)

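The core implementation of the loading logic for this loader. It overrides LoaderBase<TRecord, JsonReport>.LoadWorkerAsync; because JsonMultiStreamLoader<TRecord> is sealed, it cannot be overridden further.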

protected override Task LoadWorkerAsync(IAsyncEnumerable<TRecord> items, CancellationToken token)

Parameters

items IAsyncEnumerable<TRecord>

The items to be loaded to the destination.

token CancellationToken

A CancellationToken to observe while waiting for the task to complete.

Returns

Task

A task representing the asynchronous operation.

Remarks

The items sequence may be empty if no data is available or if the loading fails.

Exceptions

ArgumentNullException

Thrown when items is null.