Major performance improvements:
- Parallel search execution across all queries
- Parallel article fetching with 10 concurrent limit
- Parallel embeddings with rate limiting (4 concurrent)
- Polly integration for retry resilience

New features:
- Add -v/--verbose flag for detailed output
- Compact single-line status mode with braille spinner
- StatusReporter service for unified output handling
- Query generation and errors hidden in compact mode
- ANSI escape codes for clean line updates

New files:
- Services/RateLimiter.cs - Semaphore-based concurrency control
- Services/StatusReporter.cs - Verbose/compact output handler
- Models/ParallelOptions.cs - Parallel processing configuration

All changes maintain Native AOT compatibility.
165 lines · 5.7 KiB · C#
using System.Numerics.Tensors;
|
|
using OpenQuery.Models;
|
|
using Polly;
|
|
using Polly.Retry;
|
|
|
|
namespace OpenQuery.Services;
|
|
|
|
/// <summary>
/// Generates text embeddings via an <see cref="OpenRouterClient"/>, batching requests
/// and running batches in parallel under a concurrency cap, with Polly-based retry on
/// transient HTTP failures. Failed (non-cancelled) batches degrade to empty vectors
/// rather than failing the whole operation.
/// </summary>
public class EmbeddingService
{
    private readonly OpenRouterClient _client;
    private readonly string _embeddingModel;
    private readonly ParallelProcessingOptions _options;
    private readonly RateLimiter _rateLimiter;
    private readonly ResiliencePipeline _retryPipeline;

    /// <summary>
    /// Creates the service with default parallel-processing options.
    /// </summary>
    /// <param name="client">Client used to issue embedding requests.</param>
    /// <param name="embeddingModel">Model identifier sent with every request.</param>
    public EmbeddingService(OpenRouterClient client, string embeddingModel = "openai/text-embedding-3-small")
    {
        _client = client;
        _embeddingModel = embeddingModel;
        _options = new ParallelProcessingOptions();
        _rateLimiter = new RateLimiter(_options.MaxConcurrentEmbeddingRequests);

        // Retry transient HTTP failures: 3 attempts, exponential backoff starting at 1s.
        _retryPipeline = new ResiliencePipelineBuilder()
            .AddRetry(new RetryStrategyOptions
            {
                MaxRetryAttempts = 3,
                Delay = TimeSpan.FromSeconds(1),
                BackoffType = DelayBackoffType.Exponential,
                ShouldHandle = new PredicateBuilder()
                    .Handle<HttpRequestException>()
            })
            .Build();
    }

    /// <summary>
    /// Embeds <paramref name="texts"/> in batches executed in parallel
    /// (bounded by <c>MaxConcurrentEmbeddingRequests</c>), preserving input order.
    /// Batches that fail after retries are replaced with empty vectors.
    /// </summary>
    /// <param name="texts">Texts to embed; result index i corresponds to texts[i].</param>
    /// <param name="onProgress">Optional per-batch progress callback (may fire out of batch order).</param>
    /// <param name="cancellationToken">Cancellation is propagated, not swallowed.</param>
    /// <returns>One embedding per input text, in input order.</returns>
    public async Task<float[][]> GetEmbeddingsAsync(
        List<string> texts,
        Action<string>? onProgress = null,
        CancellationToken cancellationToken = default)
    {
        var batchSize = _options.EmbeddingBatchSize;
        var totalBatches = (int)Math.Ceiling(texts.Count / (double)batchSize);

        // Index by batch number so results land in input order with no locking
        // and no post-hoc sort.
        var results = new float[totalBatches][][];

        await Parallel.ForEachAsync(
            Enumerable.Range(0, totalBatches),
            new ParallelOptions
            {
                MaxDegreeOfParallelism = _options.MaxConcurrentEmbeddingRequests,
                CancellationToken = cancellationToken
            },
            async (batchIndex, ct) =>
            {
                var startIndex = batchIndex * batchSize;
                var batch = texts.Skip(startIndex).Take(batchSize).ToList();

                onProgress?.Invoke($"[Generating embeddings: batch {batchIndex + 1}/{totalBatches}]");

                try
                {
                    results[batchIndex] = await ExecuteBatchAsync(batch, ct);
                }
                catch (Exception ex) when (ex is not OperationCanceledException)
                {
                    // Best-effort: a batch that still fails after retries yields empty
                    // embeddings instead of failing the whole run. Cancellation is
                    // deliberately excluded so callers see it as cancellation, not
                    // as fabricated empty results.
                    results[batchIndex] = CreateEmptyBatch(batch.Count);
                }
            });

        return results.SelectMany(r => r).ToArray();
    }

