Class ExtractorBase<TSource, TProgress>
Namespace: Wolfgang.Etl.Abstractions
Assembly: Wolfgang.Etl.Abstractions.dll
Provides a base implementation for extractors that produce items of type TSource. Library authors can create custom extractors by inheriting from this class and implementing the ExtractWorkerAsync and CreateProgressReport methods.
public abstract class ExtractorBase<TSource, TProgress> : IExtractWithProgressAndCancellationAsync<TSource, TProgress>, IExtractWithCancellationAsync<TSource>, IExtractWithProgressAsync<TSource, TProgress>, IExtractAsync<TSource> where TSource : notnull where TProgress : notnull
Type Parameters
- TSource
The type of the items being extracted.
- TProgress
The type of the progress report.
Inheritance: object → ExtractorBase<TSource, TProgress>
Implements
- IExtractWithProgressAndCancellationAsync<TSource, TProgress>
- IExtractWithCancellationAsync<TSource>
- IExtractWithProgressAsync<TSource, TProgress>
- IExtractAsync<TSource>
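A minimal sketch of a derived class: an ExtractWorkerAsync override that yields one item per line and calls IncrementCurrentItemCount(), plus a CreateProgressReport override that snapshots the base-class counters. It assumes a reference to the Wolfgang.Etl.Abstractions package and that ExtractorBase has an accessible parameterless constructor; LineExtractor and LineProgress are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using Wolfgang.Etl.Abstractions; // assumption: package reference available

var extractor = new LineExtractor(new StringReader("alpha\nbeta\ngamma"));
var lines = new List<string>();
await foreach (var line in extractor.ExtractAsync(CancellationToken.None))
{
    lines.Add(line);
}
Console.WriteLine(string.Join(",", lines));

// Hypothetical progress report type returned by CreateProgressReport()
public sealed record LineProgress(int ItemsExtracted, int ItemsSkipped);

// Hypothetical extractor that yields one item per line of text
public sealed class LineExtractor : ExtractorBase<string, LineProgress>
{
    private readonly TextReader _reader;

    public LineExtractor(TextReader reader) =>
        _reader = reader ?? throw new ArgumentNullException(nameof(reader));

    protected override async IAsyncEnumerable<string> ExtractWorkerAsync(
        [EnumeratorCancellation] CancellationToken token)
    {
        string? line;
        while ((line = await _reader.ReadLineAsync()) is not null)
        {
            token.ThrowIfCancellationRequested();
            IncrementCurrentItemCount(); // the base class cannot count items by itself
            yield return line;
        }
    }

    // Snapshot of the counters maintained by the base class
    protected override LineProgress CreateProgressReport() =>
        new(CurrentItemCount, CurrentSkippedItemCount);
}
```

Because the override calls IncrementCurrentItemCount() once per yielded line, any report built by CreateProgressReport() reflects the items actually produced so far.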
Properties
CurrentItemCount
The current number of items extracted so far.
public int CurrentItemCount { get; }
Property Value
- int
Remarks
It is the responsibility of the derived class to call IncrementCurrentItemCount() as each item is extracted. The base class has no way of knowing when an item has been processed.
CurrentSkippedItemCount
The current number of items skipped so far during extraction.
public int CurrentSkippedItemCount { get; }
Property Value
- int
MaximumItemCount
The maximum number of items to extract. Once the extractor has reached this limit, it should stop extracting and signal the end of the sequence.
public int MaximumItemCount { get; set; }
Property Value
- int
Examples
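```csharp
// Inside a class derived from ExtractorBase<string, TProgress>.
// filePath is assumed to be a field of the derived class.
// Requires: using System.IO; using System.Runtime.CompilerServices;
protected override async IAsyncEnumerable<string> ExtractWorkerAsync(
    [EnumeratorCancellation] CancellationToken token)
{
    using var reader = new StreamReader(filePath);

    var count = 0;
    string? line;
    // Check the limit before reading, so no extra line is consumed
    while (count < MaximumItemCount && (line = await reader.ReadLineAsync()) is not null)
    {
        token.ThrowIfCancellationRequested();
        count++;
        IncrementCurrentItemCount(); // keeps CurrentItemCount accurate for progress reports
        yield return line;
    }
    // Leaving the loop ends the sequence, whether the limit or the end of the file was reached
}
```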
Remarks
This is useful for extracting only part of a source, especially when the source is large or unbounded, or during development.
Exceptions
- ArgumentOutOfRangeException
The specified value is less than 1.
ReportingInterval
The number of milliseconds between progress updates.
public int ReportingInterval { get; set; }
Property Value
- int
Exceptions
- ArgumentOutOfRangeException
The specified value is less than 1.
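The base class drives progress callbacks from a timer (see CreateProgressTimer). The stand-alone sketch below illustrates the general idea behind ReportingInterval using System.Threading.Timer: on every tick, take a snapshot of the counters and pass it to IProgress<T>.Report. It is not the library's IProgressTimer implementation; reportingInterval, itemCount, and CreateReport are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var reportingInterval = 20; // hypothetical: milliseconds between reports (must be >= 1)
var itemCount = 0;          // counter shared with the timer thread
var reports = new ConcurrentQueue<int>();

// Progress<T> invokes its handler asynchronously (on the thread pool in a console app)
IProgress<int> progress = new Progress<int>(count => reports.Enqueue(count));

// Plays the role of CreateProgressReport(): a snapshot of the current state
int CreateReport() => Volatile.Read(ref itemCount);

using (var timer = new Timer(_ => progress.Report(CreateReport()), null,
                             dueTime: reportingInterval, period: reportingInterval))
{
    for (var i = 0; i < 10; i++)
    {
        await Task.Delay(15);                 // simulate extracting one item
        Interlocked.Increment(ref itemCount); // thread-safe increment
    }
}

progress.Report(CreateReport()); // final report, so the last count is always delivered
await Task.Delay(200);           // give the asynchronous Progress<T> handler time to run

Console.WriteLine($"Received {reports.Count} reports; final count = {CreateReport()}");
```

Sending one last report after the work finishes is a common design choice: without it, a consumer could miss the final count if extraction ends between two timer ticks.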
SkipItemCount
The number of items to skip before extracting. The extractor should skip the specified number of items before starting to yield results.
public int SkipItemCount { get; set; }
Property Value
- int
Examples
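```csharp
// Inside a class derived from ExtractorBase<string, TProgress>.
// filePath is assumed to be a field of the derived class.
// Requires: using System.IO; using System.Runtime.CompilerServices;
protected override async IAsyncEnumerable<string> ExtractWorkerAsync(
    [EnumeratorCancellation] CancellationToken token)
{
    using var reader = new StreamReader(filePath);

    // Skip the specified number of items before starting to yield results
    for (var skipped = 0; skipped < SkipItemCount; skipped++)
    {
        if (await reader.ReadLineAsync() is null)
        {
            yield break; // The file ended before SkipItemCount lines were skipped
        }
        IncrementCurrentSkippedItemCount();
    }

    // Now start yielding results
    string? line;
    while ((line = await reader.ReadLineAsync()) is not null)
    {
        token.ThrowIfCancellationRequested();
        IncrementCurrentItemCount();
        yield return line;
    }
}
```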
Remarks
This is useful during development for extracting part of a source, or for skipping items that were already processed or are not relevant to the current extraction.
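To check how skipping and limiting combine, the sketch below reimplements both as a stand-alone async iterator over a TextReader. It assumes that skipped items do not count toward MaximumItemCount; ReadLinesAsync and its skipItemCount and maximumItemCount parameters are hypothetical stand-ins for the SkipItemCount and MaximumItemCount properties.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;

// Hypothetical stand-alone version of skip-then-limit extraction
async IAsyncEnumerable<string> ReadLinesAsync(
    TextReader reader, int skipItemCount, int maximumItemCount,
    [EnumeratorCancellation] CancellationToken token = default)
{
    // Skip first; skipped lines are not counted toward the maximum (assumption)
    for (var skipped = 0; skipped < skipItemCount; skipped++)
    {
        if (await reader.ReadLineAsync() is null)
        {
            yield break; // source exhausted while skipping
        }
    }

    var extracted = 0;
    string? line;
    // Test the limit first so no extra line is read once it is reached
    while (extracted < maximumItemCount && (line = await reader.ReadLineAsync()) is not null)
    {
        token.ThrowIfCancellationRequested();
        extracted++;
        yield return line;
    }
}

var results = new List<string>();
using var sourceReader = new StringReader("a\nb\nc\nd\ne");
await foreach (var item in ReadLinesAsync(sourceReader, skipItemCount: 1, maximumItemCount: 2))
{
    results.Add(item);
}
Console.WriteLine(string.Join(",", results)); // b,c
```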
Exceptions
- ArgumentOutOfRangeException
The specified value is less than 0.
Methods
CreateProgressReport()
Creates a progress report of type TProgress. This gives the derived class the opportunity to implement a custom progress report that is specific to the extraction process.
protected abstract TProgress CreateProgressReport()
Returns
- TProgress
A progress report of type TProgress.
CreateProgressTimer(IProgress<TProgress>)
Creates the IProgressTimer used to drive progress callbacks. Override this method in a derived class to inject a custom timer (for example, a custom implementation that allows manual control in unit tests).
protected virtual IProgressTimer CreateProgressTimer(IProgress<TProgress> progress)
Parameters
- progress IProgress<TProgress>
The progress sink that will receive callbacks.
Returns
- IProgressTimer
A started IProgressTimer instance.
ExtractAsync()
Asynchronously extracts data of type TSource from a source.
public virtual IAsyncEnumerable<TSource> ExtractAsync()
Returns
- IAsyncEnumerable<TSource>
An asynchronous sequence of TSource items. The result may be an empty sequence if no data is available or if the extraction fails.
ExtractAsync(IProgress<TProgress>)
Asynchronously extracts data of type TSource from a source.
public virtual IAsyncEnumerable<TSource> ExtractAsync(IProgress<TProgress> progress)
Parameters
- progress IProgress<TProgress>
A provider for progress updates.
Returns
- IAsyncEnumerable<TSource>
An asynchronous sequence of TSource items. The result may be an empty sequence if no data is available or if the extraction fails.
Exceptions
- ArgumentNullException
The value of progress is null.
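An async iterator runs none of its body until enumeration starts, so a null check written inside it surfaces on the first MoveNextAsync call rather than when the method is called. The sketch below contrasts the two placements; this page does not state which one ExtractorBase uses, and ExtractEager, ExtractLazy, and Iterate are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical: validates eagerly, then hands off to the iterator
IAsyncEnumerable<int> ExtractEager(IProgress<int> progress)
{
    if (progress is null)
    {
        throw new ArgumentNullException(nameof(progress)); // thrown when ExtractEager is called
    }
    return Iterate(progress);
}

// Hypothetical: the same check inside an async iterator body
async IAsyncEnumerable<int> ExtractLazy(IProgress<int> progress)
{
    if (progress is null)
    {
        throw new ArgumentNullException(nameof(progress)); // thrown on the first MoveNextAsync
    }
    await foreach (var value in Iterate(progress))
    {
        yield return value;
    }
}

// Shared iterator that reports each value before yielding it
static async IAsyncEnumerable<int> Iterate(IProgress<int> progress)
{
    for (var i = 1; i <= 3; i++)
    {
        await Task.Yield();
        progress.Report(i);
        yield return i;
    }
}

var eagerThrewAtCall = false;
try { ExtractEager(null!); } catch (ArgumentNullException) { eagerThrewAtCall = true; }

IAsyncEnumerable<int>? lazy = null;
var lazyThrewAtCall = false;
try { lazy = ExtractLazy(null!); } catch (ArgumentNullException) { lazyThrewAtCall = true; }

var lazyThrewOnEnumeration = false;
try
{
    await foreach (var value in lazy!) { }
}
catch (ArgumentNullException) { lazyThrewOnEnumeration = true; }

Console.WriteLine($"eager at call: {eagerThrewAtCall}, lazy at call: {lazyThrewAtCall}, lazy on enumeration: {lazyThrewOnEnumeration}");
```

The wrapper-plus-iterator split is a common way to make argument exceptions appear at the call site, where the caller expects them.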
ExtractAsync(IProgress<TProgress>, CancellationToken)
Asynchronously extracts data of type TSource from a source.
public virtual IAsyncEnumerable<TSource> ExtractAsync(IProgress<TProgress> progress, CancellationToken token)
Parameters
- progress IProgress<TProgress>
A provider for progress updates.
- token CancellationToken
A CancellationToken to observe during extraction.
Returns
- IAsyncEnumerable<TSource>
An asynchronous sequence of TSource items. The result may be an empty sequence if no data is available or if the extraction fails.
Remarks
The extractor should be able to handle cancellation requests gracefully. If the caller doesn't plan on cancelling the extraction, they can pass CancellationToken.None.
Exceptions
- ArgumentNullException
The value of progress is null.
ExtractAsync(CancellationToken)
Asynchronously extracts data of type TSource from a source.
public virtual IAsyncEnumerable<TSource> ExtractAsync(CancellationToken token)
Parameters
- token CancellationToken
A CancellationToken to observe during extraction.
Returns
- IAsyncEnumerable<TSource>
An asynchronous sequence of TSource items. The result may be an empty sequence if no data is available or if the extraction fails.
Remarks
The extractor should be able to handle cancellation requests gracefully. If the caller doesn't plan on cancelling the extraction, they can pass CancellationToken.None.
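A minimal sketch of graceful cancellation with IAsyncEnumerable<T>: the producer checks the token between items, and the consumer catches OperationCanceledException. ProduceAsync is a hypothetical stand-in for an ExtractWorkerAsync override and is not part of the library.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical unbounded producer standing in for an extractor
static async IAsyncEnumerable<int> ProduceAsync([EnumeratorCancellation] CancellationToken token = default)
{
    for (var i = 0; ; i++)
    {
        token.ThrowIfCancellationRequested(); // stop promptly once cancellation is requested
        await Task.Delay(1, token);
        yield return i;
    }
}

using var cts = new CancellationTokenSource();
var received = new List<int>();
var cancelled = false;
try
{
    await foreach (var item in ProduceAsync(cts.Token))
    {
        received.Add(item);
        if (received.Count == 5)
        {
            cts.Cancel(); // the caller decides it has seen enough
        }
    }
}
catch (OperationCanceledException)
{
    cancelled = true; // the sequence ends with a cancellation exception, which the caller handles
}
Console.WriteLine($"Received {received.Count} items; cancelled = {cancelled}");
```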
ExtractWorkerAsync(CancellationToken)
The core implementation of the extraction logic. Derived classes must override this abstract method to produce the items of type TSource.
protected abstract IAsyncEnumerable<TSource> ExtractWorkerAsync(CancellationToken token)
Parameters
- token CancellationToken
A CancellationToken to observe during extraction.
Returns
- IAsyncEnumerable<TSource>
An asynchronous sequence of TSource items. The result may be an empty sequence if no data is available or if the extraction fails.
IncrementCurrentItemCount()
Increments CurrentItemCount in a thread-safe manner.
[SuppressMessage("IDE0058", "IDE0058:Expression value is never used", Justification = "Interlocked.Increment return value intentionally discarded; only the side-effect matters.")]
protected void IncrementCurrentItemCount()
Remarks
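A plain CurrentItemCount++ or CurrentItemCount += 1 is not thread-safe: the read, increment, and write are separate steps that concurrent callers can interleave, losing updates. This method performs the increment atomically with Interlocked.Increment.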
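The lost-update problem described in the remark can be observed directly. This stand-alone sketch increments two counters from several threads, one with Interlocked.Increment (the call named in the SuppressMessage justification above) and one with a plain ++. The counter names and iteration counts are arbitrary.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

const int Workers = 8;
const int IncrementsPerWorker = 100_000;

var atomicCount = 0;
var plainCount = 0;

Parallel.For(0, Workers, _ =>
{
    for (var i = 0; i < IncrementsPerWorker; i++)
    {
        Interlocked.Increment(ref atomicCount); // atomic read-modify-write
        plainCount++;                           // racy: concurrent increments can be lost
    }
});

Console.WriteLine($"Interlocked: {atomicCount}, plain ++: {plainCount}");
```

The Interlocked counter always ends at 800,000; the plain counter can end lower because concurrent read-modify-write operations overwrite each other.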
IncrementCurrentSkippedItemCount()
Increments CurrentSkippedItemCount in a thread-safe manner.
[SuppressMessage("IDE0058", "IDE0058:Expression value is never used", Justification = "Interlocked.Increment return value intentionally discarded; only the side-effect matters.")]
protected void IncrementCurrentSkippedItemCount()
Remarks
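A plain CurrentSkippedItemCount++ or CurrentSkippedItemCount += 1 is not thread-safe: the read, increment, and write are separate steps that concurrent callers can interleave, losing updates. This method performs the increment atomically with Interlocked.Increment.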