Class ExtractorBase<TSource, TProgress>

Namespace: Wolfgang.Etl.Abstractions
Assembly: Wolfgang.Etl.Abstractions.dll

Provides a base implementation for data extractors that produce items of type TSource. Library authors can create a custom extractor by inheriting from this class and implementing the abstract ExtractWorkerAsync and CreateProgressReport methods.

public abstract class ExtractorBase<TSource, TProgress> : IExtractWithProgressAndCancellationAsync<TSource, TProgress>, IExtractWithCancellationAsync<TSource>, IExtractWithProgressAsync<TSource, TProgress>, IExtractAsync<TSource> where TSource : notnull where TProgress : notnull

Type Parameters

TSource

The type of the items being extracted.

TProgress

The type of the progress report object.

Inheritance
object
ExtractorBase<TSource, TProgress>
Implements
IExtractWithProgressAndCancellationAsync<TSource, TProgress>
IExtractWithCancellationAsync<TSource>
IExtractWithProgressAsync<TSource, TProgress>
IExtractAsync<TSource>

Properties

CurrentItemCount

The current number of items extracted so far.

public int CurrentItemCount { get; }

Property Value

int

Remarks

It is the responsibility of the derived class to call IncrementCurrentItemCount() as each item is extracted. The base class has no way of knowing when an item has been processed.

CurrentSkippedItemCount

The current number of items skipped so far during extraction.

public int CurrentSkippedItemCount { get; }

Property Value

int

MaximumItemCount

The maximum number of items to extract. Once the extractor has reached this limit, it should stop extracting and signal the end of the sequence.

public int MaximumItemCount { get; set; }

Property Value

int

Examples

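// Body of an ExtractWorkerAsync(CancellationToken token) override;
// filePath is a hypothetical field of the derived class.
var count = 0;
using (var reader = new StreamReader(filePath))
{
    while (!reader.EndOfStream)
    {
        token.ThrowIfCancellationRequested();
        yield return await reader.ReadLineAsync();
        IncrementCurrentItemCount(); // keep CurrentItemCount accurate for progress reports
        count++;
        if (count >= MaximumItemCount)
        {
            break; // Stop extracting once the maximum item count is reached
        }
    }
}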

Remarks

This is useful for extracting only part of a source, especially when the source is large or unbounded, or during development.

Exceptions

ArgumentOutOfRangeException

The specified value is less than 1.

ReportingInterval

The number of milliseconds between progress updates.

public int ReportingInterval { get; set; }

Property Value

int

Exceptions

ArgumentOutOfRangeException

The specified value is less than 1.

SkipItemCount

The number of items to skip before extracting. The extractor should skip the specified number of items before starting to yield results.

public int SkipItemCount { get; set; }

Property Value

int

Examples

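// Body of an ExtractWorkerAsync(CancellationToken token) override;
// filePath is a hypothetical field of the derived class.
using (var reader = new StreamReader(filePath))
{
    // Skip the specified number of items before starting to yield results
    var skipCount = 0;
    while (!reader.EndOfStream && skipCount < SkipItemCount)
    {
        await reader.ReadLineAsync();
        skipCount++;
        IncrementCurrentSkippedItemCount();
    }

    // Now start yielding results
    while (!reader.EndOfStream)
    {
        token.ThrowIfCancellationRequested();
        yield return await reader.ReadLineAsync();
        IncrementCurrentItemCount();
    }
}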

Remarks

This is useful for extracting only part of a source during development, or for skipping items that were already processed or are not relevant to the current extraction.

Exceptions

ArgumentOutOfRangeException

The specified value is less than 0.
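SkipItemCount and MaximumItemCount are often honoured together. The following is a standalone sketch, not code from this library: ExtractRangeAsync is a hypothetical local function over an in-memory range rather than a class derived from ExtractorBase, and it assumes that skipped items do not count toward the maximum. The comments mark where a derived class would call IncrementCurrentSkippedItemCount() and IncrementCurrentItemCount().

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for an ExtractWorkerAsync body that applies a skip
// count first and then stops once the maximum number of items has been yielded.
static async IAsyncEnumerable<int> ExtractRangeAsync(
    IEnumerable<int> source, int skipItemCount, int maximumItemCount)
{
    var skipped = 0;
    var extracted = 0;
    foreach (var item in source)
    {
        if (skipped < skipItemCount)
        {
            skipped++; // a derived class would call IncrementCurrentSkippedItemCount()
            continue;
        }

        if (extracted >= maximumItemCount)
        {
            yield break; // limit reached: signal the end of the sequence
        }

        await Task.Yield(); // simulate asynchronous I/O
        extracted++; // a derived class would call IncrementCurrentItemCount()
        yield return item;
    }
}

var result = new List<int>();
await foreach (var item in ExtractRangeAsync(Enumerable.Range(1, 10), skipItemCount: 3, maximumItemCount: 4))
{
    result.Add(item);
}

Console.WriteLine(string.Join(", ", result)); // 4, 5, 6, 7
```

Checking the limit before reading the next item, rather than after yielding, means the sketch never pulls an item from the source that it will not return.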

Methods

CreateProgressReport()

Creates a progress report of type TProgress. Derived classes implement this method to supply a progress report tailored to their extraction process.

protected abstract TProgress CreateProgressReport()

Returns

TProgress

A progress report of type TProgress.

CreateProgressTimer(IProgress<TProgress>)

Creates the IProgressTimer used to drive progress callbacks. Override this method in a derived class to inject a custom timer (for example, a custom implementation that allows manual control in unit tests).

protected virtual IProgressTimer CreateProgressTimer(IProgress<TProgress> progress)

Parameters

progress IProgress<TProgress>

The progress sink that will receive callbacks.

Returns

IProgressTimer

A started IProgressTimer instance.

ExtractAsync()

Asynchronously extracts data of type TSource from a source.

public virtual IAsyncEnumerable<TSource> ExtractAsync()

Returns

IAsyncEnumerable<TSource>

An asynchronous sequence of TSource items. The sequence may be empty if no data is available or if the extraction fails.

ExtractAsync(IProgress<TProgress>)

Asynchronously extracts data of type TSource from a source.

public virtual IAsyncEnumerable<TSource> ExtractAsync(IProgress<TProgress> progress)

Parameters

progress IProgress<TProgress>

A provider for progress updates.

Returns

IAsyncEnumerable<TSource>

An asynchronous sequence of TSource items. The sequence may be empty if no data is available or if the extraction fails.

Exceptions

ArgumentNullException

The value of progress is null.

ExtractAsync(IProgress<TProgress>, CancellationToken)

Asynchronously extracts data of type TSource from a source.

public virtual IAsyncEnumerable<TSource> ExtractAsync(IProgress<TProgress> progress, CancellationToken token)

Parameters

progress IProgress<TProgress>

A provider for progress updates.

token CancellationToken

A CancellationToken to observe during extraction.

Returns

IAsyncEnumerable<TSource>

An asynchronous sequence of TSource items. The sequence may be empty if no data is available or if the extraction fails.

Remarks

The extractor should handle cancellation requests gracefully. Callers that do not intend to cancel the extraction can pass CancellationToken.None.

Exceptions

ArgumentNullException

The value of progress is null.

ExtractAsync(CancellationToken)

Asynchronously extracts data of type TSource from a source.

public virtual IAsyncEnumerable<TSource> ExtractAsync(CancellationToken token)

Parameters

token CancellationToken

A CancellationToken to observe during extraction.

Returns

IAsyncEnumerable<TSource>

An asynchronous sequence of TSource items. The sequence may be empty if no data is available or if the extraction fails.

Remarks

The extractor should handle cancellation requests gracefully. Callers that do not intend to cancel the extraction can pass CancellationToken.None.
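One way an extractor can handle cancellation is to check the token before producing each item, so that the caller's await foreach ends with an OperationCanceledException. The following is a minimal, self-contained sketch: ExtractLinesAsync is a hypothetical stand-in for an ExtractWorkerAsync override over an in-memory array and does not use ExtractorBase. Marking the token parameter with [EnumeratorCancellation] also lets a caller supply the token through WithCancellation.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an ExtractWorkerAsync override: yields items
// from an in-memory source and checks the token before each item.
static async IAsyncEnumerable<string> ExtractLinesAsync(
    IEnumerable<string> lines,
    [EnumeratorCancellation] CancellationToken token)
{
    foreach (var line in lines)
    {
        // Throws OperationCanceledException once cancellation is requested.
        token.ThrowIfCancellationRequested();
        await Task.Yield(); // simulate asynchronous I/O
        yield return line;
    }
}

var source = new[] { "a", "b", "c", "d" };
var received = new List<string>();
using var cts = new CancellationTokenSource();

try
{
    await foreach (var line in ExtractLinesAsync(source, cts.Token))
    {
        received.Add(line);
        if (received.Count == 2)
        {
            cts.Cancel(); // the caller decides it has seen enough
        }
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"Extraction cancelled after {received.Count} items.");
}
```

Because the check runs before each item is read, no further items are produced once the caller cancels.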

ExtractWorkerAsync(CancellationToken)

Implements the core extraction logic. Derived classes must override this method.

protected abstract IAsyncEnumerable<TSource> ExtractWorkerAsync(CancellationToken token)

Parameters

token CancellationToken

A CancellationToken to observe during extraction.

Returns

IAsyncEnumerable<TSource>

An asynchronous sequence of TSource items. The sequence may be empty if no data is available or if the extraction fails.

IncrementCurrentItemCount()

Increments CurrentItemCount in a thread-safe manner.

[SuppressMessage("IDE0058", "IDE0058:Expression value is never used", Justification = "Interlocked.Increment return value intentionally discarded; only the side-effect matters.")]
protected void IncrementCurrentItemCount()

Remarks

Incrementing a counter with ++ or += 1 is a non-atomic read-modify-write, so concurrent increments can be lost. This method updates CurrentItemCount atomically.
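The attribute justification above indicates that the increment is performed with Interlocked.Increment. The following standalone sketch, which does not use ExtractorBase, contrasts a plain ++ with Interlocked.Increment when many threads update the same counter. The plain counter may end up lower than the number of increments because updates can overwrite each other; the interlocked counter always matches.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

const int iterations = 100_000;

// Unsafe: ++ reads, adds and writes back in separate steps, so concurrent
// increments can overwrite each other and be lost.
var unsafeCount = 0;
Parallel.For(0, iterations, _ => unsafeCount++);

// Safe: Interlocked.Increment performs the read-modify-write as one atomic operation.
var safeCount = 0;
Parallel.For(0, iterations, _ => Interlocked.Increment(ref safeCount));

Console.WriteLine($"unsafe: {unsafeCount}, safe: {safeCount}");
```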

IncrementCurrentSkippedItemCount()

Increments CurrentSkippedItemCount in a thread-safe manner.

[SuppressMessage("IDE0058", "IDE0058:Expression value is never used", Justification = "Interlocked.Increment return value intentionally discarded; only the side-effect matters.")]
protected void IncrementCurrentSkippedItemCount()

Remarks

Incrementing a counter with ++ or += 1 is a non-atomic read-modify-write, so concurrent increments can be lost. This method updates CurrentSkippedItemCount atomically.