Net — Fx 4.0 Fix
public async Task<WorkItem> ProcessAsync(WorkItem input, CancellationToken token) { // Simulate validation work await Task.Delay(50, token); input.IsValid = !string.IsNullOrWhiteSpace(input.InputData) && input.InputData.Length >= 3; if (!input.IsValid) { input.ProcessedData = "INVALID"; } Console.WriteLine($"[{StageName}] Item {input.Id}: Valid = {input.IsValid}"); return input; } }
// Simulate enrichment (e.g., database lookup, API call) await Task.Delay(80, token); input.ProcessedData = $"[ENRICHED] {input.ProcessedData} - Length: {input.ProcessedData.Length}"; Console.WriteLine($"[{StageName}] Item {input.Id}: Enriched data"); return input; } }
public async Task<WorkItem> ProcessAsync(WorkItem input, CancellationToken token) { if (!input.IsValid) return input; net fx 4.0
// Parallel Pipeline Processor public class ParallelPipelineProcessor { private readonly List<IPipelineStage<WorkItem, WorkItem>> _stages; private readonly int _parallelismLevel;
// Pipeline stage interface public interface IPipelineStage<TInput, TOutput> { Task<TOutput> ProcessAsync(TInput input, CancellationToken token); string StageName { get; } } _queues[stageIndex] : null; } } This feature demonstrates
public async Task<List<WorkItem>> ProcessItemsAsync( IEnumerable<WorkItem> items, CancellationToken cancellationToken, IProgress<string> progress = null) { var inputQueue = new BlockingCollection<WorkItem>(); var results = new ConcurrentBag<WorkItem>(); // Start producer task var producerTask = Task.Run(() => { foreach (var item in items) { cancellationToken.ThrowIfCancellationRequested(); inputQueue.Add(item, cancellationToken); } inputQueue.CompleteAdding(); }, cancellationToken);
// Create consumer tasks for each pipeline stage var stageTasks = new List<Task>(); for (int i = 0; i < _stages.Count; i++) { var stageIndex = i; var stage = _stages[stageIndex]; var nextQueue = (stageIndex < _stages.Count - 1) ? new BlockingCollection<WorkItem>() : null; var stageTask = Task.Run(async () => { var sourceQueue = (stageIndex == 0) ? inputQueue : GetQueueForStage(stageIndex - 1); foreach (var item in sourceQueue.GetConsumingEnumerable()) { cancellationToken.ThrowIfCancellationRequested(); progress?.Report($"Processing item {item.Id} in {stage.StageName}"); var processedItem = await stage.ProcessAsync(item, cancellationToken); if (nextQueue != null) { nextQueue.Add(processedItem, cancellationToken); } else { results.Add(processedItem); } } nextQueue?.CompleteAdding(); }, cancellationToken); stageTasks.Add(stageTask); StoreQueueForStage(stageIndex, nextQueue); } await Task.WhenAll(stageTasks.ToArray()); await producerTask; return results.ToList(); } private Dictionary<int, BlockingCollection<WorkItem>> _queues = new Dictionary<int, BlockingCollection<WorkItem>>(); private void StoreQueueForStage(int stageIndex, BlockingCollection<WorkItem> queue) { if (queue != null) _queues[stageIndex] = queue; } private BlockingCollection<WorkItem> GetQueueForStage(int stageIndex) { return _queues.ContainsKey(stageIndex) ? _queues[stageIndex] : null; } } _queues[stageIndex] : null
This feature demonstrates parallel processing, task coordination, and cancellation tokens. Complete Implementation using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace ParallelDataPipeline { // Custom data entity public class WorkItem { public int Id { get; set; } public string InputData { get; set; } public string ProcessedData { get; set; } public DateTime StartTime { get; set; } public DateTime EndTime { get; set; } public bool IsValid { get; set; } }