Class JsonLineExtractor<TRecord>

Namespace
Wolfgang.Etl.Json
Assembly
Wolfgang.Etl.Json.dll

Extracts items of type TRecord from a JSONL (JSON Lines / NDJSON) stream.

public sealed class JsonLineExtractor<TRecord> : ExtractorBase<TRecord, JsonReport>, IExtractWithProgressAndCancellationAsync<TRecord, JsonReport>, IExtractWithCancellationAsync<TRecord>, IExtractWithProgressAsync<TRecord, JsonReport>, IExtractAsync<TRecord> where TRecord : notnull

Type Parameters

TRecord

The type of items to extract. Must be a non-nullable type (the notnull constraint).

Inheritance
ExtractorBase<TRecord, JsonReport>
JsonLineExtractor<TRecord>
Implements
IExtractWithProgressAndCancellationAsync<TRecord, JsonReport>
IExtractWithCancellationAsync<TRecord>
IExtractWithProgressAsync<TRecord, JsonReport>
IExtractAsync<TRecord>
Inherited Members
ExtractorBase<TRecord, JsonReport>.ExtractAsync()
ExtractorBase<TRecord, JsonReport>.ReportingInterval
ExtractorBase<TRecord, JsonReport>.CurrentItemCount
ExtractorBase<TRecord, JsonReport>.CurrentSkippedItemCount
ExtractorBase<TRecord, JsonReport>.MaximumItemCount
ExtractorBase<TRecord, JsonReport>.SkipItemCount

Examples

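using System;
using System.IO;
using System.Threading;
using Wolfgang.Etl.Json;

using var cts = new CancellationTokenSource();
using var stream = File.OpenRead("data.jsonl");
var extractor = new JsonLineExtractor<Person>(stream);
await foreach (var person in extractor.ExtractAsync(cts.Token))
{
    Console.WriteLine(person.Name); // Person is an application-defined type
}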

Remarks

Reads the stream line by line and deserializes each non-empty line as a single JSON object. Blank lines are skipped, and a warning is logged. Compatible with both the JSONL and NDJSON formats.
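The line-by-line behaviour described above can be approximated with System.Text.Json alone. The following is a minimal sketch, not the library's actual implementation: JsonLinesSketch and ReadAsync are hypothetical names, treating whitespace-only lines as blank is an assumption, and no logging is performed.

```csharp
#nullable enable
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading;

// Hypothetical helper for illustration; not part of Wolfgang.Etl.Json.
public static class JsonLinesSketch
{
    public static async IAsyncEnumerable<T> ReadAsync<T>(
        Stream stream,
        [EnumeratorCancellation] CancellationToken token = default)
        where T : notnull
    {
        // Note: disposing the reader also closes the underlying stream.
        using var reader = new StreamReader(stream);

        string? line;
        while ((line = await reader.ReadLineAsync()) is not null)
        {
            token.ThrowIfCancellationRequested();

            // Assumption: whitespace-only lines count as blank and are skipped.
            if (string.IsNullOrWhiteSpace(line))
            {
                continue;
            }

            // Each remaining line is parsed as one complete JSON document.
            var item = JsonSerializer.Deserialize<T>(line);
            if (item is not null) // a literal "null" line deserializes to null
            {
                yield return item;
            }
        }
    }
}
```

Enumerating JsonLinesSketch.ReadAsync<Dictionary<string, string>>(stream) with await foreach over a stream that holds two JSON lines separated by an empty line would produce two items, because the empty line is skipped.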

Constructors

JsonLineExtractor(Stream)

Initializes a new instance of the JsonLineExtractor<TRecord> class.

public JsonLineExtractor(Stream stream)

Parameters

stream Stream

The stream containing JSONL data to read from.

Exceptions

ArgumentNullException

Thrown when stream is null.

JsonLineExtractor(Stream, ILogger<JsonLineExtractor<TRecord>>)

Initializes a new instance of the JsonLineExtractor<TRecord> class with diagnostic logging.

public JsonLineExtractor(Stream stream, ILogger<JsonLineExtractor<TRecord>> logger)

Parameters

stream Stream

The stream containing JSONL data to read from.

logger ILogger<JsonLineExtractor<TRecord>>

The logger instance for diagnostic output.

Exceptions

ArgumentNullException

Thrown when stream or logger is null.

JsonLineExtractor(Stream, JsonSerializerOptions, ILogger<JsonLineExtractor<TRecord>>?)

Initializes a new instance of the JsonLineExtractor<TRecord> class with custom serialization options.

public JsonLineExtractor(Stream stream, JsonSerializerOptions options, ILogger<JsonLineExtractor<TRecord>>? logger = null)

Parameters

stream Stream

The stream containing JSONL data to read from.

options JsonSerializerOptions

The JSON serializer options to use for deserialization.

logger ILogger<JsonLineExtractor<TRecord>>

An optional logger instance for diagnostic output.

Exceptions

ArgumentNullException

Thrown when stream or options is null.
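As an illustration of why custom options matter, the sketch below shows the effect of one JsonSerializerOptions setting on a single JSONL line, using System.Text.Json directly. Person, OptionsSketch, CreateOptions, and ParseLine are hypothetical names introduced for this example; only the JsonSerializerOptions and JsonSerializer calls are real APIs.

```csharp
#nullable enable
using System.Text.Json;

// Hypothetical record type for illustration.
public sealed class Person
{
    public string Name { get; set; } = "";
    public int Age { get; set; }
}

public static class OptionsSketch
{
    // Options that let camelCase JSON names bind to PascalCase properties.
    public static JsonSerializerOptions CreateOptions() => new JsonSerializerOptions
    {
        PropertyNameCaseInsensitive = true,
    };

    // Deserializes one JSONL line using the supplied options.
    public static Person? ParseLine(string line, JsonSerializerOptions options) =>
        JsonSerializer.Deserialize<Person>(line, options);
}
```

With default options, System.Text.Json matches property names case-sensitively, so a line such as {"name":"Ada"} would leave Name at its initial value. An options instance like the one returned by CreateOptions() would be passed as the second argument of this constructor.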

JsonLineExtractor(Stream, JsonTypeInfo<TRecord>, ILogger<JsonLineExtractor<TRecord>>?)

Initializes a new instance of the JsonLineExtractor<TRecord> class with source-generated type metadata for AOT-friendly, reflection-free deserialization.

public JsonLineExtractor(Stream stream, JsonTypeInfo<TRecord> typeInfo, ILogger<JsonLineExtractor<TRecord>>? logger = null)

Parameters

stream Stream

The stream containing JSONL data to read from.

typeInfo JsonTypeInfo<TRecord>

The source-generated type metadata for TRecord.

logger ILogger<JsonLineExtractor<TRecord>>

An optional logger instance for diagnostic output.

Exceptions

ArgumentNullException

Thrown when stream or typeInfo is null.
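A JsonTypeInfo<TRecord> is normally obtained from a source-generated JsonSerializerContext. The sketch below assumes .NET 6 or later, where the System.Text.Json source generator is available; SensorReading, ReadingJsonContext, and TypeInfoSketch are hypothetical names introduced for this example.

```csharp
#nullable enable
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Text.Json.Serialization.Metadata;

// Hypothetical record type for illustration.
public sealed class SensorReading
{
    public string Sensor { get; set; } = "";
    public double Value { get; set; }
}

// The source generator completes this partial class at compile time,
// so no reflection is needed to describe SensorReading at run time.
[JsonSerializable(typeof(SensorReading))]
public partial class ReadingJsonContext : JsonSerializerContext
{
}

public static class TypeInfoSketch
{
    // The generated context exposes one JsonTypeInfo property per registered type.
    public static JsonTypeInfo<SensorReading> ReadingInfo =>
        ReadingJsonContext.Default.SensorReading;

    // Deserializes one JSONL line through the source-generated metadata.
    public static SensorReading? ParseLine(string line) =>
        JsonSerializer.Deserialize(line, ReadingInfo);
}
```

A value such as TypeInfoSketch.ReadingInfo is what would be supplied as the typeInfo argument of this constructor.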

Methods

CreateProgressReport()

Creates a progress report of type JsonReport. This override supplies a progress report that is specific to the extraction process.

protected override JsonReport CreateProgressReport()

Returns

JsonReport

The current progress, as a JsonReport.

CreateProgressTimer(IProgress<JsonReport>)

Creates the Wolfgang.Etl.Abstractions.IProgressTimer used to drive progress callbacks. The base class exposes this method as an override point so that a custom timer can be injected (for example, a custom implementation that allows manual control in unit tests). Because JsonLineExtractor<TRecord> is sealed, it cannot be overridden further.

protected override IProgressTimer CreateProgressTimer(IProgress<JsonReport> progress)

Parameters

progress IProgress<JsonReport>

The progress sink that will receive callbacks.

Returns

IProgressTimer

A started Wolfgang.Etl.Abstractions.IProgressTimer instance.

ExtractWorkerAsync(CancellationToken)

This method is the core implementation of the extraction logic: it reads the stream line by line and deserializes each non-empty line into a TRecord.

protected override IAsyncEnumerable<TRecord> ExtractWorkerAsync(CancellationToken token)

Parameters

token CancellationToken

A CancellationToken to observe while the sequence is being enumerated.

Returns

IAsyncEnumerable<TRecord>

An IAsyncEnumerable<TRecord> of the extracted items. The result may be an empty sequence if no data is available or if the extraction fails.
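The interaction between a CancellationToken and an IAsyncEnumerable<T> can be sketched with a stand-in producer. This is a generic illustration of the async-stream pattern, not this class's code: CancellationSketch, ProduceAsync, and ConsumeUntilCancelledAsync are hypothetical names, and whether JsonLineExtractor<TRecord> surfaces cancellation as an OperationCanceledException is an assumption not confirmed by this page.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an extractor; not part of Wolfgang.Etl.Json.
public static class CancellationSketch
{
    // Yields 0, 1, 2, ... and checks the token before producing each item.
    public static async IAsyncEnumerable<int> ProduceAsync(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        for (var i = 0; ; i++)
        {
            token.ThrowIfCancellationRequested();
            await Task.Yield(); // simulate asynchronous work such as a stream read
            yield return i;
        }
    }

    // Consumes items and cancels after the requested number has been seen.
    public static async Task<int> ConsumeUntilCancelledAsync(int cancelAfterItems)
    {
        using var cts = new CancellationTokenSource();
        var seen = 0;
        try
        {
            await foreach (var item in ProduceAsync(cts.Token))
            {
                seen++;
                if (seen == cancelAfterItems)
                {
                    cts.Cancel();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // The producer observed the token on its next iteration and stopped.
        }

        return seen;
    }
}
```

In this sketch the token is only checked between items, so an item already being produced is delivered before the enumeration ends.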