using System.Numerics.Tensors;
using OpenQuery.Models;
using Polly;
using Polly.Retry;

namespace OpenQuery.Services;

/// <summary>
/// Generates text embeddings through an <see cref="OpenRouterClient"/>, splitting input
/// into batches processed in parallel, bounding concurrency with a <see cref="RateLimiter"/>,
/// and retrying transient failures via an exponential-backoff Polly pipeline.
/// </summary>
public class EmbeddingService
{
    private readonly OpenRouterClient _client;
    private readonly string _embeddingModel;
    private readonly ParallelProcessingOptions _options;
    private readonly RateLimiter _rateLimiter;
    private readonly ResiliencePipeline _retryPipeline;

    /// <summary>
    /// Creates an embedding service bound to <paramref name="client"/>.
    /// </summary>
    /// <param name="client">Client used to issue embedding requests.</param>
    /// <param name="embeddingModel">Model identifier sent with every request.</param>
    public EmbeddingService(OpenRouterClient client, string embeddingModel = "openai/text-embedding-3-small")
    {
        _client = client;
        _embeddingModel = embeddingModel;
        _options = new ParallelProcessingOptions();
        _rateLimiter = new RateLimiter(_options.MaxConcurrentEmbeddingRequests);

        // Retry up to 3 times with exponential backoff (1s, 2s, 4s) on transient failures.
        _retryPipeline = new ResiliencePipelineBuilder()
            .AddRetry(new RetryStrategyOptions
            {
                MaxRetryAttempts = 3,
                Delay = TimeSpan.FromSeconds(1),
                BackoffType = DelayBackoffType.Exponential,
                // NOTE(review): the source had a bare .Handle() — the generic exception
                // type was lost. HttpRequestException is the most plausible transient
                // error for an HTTP client; confirm against the original code.
                ShouldHandle = new PredicateBuilder().Handle<HttpRequestException>()
            })
            .Build();
    }

    /// <summary>
    /// Embeds <paramref name="texts"/> in parallel batches and returns one embedding per
    /// input text, in input order. Batches that still fail after all retries contribute
    /// empty (zero-length) embeddings rather than failing the whole call, so the output
    /// array always has one entry per input text.
    /// </summary>
    /// <param name="texts">Texts to embed.</param>
    /// <param name="onProgress">Optional callback receiving a human-readable progress line per batch.</param>
    /// <param name="cancellationToken">Cancels the whole parallel operation.</param>
    /// <returns>One embedding vector per input text, aligned with <paramref name="texts"/>.</returns>
    public async Task<float[][]> GetEmbeddingsAsync(
        List<string> texts,
        Action<string>? onProgress = null,
        CancellationToken cancellationToken = default)
    {
        var batchSize = _options.EmbeddingBatchSize;
        var totalBatches = (int)Math.Ceiling(texts.Count / (double)batchSize);
        var results = new List<(int batchIndex, float[][] embeddings)>();
        var batchIndices = Enumerable.Range(0, totalBatches).ToList();

        await Parallel.ForEachAsync(
            batchIndices,
            new ParallelOptions
            {
                MaxDegreeOfParallelism = _options.MaxConcurrentEmbeddingRequests,
                CancellationToken = cancellationToken
            },
            async (batchIndex, ct) =>
            {
                var startIndex = batchIndex * batchSize;
                var batch = texts.Skip(startIndex).Take(batchSize).ToList();
                onProgress?.Invoke($"[Generating embeddings: batch {batchIndex + 1}/{totalBatches}]");
                try
                {
                    var batchResults = await _rateLimiter.ExecuteAsync(async () =>
                        await _retryPipeline.ExecuteAsync(async token =>
                            await _client.EmbedAsync(_embeddingModel, batch), ct), ct);
                    lock (results)
                    {
                        results.Add((batchIndex, batchResults));
                    }
                }
                catch (OperationCanceledException)
                {
                    // Cancellation must propagate; do not convert it into empty embeddings.
                    throw;
                }
                catch
                {
                    // Skip failed batches: substitute empty embeddings for this batch so
                    // the final output stays aligned one-to-one with the input texts.
                    var emptyBatch = new float[batch.Count][];
                    for (var i = 0; i < batch.Count; i++)
                    {
                        emptyBatch[i] = [];
                    }
                    lock (results)
                    {
                        results.Add((batchIndex, emptyBatch));
                    }
                }
            });

        // Reassemble results in batch order so output[i] corresponds to texts[i].
        var orderedResults = results
            .OrderBy(r => r.batchIndex)
            .SelectMany(r => r.embeddings)
            .ToArray();
        return orderedResults;
    }

