Class JsonMultiStreamExtractor<TRecord>

Namespace: Wolfgang.Etl.Json
Assembly: Wolfgang.Etl.Json.dll

Extracts items of type TRecord from multiple streams, reading one JSON object per stream.

public sealed class JsonMultiStreamExtractor<TRecord> : ExtractorBase<TRecord, JsonReport>, IExtractWithProgressAndCancellationAsync<TRecord, JsonReport>, IExtractWithCancellationAsync<TRecord>, IExtractWithProgressAsync<TRecord, JsonReport>, IExtractAsync<TRecord> where TRecord : notnull

Type Parameters

TRecord

The type of items to extract. Must satisfy the notnull constraint (a non-nullable type).

Inheritance
ExtractorBase<TRecord, JsonReport>
JsonMultiStreamExtractor<TRecord>
Implements
IExtractWithProgressAndCancellationAsync<TRecord, JsonReport>
IExtractWithCancellationAsync<TRecord>
IExtractWithProgressAsync<TRecord, JsonReport>
IExtractAsync<TRecord>
Inherited Members
ExtractorBase<TRecord, JsonReport>.ExtractAsync()
ExtractorBase<TRecord, JsonReport>.ReportingInterval
ExtractorBase<TRecord, JsonReport>.CurrentItemCount
ExtractorBase<TRecord, JsonReport>.CurrentSkippedItemCount
ExtractorBase<TRecord, JsonReport>.MaximumItemCount
ExtractorBase<TRecord, JsonReport>.SkipItemCount

Examples

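using Wolfgang.Etl.Json;

// Assumes `logger` is an ILogger<JsonMultiStreamExtractor<Person>>, `cancellationToken` is a
// CancellationToken, and Person exposes a Name property.
// Select is deferred, so each file is opened only when the sequence is advanced.
var streams = Directory.GetFiles("data/", "*.json").Select(File.OpenRead);
var extractor = new JsonMultiStreamExtractor<Person>(streams, logger);
await foreach (var person in extractor.ExtractAsync(cancellationToken))
{
    Console.WriteLine(person.Name);
}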

Remarks

Iterates over an IEnumerable<Stream>, deserializing a single TRecord from each stream. Each stream is disposed after its item has been read. Extraction stops when the sequence is exhausted or when MaximumItemCount (inherited from Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>) is reached.
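The loop described above can be sketched with System.Text.Json alone. This is a minimal sketch under stated assumptions, not the library's implementation: PerStreamSketch, ReadOnePerStreamAsync, and the nested Person record are hypothetical names, maximumItemCount stands in for MaximumItemCount, and skipping a body that is the JSON literal null is a choice made only for this sketch. It checks the limit before advancing the sequence, so no extra stream is opened once the limit is reached, and it disposes each stream before yielding that stream's item.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of the one-item-per-stream pattern; not Wolfgang.Etl.Json's code.
public static class PerStreamSketch
{
    public sealed record Person(string Name);

    public static async IAsyncEnumerable<T> ReadOnePerStreamAsync<T>(
        IEnumerable<Stream> streams,
        int maximumItemCount,                     // plays the role of MaximumItemCount
        JsonSerializerOptions? options = null,
        [EnumeratorCancellation] CancellationToken token = default)
        where T : notnull
    {
        var count = 0;
        using var enumerator = streams.GetEnumerator();

        // Test the limit before MoveNext so no further stream is pulled once it is reached.
        while (count < maximumItemCount && enumerator.MoveNext())
        {
            T? item;
            await using (var stream = enumerator.Current)
            {
                item = await JsonSerializer.DeserializeAsync<T>(stream, options, token);
            } // the stream is disposed here, before the item is handed to the caller

            if (item is null) continue;           // assumption: skip a JSON `null` body
            count++;
            yield return item;
        }
    }

    // Reads three in-memory "files" with a limit of two and returns the names
    // plus the streams, so callers can check which streams were disposed.
    public static async Task<(List<string> Names, List<MemoryStream> Streams)> RunAsync()
    {
        var streams = new[] { "{\"Name\":\"Ada\"}", "{\"Name\":\"Linus\"}", "{\"Name\":\"Grace\"}" }
            .Select(json => new MemoryStream(Encoding.UTF8.GetBytes(json)))
            .ToList();

        var names = new List<string>();
        await foreach (var p in ReadOnePerStreamAsync<Person>(streams, maximumItemCount: 2))
        {
            names.Add(p.Name);
        }
        return (names, streams);
    }
}
```

In the demo, the third stream is never pulled from the list, so it is still readable afterwards, while the first two have been disposed.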

Constructors

JsonMultiStreamExtractor(IEnumerable<Stream>)

Initializes a new instance of the JsonMultiStreamExtractor<TRecord> class.

public JsonMultiStreamExtractor(IEnumerable<Stream> streams)

Parameters

streams IEnumerable<Stream>

An enumerable of streams, each containing a single JSON object.

Exceptions

ArgumentNullException

Thrown when streams is null.
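Because streams is an IEnumerable<Stream>, it can be a deferred sequence. The sketch below uses only LINQ and MemoryStream to count how many streams have been created at each point; LazyStreamsSketch and OpenFake (a stand-in for File.OpenRead) are hypothetical names. Assuming the extractor advances the sequence one stream at a time, a deferred Select keeps only the current stream open, whereas calling ToList() or ToArray() on the projection would open every file before extraction starts.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

public static class LazyStreamsSketch
{
    // Number of streams created so far.
    public static int OpenCount;

    // Hypothetical stand-in for File.OpenRead that records each "open".
    public static Stream OpenFake(string json)
    {
        OpenCount++;
        return new MemoryStream(Encoding.UTF8.GetBytes(json));
    }

    public static (int BeforeEnumeration, int AfterFirst, int AfterAll) Run()
    {
        OpenCount = 0;
        string[] documents = { "{\"Id\":1}", "{\"Id\":2}", "{\"Id\":3}" };

        // Deferred projection: building the sequence opens nothing.
        IEnumerable<Stream> streams = documents.Select(OpenFake);
        var before = OpenCount;

        using var e = streams.GetEnumerator();
        e.MoveNext();                 // the first stream is created only now
        e.Current.Dispose();
        var afterFirst = OpenCount;

        while (e.MoveNext())
        {
            e.Current.Dispose();      // consume and release each remaining stream
        }
        return (before, afterFirst, OpenCount);
    }
}
```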

JsonMultiStreamExtractor(IEnumerable<Stream>, ILogger<JsonMultiStreamExtractor<TRecord>>)

Initializes a new instance of the JsonMultiStreamExtractor<TRecord> class with diagnostic logging.

public JsonMultiStreamExtractor(IEnumerable<Stream> streams, ILogger<JsonMultiStreamExtractor<TRecord>> logger)

Parameters

streams IEnumerable<Stream>

An enumerable of streams, each containing a single JSON object.

logger ILogger<JsonMultiStreamExtractor<TRecord>>

The logger instance for diagnostic output.

Exceptions

ArgumentNullException

Thrown when streams or logger is null.

JsonMultiStreamExtractor(IEnumerable<Stream>, JsonSerializerOptions, ILogger<JsonMultiStreamExtractor<TRecord>>?)

Initializes a new instance of the JsonMultiStreamExtractor<TRecord> class with custom serialization options.

public JsonMultiStreamExtractor(IEnumerable<Stream> streams, JsonSerializerOptions options, ILogger<JsonMultiStreamExtractor<TRecord>>? logger = null)

Parameters

streams IEnumerable<Stream>

An enumerable of streams, each containing a single JSON object.

options JsonSerializerOptions

The JSON serializer options to use for deserialization.

logger ILogger<JsonMultiStreamExtractor<TRecord>>

An optional logger instance for diagnostic output.

Exceptions

ArgumentNullException

Thrown when streams or options is null.
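A sketch of the kind of options object this constructor accepts, exercised here with plain JsonSerializer calls rather than the extractor. OptionsSketch and its Person class are hypothetical. JsonSerializerDefaults.Web enables a camelCase naming policy and case-insensitive property matching, so camelCase JSON binds to PascalCase properties; with default options the same JSON leaves both properties at their default values.

```csharp
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

public static class OptionsSketch
{
    // Hypothetical record type with PascalCase properties.
    public sealed class Person
    {
        public string? Name { get; set; }
        public int Age { get; set; }
    }

    // Web defaults: camelCase naming policy, case-insensitive property matching,
    // and numbers readable from quoted strings. Create once and reuse the instance.
    public static readonly JsonSerializerOptions Options = new(JsonSerializerDefaults.Web);

    public static async Task<(Person? WithOptions, Person? WithDefaults)> RunAsync()
    {
        const string json = "{\"name\":\"Ada\",\"age\":36}";

        using var first = new MemoryStream(Encoding.UTF8.GetBytes(json));
        var withOptions = await JsonSerializer.DeserializeAsync<Person>(first, Options);

        // Default options match names case-sensitively, so "name" and "age"
        // do not bind to Name and Age, which keep their default values.
        using var second = new MemoryStream(Encoding.UTF8.GetBytes(json));
        var withDefaults = await JsonSerializer.DeserializeAsync<Person>(second);

        return (withOptions, withDefaults);
    }
}
```

The same instance would then be passed as the options argument, for example new JsonMultiStreamExtractor<OptionsSketch.Person>(streams, OptionsSketch.Options).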

JsonMultiStreamExtractor(IEnumerable<Stream>, JsonTypeInfo<TRecord>, ILogger<JsonMultiStreamExtractor<TRecord>>?)

Initializes a new instance of the JsonMultiStreamExtractor<TRecord> class with source-generated type metadata for AOT-friendly, reflection-free deserialization.

public JsonMultiStreamExtractor(IEnumerable<Stream> streams, JsonTypeInfo<TRecord> typeInfo, ILogger<JsonMultiStreamExtractor<TRecord>>? logger = null)

Parameters

streams IEnumerable<Stream>

An enumerable of streams, each containing a single JSON object.

typeInfo JsonTypeInfo<TRecord>

The source-generated type metadata for TRecord.

logger ILogger<JsonMultiStreamExtractor<TRecord>>

An optional logger instance for diagnostic output.

Exceptions

ArgumentNullException

Thrown when streams or typeInfo is null.
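A sketch of obtaining a JsonTypeInfo<TRecord> from the System.Text.Json source generator (.NET 6 or later). Reading, ReadingContext, and SourceGenSketch are hypothetical names. The generator emits ReadingContext.Default.Reading, a JsonTypeInfo<Reading>, which is the kind of value the typeInfo parameter expects. Because this constructor takes no JsonSerializerOptions, settings such as a naming policy would be configured on the context (for example with [JsonSourceGenerationOptions]) rather than passed separately.

```csharp
using System.IO;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Text.Json.Serialization.Metadata;
using System.Threading.Tasks;

// Hypothetical record type.
public sealed class Reading
{
    public string? Sensor { get; set; }
    public double Value { get; set; }
}

// The source generator completes this partial class at compile time,
// emitting reflection-free metadata for Reading.
[JsonSerializable(typeof(Reading))]
internal partial class ReadingContext : JsonSerializerContext
{
}

public static class SourceGenSketch
{
    // Generated JsonTypeInfo<Reading>; this is what a typeInfo argument would receive.
    public static JsonTypeInfo<Reading> TypeInfo => ReadingContext.Default.Reading;

    public static async Task<Reading?> ReadOneAsync(string json)
    {
        using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json));
        // Overload taking JsonTypeInfo<T>: no runtime reflection over Reading.
        return await JsonSerializer.DeserializeAsync(stream, TypeInfo);
    }
}
```

Constructing the extractor would then look like new JsonMultiStreamExtractor<Reading>(streams, ReadingContext.Default.Reading).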

Methods

CreateProgressReport()

Creates a JsonReport, the TProgress type of ExtractorBase<TRecord, JsonReport>. Overriding this method lets the extractor supply a progress report specific to JSON extraction.

protected override JsonReport CreateProgressReport()

Returns

JsonReport

A JsonReport describing the current extraction progress.

CreateProgressTimer(IProgress<JsonReport>)

Creates the Wolfgang.Etl.Abstractions.IProgressTimer used to drive progress callbacks. ExtractorBase<TRecord, JsonReport> exposes this method so that derived extractors can inject a custom timer (for example, one that unit tests can control manually). Because JsonMultiStreamExtractor<TRecord> is sealed, this override cannot itself be replaced.

protected override IProgressTimer CreateProgressTimer(IProgress<JsonReport> progress)

Parameters

progress IProgress<JsonReport>

The progress sink that will receive callbacks.

Returns

IProgressTimer

A started Wolfgang.Etl.Abstractions.IProgressTimer instance.
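The manual-control idea mentioned above can be sketched without the library. This illustration rests on assumptions: ManualProgressTimer, RecordingProgress, and ManualTimerSketch are hypothetical, and ManualProgressTimer does not implement Wolfgang.Etl.Abstractions.IProgressTimer, whose members are not documented here. It shows the design only: the timer pulls a fresh report from a factory (the role CreateProgressReport() plays) and pushes it to the IProgress<T> sink only when a test calls Tick().

```csharp
using System;
using System.Collections.Generic;

// Hypothetical: NOT Wolfgang.Etl.Abstractions.IProgressTimer.
public sealed class ManualProgressTimer<TReport>
{
    private readonly IProgress<TReport> _progress;
    private readonly Func<TReport> _createReport;   // stands in for CreateProgressReport()

    public ManualProgressTimer(IProgress<TReport> progress, Func<TReport> createReport)
    {
        _progress = progress ?? throw new ArgumentNullException(nameof(progress));
        _createReport = createReport ?? throw new ArgumentNullException(nameof(createReport));
    }

    // A test calls Tick() instead of waiting for a real interval to elapse.
    public void Tick() => _progress.Report(_createReport());
}

// Records reports synchronously, in the order they are reported.
public sealed class RecordingProgress<T> : IProgress<T>
{
    public List<T> Reports { get; } = new();
    public void Report(T value) => Reports.Add(value);
}

public static class ManualTimerSketch
{
    public static List<int> Run()
    {
        var itemCount = 0;                           // stands in for CurrentItemCount
        var sink = new RecordingProgress<int>();
        var timer = new ManualProgressTimer<int>(sink, () => itemCount);

        itemCount = 3;
        timer.Tick();                                // reports the current value, 3
        itemCount = 7;
        timer.Tick();                                // reports the current value, 7
        return sink.Reports;
    }
}
```

RecordingProgress<T> is used instead of Progress<T> because Progress<T> posts callbacks to the captured SynchronizationContext or the thread pool, so a test could inspect the results before the callbacks have run.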

ExtractWorkerAsync(CancellationToken)

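Implements the core extraction logic, reading one TRecord from each stream in turn. ExtractorBase<TRecord, JsonReport> declares this method for derived extractors to override; because JsonMultiStreamExtractor<TRecord> is sealed, this override is final.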

protected override IAsyncEnumerable<TRecord> ExtractWorkerAsync(CancellationToken token)

Parameters

token CancellationToken

A CancellationToken to observe while the sequence is enumerated.

Returns

IAsyncEnumerable<TRecord>

An IAsyncEnumerable<TRecord> of extracted items. The sequence may be empty if no data is available or if the extraction fails.
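A token reaches an async iterator through C#'s [EnumeratorCancellation] attribute. The sketch below is generic C#, not the extractor's code: CancellationSketch and ProduceAsync are hypothetical stand-ins for an ExtractWorkerAsync-style iterator. The consumer supplies a token with WithCancellation and cancels after two items; the iterator observes the token before producing the next item and throws OperationCanceledException.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Hypothetical iterator; [EnumeratorCancellation] routes the token given to
    // WithCancellation(...) into the `token` parameter.
    public static async IAsyncEnumerable<int> ProduceAsync(
        int count, [EnumeratorCancellation] CancellationToken token = default)
    {
        for (var i = 0; i < count; i++)
        {
            token.ThrowIfCancellationRequested();   // observed before each item
            await Task.Yield();                     // simulates asynchronous I/O
            yield return i;
        }
    }

    public static async Task<(List<int> Received, bool Canceled)> RunAsync()
    {
        using var cts = new CancellationTokenSource();
        var received = new List<int>();
        try
        {
            await foreach (var item in ProduceAsync(10).WithCancellation(cts.Token))
            {
                received.Add(item);
                if (received.Count == 2) cts.Cancel();   // request a stop after two items
            }
        }
        catch (OperationCanceledException)
        {
            return (received, true);
        }
        return (received, false);
    }
}
```

With the extractor itself, the route shown in Examples is to pass the token to ExtractAsync(cancellationToken).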