    /// <summary>
    /// Embeds a single text, subject to the same rate limiting and retry policy
    /// as batch embedding.
    /// </summary>
    /// <param name="text">Text to embed.</param>
    /// <param name="cancellationToken">Cancellation token; failures and cancellation propagate.</param>
    /// <returns>The embedding vector for <paramref name="text"/>.</returns>
    public async Task<float[]> GetEmbeddingAsync(
        string text,
        CancellationToken cancellationToken = default)
    {
        var results = await ExecuteBatchAsync([text], cancellationToken);
        return results[0];
    }

    /// <summary>
    /// Embeds <paramref name="texts"/> in parallel batches, like
    /// <see cref="GetEmbeddingsAsync"/>, but reports progress as a count of
    /// completed batches rather than by batch index.
    /// </summary>
    /// <param name="texts">Texts to embed; result order matches input order.</param>
    /// <param name="onProgress">Optional progress callback invoked once per started batch.</param>
    /// <param name="cancellationToken">Cancellation is propagated, not swallowed.</param>
    /// <returns>One embedding per input text, in input order.</returns>
    public async Task<float[][]> GetEmbeddingsWithRateLimitAsync(
        List<string> texts,
        Action<string>? onProgress = null,
        CancellationToken cancellationToken = default)
    {
        var batchSize = _options.EmbeddingBatchSize;
        var totalBatches = (int)Math.Ceiling(texts.Count / (double)batchSize);
        var results = new float[totalBatches][][];

        var completedBatches = 0;

        await Parallel.ForEachAsync(
            Enumerable.Range(0, totalBatches),
            new ParallelOptions
            {
                MaxDegreeOfParallelism = _options.MaxConcurrentEmbeddingRequests,
                CancellationToken = cancellationToken
            },
            async (batchIndex, ct) =>
            {
                var startIndex = batchIndex * batchSize;
                var batch = texts.Skip(startIndex).Take(batchSize).ToList();

                // Counter is shared across parallel workers, hence Interlocked.
                var currentBatch = Interlocked.Increment(ref completedBatches);
                onProgress?.Invoke($"[Generating embeddings: batch {currentBatch}/{totalBatches}]");

                try
                {
                    results[batchIndex] = await ExecuteBatchAsync(batch, ct);
                }
                catch (Exception ex) when (ex is not OperationCanceledException)
                {
                    // Skip failed batches; see GetEmbeddingsAsync for rationale.
                    results[batchIndex] = CreateEmptyBatch(batch.Count);
                }
            });

        return results.SelectMany(r => r).ToArray();
    }

    /// <summary>
    /// Runs one embedding request through the rate limiter and retry pipeline.
    /// Shared by all public embedding entry points.
    /// </summary>
    private async Task<float[][]> ExecuteBatchAsync(List<string> batch, CancellationToken cancellationToken)
    {
        return await _rateLimiter.ExecuteAsync(async () =>
            await _retryPipeline.ExecuteAsync(
                // NOTE(review): the pipeline's per-attempt token is unused because
                // EmbedAsync exposes no CancellationToken parameter here — confirm
                // against OpenRouterClient and thread it through if one exists.
                async _ => await _client.EmbedAsync(_embeddingModel, batch),
                cancellationToken),
            cancellationToken);
    }

    /// <summary>
    /// Builds a placeholder result for a failed batch: <paramref name="count"/>
    /// zero-length embedding vectors.
    /// </summary>
    private static float[][] CreateEmptyBatch(int count)
    {
        var empty = new float[count][];
        for (var i = 0; i < count; i++)
        {
            empty[i] = [];
        }
        return empty;
    }

    /// <summary>
    /// Computes cosine similarity between two equal-length vectors using
    /// SIMD-accelerated <see cref="TensorPrimitives"/>.
    /// </summary>
    public static float CosineSimilarity(float[] vector1, float[] vector2)
    {
        return TensorPrimitives.CosineSimilarity(vector1, vector2);
    }
}
|