    /// <summary>
    /// Embeds a single text, applying the same rate limit and retry policy as batch calls.
    /// </summary>
    /// <param name="text">Text to embed.</param>
    /// <param name="cancellationToken">Cancels the request.</param>
    /// <returns>The embedding vector for <paramref name="text"/>.</returns>
    public async Task<float[]> GetEmbeddingAsync(
        string text,
        CancellationToken cancellationToken = default)
    {
        var results = await _rateLimiter.ExecuteAsync(async () =>
            await _retryPipeline.ExecuteAsync(async token =>
                await _client.EmbedAsync(_embeddingModel, [text]), cancellationToken), cancellationToken);
        return results[0];
    }

    /// <summary>
    /// Variant of <see cref="GetEmbeddingsAsync"/> that writes each batch result into a
    /// pre-sized slot array (indexed by batch, so no lock is needed for result storage)
    /// and reports progress as the count of batches started so far. Failed batches yield
    /// empty embeddings; output order matches input order.
    /// </summary>
    /// <param name="texts">Texts to embed.</param>
    /// <param name="onProgress">Optional callback receiving a human-readable progress line per batch.</param>
    /// <param name="cancellationToken">Cancels the whole parallel operation.</param>
    /// <returns>One embedding vector per input text, aligned with <paramref name="texts"/>.</returns>
    public async Task<float[][]> GetEmbeddingsWithRateLimitAsync(
        List<string> texts,
        Action<string>? onProgress = null,
        CancellationToken cancellationToken = default)
    {
        var batchSize = _options.EmbeddingBatchSize;
        var totalBatches = (int)Math.Ceiling(texts.Count / (double)batchSize);
        var results = new float[totalBatches][][];
        var completedBatches = 0;

        await Parallel.ForEachAsync(
            Enumerable.Range(0, totalBatches),
            new ParallelOptions
            {
                MaxDegreeOfParallelism = _options.MaxConcurrentEmbeddingRequests,
                CancellationToken = cancellationToken
            },
            async (batchIndex, ct) =>
            {
                var startIndex = batchIndex * batchSize;
                var batch = texts.Skip(startIndex).Take(batchSize).ToList();
                // Progress counts batches *started*, not finished; batches may complete out of order.
                var currentBatch = Interlocked.Increment(ref completedBatches);
                onProgress?.Invoke($"[Generating embeddings: batch {currentBatch}/{totalBatches}]");
                try
                {
                    var batchResults = await _rateLimiter.ExecuteAsync(async () =>
                        await _retryPipeline.ExecuteAsync(async token =>
                            await _client.EmbedAsync(_embeddingModel, batch), ct), ct);
                    // Each batch owns its own slot, so no synchronization is required here.
                    results[batchIndex] = batchResults;
                }
                catch (OperationCanceledException)
                {
                    // Cancellation must propagate; do not convert it into empty embeddings.
                    throw;
                }
                catch
                {
                    // Skip failed batches: fill the slot with empty embeddings to keep alignment.
                    results[batchIndex] = new float[batch.Count][];
                    for (var i = 0; i < batch.Count; i++)
                    {
                        results[batchIndex][i] = [];
                    }
                }
            });

        return results.SelectMany(r => r).ToArray();
    }

    /// <summary>
    /// Cosine similarity of two vectors, delegating to the SIMD-accelerated
    /// <see cref="TensorPrimitives.CosineSimilarity(ReadOnlySpan{float}, ReadOnlySpan{float})"/>.
    /// </summary>
    /// <param name="vector1">First vector.</param>
    /// <param name="vector2">Second vector (same length as <paramref name="vector1"/>).</param>
    /// <returns>Cosine similarity in [-1, 1].</returns>
    public static float CosineSimilarity(float[] vector1, float[] vector2)
    {
        return TensorPrimitives.CosineSimilarity(vector1, vector2);
    }
}