# SearchTool Component

Deep dive into `SearchTool` - the core pipeline orchestrator that implements the 4-phase search-retrieve-rank workflow.

## Overview

`SearchTool` is the workhorse of OpenQuery. It takes search queries, fetches articles, generates embeddings, ranks by relevance, and returns formatted context for the final AI answer.

## Location

`Tools/SearchTool.cs`

## Class Definition

```csharp
public class SearchTool
{
    private readonly SearxngClient _searxngClient;
    private readonly EmbeddingService _embeddingService;
    private readonly ParallelProcessingOptions _options;

    public static string Name => "search";
    public static string Description => "Search the web for information on a topic";

    public SearchTool(
        SearxngClient searxngClient,
        EmbeddingService embeddingService);

    public Task<string> ExecuteAsync(
        string originalQuery,
        List<string> generatedQueries,
        int maxResults,
        int topChunksLimit,
        Action<string>? onProgress = null,
        bool verbose = true);
}
```

**Dependencies**:

- `SearxngClient` - for web searches
- `EmbeddingService` - for vector generation
- `ParallelProcessingOptions` - concurrency settings (hardcoded new instance)

**Static Properties**:

- `Name` - tool identifier (currently "search")
- `Description` - tool description

## ExecuteAsync Method

**Signature**:

```csharp
public async Task<string> ExecuteAsync(
    string originalQuery,          // User's original question
    List<string> generatedQueries, // Expanded search queries
    int maxResults,                // Results per query
    int topChunksLimit,            // Top N chunks to return
    Action<string>? onProgress,    // Progress callback
    bool verbose)                  // Verbose mode flag
```

**Returns**: `Task<string>` - formatted context with source citations

**Contract**:

- Never returns `null` (returns "No search results found."
on zero results)
- Progress callback may be invoked frequently (many phases)
- `verbose` passed to sub-components for their own logging

## The 4-Phase Pipeline

```
ExecuteAsync()
│
├─ Phase 1: ExecuteParallelSearchesAsync
│    Input:  generatedQueries × maxResults
│    Output: List<SearchResult> (deduplicated)
│
├─ Phase 2: ExecuteParallelArticleFetchingAsync
│    Input:  List<SearchResult>
│    Output: List<Chunk> (with content, url, title)
│
├─ Phase 3: ExecuteParallelEmbeddingsAsync
│    Input:  originalQuery + List<Chunk>
│    Output: (queryEmbedding, chunkEmbeddings)
│    (also sets Chunk.Embedding for valid chunks)
│
├─ Phase 4: RankAndSelectTopChunks
│    Input:  List<Chunk> + queryEmbedding + chunkEmbeddings
│    Output: List<Chunk> topChunks (with Score set)
│
└─ Format Context → return string
```

### Phase 1: ExecuteParallelSearchesAsync

**Purpose**: Execute all search queries in parallel, collect and deduplicate results.

**Implementation**:

```csharp
var allResults = new ConcurrentBag<SearchResult>();
var searchTasks = generatedQueries.Select(async query =>
{
    onProgress?.Invoke($"[Searching web for '{query}'...]");
    try
    {
        var results = await _searxngClient.SearchAsync(query, maxResults);
        foreach (var result in results)
        {
            allResults.Add(result);
        }
    }
    catch (Exception ex)
    {
        if (verbose)
            Console.WriteLine($"Warning: Search failed for query '{query}': {ex.Message}");
    }
});
await Task.WhenAll(searchTasks);

var uniqueResults = allResults.DistinctBy(r => r.Url).ToList();
return uniqueResults;
```

**Details**:

- `ConcurrentBag<SearchResult>` collects results thread-safely
- `Task.WhenAll` - unbounded parallelism (one task per entry in `generatedQueries`)
- Each task calls `_searxngClient.SearchAsync(query, maxResults)`
- Errors caught and logged (verbose only); other queries continue
- `DistinctBy(r => r.Url)` removes duplicates

**Return**: `List<SearchResult>` (unique URLs only)

**Progress**: `[Searching web for '{query}'...]`

**Potential Issues**:

- Could overwhelm local SearxNG if `generatedQueries` is large (100+)
- SearxNG itself may have its own rate limiting

**Future Enhancements**:

- Add semaphore to limit search concurrency
- Add timeout per
search task
- Cache search results (same query across runs)

### Phase 2: ExecuteParallelArticleFetchingAsync

**Purpose**: Fetch each search result URL, extract article content, split into chunks.

**Implementation**:

```csharp
var chunks = new ConcurrentBag<Chunk>();
var completedFetches = 0;
var totalFetches = searchResults.Count;
var semaphore = new SemaphoreSlim(_options.MaxConcurrentArticleFetches); // 10

var fetchTasks = searchResults.Select(async result =>
{
    await semaphore.WaitAsync();
    try
    {
        var current = Interlocked.Increment(ref completedFetches);
        var uri = new Uri(result.Url);
        var domain = uri.Host;
        onProgress?.Invoke($"[Fetching article {current}/{totalFetches}: {domain}]");
        try
        {
            var article = await ArticleService.FetchArticleAsync(result.Url);
            if (!article.IsReadable || string.IsNullOrEmpty(article.TextContent))
                return;

            var textChunks = ChunkingService.ChunkText(article.TextContent);
            foreach (var chunkText in textChunks)
            {
                chunks.Add(new Chunk(chunkText, result.Url, article.Title));
            }
        }
        catch (Exception ex)
        {
            if (verbose)
                Console.WriteLine($"Warning: Failed to fetch article {result.Url}: {ex.Message}");
        }
    }
    finally
    {
        semaphore.Release();
    }
});
await Task.WhenAll(fetchTasks);
return chunks.ToList();
```

**Details**:

- `SemaphoreSlim` limits concurrency to `MaxConcurrentArticleFetches` (10)
- `Interlocked.Increment` for thread-safe progress counting
- Progress: `[Fetching article X/Y: domain]` (extracts host from URL)
- `ArticleService.FetchArticleAsync` uses SmartReader
- Article must be `IsReadable` and have `TextContent`
- `ChunkingService.ChunkText` splits into ~500-char pieces
- Each chunk becomes a `Chunk(content, url, article.Title)`
- Errors logged (verbose only); failed URLs yield no chunks

**Return**: `List<Chunk>` (potentially many per article)

**Chunk Count Estimate**:

- 15 articles × average 3,000 chars/article = 45,000 chars
- With 500-char chunks ≈ 90 chunks
- With natural breaks → maybe 70-80 chunks

**Potential Issues**:

- Some sites block SmartReader
(JS-heavy, paywalls)
- Slow article fetches may cause long-tail latency
- Large articles create many chunks → memory + embedding cost

**Future Enhancements**:

- Add per-URL timeout
- Filter chunks by length threshold (skip tiny chunks)
- Deduplicate chunks across articles (same content on different sites)
- Cache article fetches by URL

### Phase 3: ExecuteParallelEmbeddingsAsync

**Purpose**: Generate embeddings for the original query and all chunks, with batching, rate limiting, and concurrency control.

**Implementation**:

```csharp
onProgress?.Invoke($"[Generating embeddings for {chunks.Count} chunks and query...]");

// Start query embedding (single) and chunk embeddings (batch) concurrently
var queryEmbeddingTask = _embeddingService.GetEmbeddingAsync(originalQuery);
var chunkTexts = chunks.Select(c => c.Content).ToList();
var chunkEmbeddingsTask = _embeddingService.GetEmbeddingsWithRateLimitAsync(
    chunkTexts, onProgress);

await Task.WhenAll(queryEmbeddingTask, chunkEmbeddingsTask);
var queryEmbedding = await queryEmbeddingTask;
var chunkEmbeddings = await chunkEmbeddingsTask;

// Filter out chunks with empty embeddings
var validChunks = new List<Chunk>();
var validEmbeddings = new List<float[]>();
for (var i = 0; i < chunks.Count; i++)
{
    if (chunkEmbeddings[i].Length > 0)
    {
        validChunks.Add(chunks[i]);
        validEmbeddings.Add(chunkEmbeddings[i]);
    }
}

// Update chunks with embeddings
for (var i = 0; i < validChunks.Count; i++)
{
    validChunks[i].Embedding = validEmbeddings[i];
}

return (queryEmbedding, validEmbeddings.ToArray());
```

**Details**:

- **Query embedding**: single request for the original question (one embedding)
- **Chunk embeddings**: batch processing of all chunk texts
- Both run
concurrently via `Task.WhenAll`
- `_embeddingService.GetEmbeddingsWithRateLimitAsync` uses:
  - Batch size: 300 (default)
  - Max concurrent batches: 4 (default)
  - Polly retry (3 attempts, exponential backoff)
  - `RateLimiter` (semaphore) for API concurrency
- Failed batches return empty `float[]` (length 0)
- Filters out failed chunks (they won't be ranked)
- `validChunks[i].Embedding = validEmbeddings[i]` attaches each embedding to its chunk

**Return**: `(float[] queryEmbedding, float[][] chunkEmbeddings)` where:

- `chunkEmbeddings` length = `validChunks.Count` (filtered)
- Order matches `validChunks` order (the parallel arrays are filtered together)

**Progress**: Interleaved from the embedding service's own progress callbacks (batch X/Y)

**Potential Issues**:

- `GetEmbeddingsWithRateLimitAsync` assigns `results[batchIndex] = ...` from multiple parallel tasks; this is safe because each task writes a distinct array index and the writes never overlap - not a bug
- The filtering loop assumes `chunkEmbeddings` has the same count as `chunks`; `GetEmbeddingsWithRateLimitAsync` returns `results.SelectMany(r => r).ToArray()`, which matches the input count (including empty arrays for failed batches), so there is no out-of-range risk

**Memory Consideration**:

- `chunkTexts` holds all chunk strings (potentially large, but already in memory)
- `chunkEmbeddings` holds all float arrays (~600KB for 100 chunks at 1536 dims)
- Total: modest (a few MB)

**Future Enhancements**:

- Stream embeddings? (No benefit - all are needed for ranking)
- Cache embeddings by content hash (cross-run)
- Support a different embedding model per query

### Phase 4: RankAndSelectTopChunks

**Purpose**: Score chunks by semantic relevance to the query, sort, and select the top N.
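Scoring uses cosine similarity between the query vector and each chunk vector. As a reference for what the SIMD-accelerated call computes, here is a minimal scalar sketch; the method name and the zero-vector guard are illustrative, not the actual `EmbeddingService` implementation:

```csharp
// Illustrative only - the production code calls TensorPrimitives.CosineSimilarity.
public static float CosineSimilarityScalar(float[] a, float[] b)
{
    if (a.Length != b.Length)
        throw new ArgumentException("Vector lengths must match.");

    float dot = 0f, normA = 0f, normB = 0f;
    for (var i = 0; i < a.Length; i++)
    {
        dot   += a[i] * b[i]; // dot product
        normA += a[i] * a[i]; // squared magnitude of a
        normB += b[i] * b[i]; // squared magnitude of b
    }

    if (normA == 0f || normB == 0f)
        return 0f; // guard against zero vectors

    return dot / (MathF.Sqrt(normA) * MathF.Sqrt(normB));
}
```

For normalized embedding vectors this reduces to a plain dot product, which is why the operation is cheap even across ~100 chunks.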
**Implementation**:

```csharp
var chunksWithEmbeddings = chunks.Where(c => c.Embedding != null).ToList();

foreach (var chunk in chunksWithEmbeddings)
{
    chunk.Score = EmbeddingService.CosineSimilarity(queryEmbedding, chunk.Embedding!);
}

var topChunks = chunksWithEmbeddings
    .OrderByDescending(c => c.Score)
    .Take(topChunksLimit)
    .ToList();
return topChunks;
```

**Details**:

- Filters to chunks that have embeddings (successful Phase 3)
- For each: `Score = CosineSimilarity(queryEmbedding, chunkEmbedding)`
  - Uses `TensorPrimitives.CosineSimilarity` (SIMD-accelerated)
  - Returns a float, typically 0-1 (higher = more relevant)
- `OrderByDescending` - highest scores first
- `Take(topChunksLimit)` - select top N (from the `--chunks` option)
- Returns `List<Chunk>` (now with `Score` set)

**Return**: Top N chunks ready for context formatting

**Complexity**:

- O(n) for scoring (where n = valid chunks, typically 50-100)
- O(n log n) for sorting (fast for n ≈ 100)
- Negligible CPU time

**Edge Cases**:

- If `topChunksLimit` > `chunksWithEmbeddings.Count`, returns all (no padding)
- If all embeddings failed, returns an empty list
- Handles `topChunksLimit == 0` (`Take(0)` returns empty)

### Context Formatting (After Phase 4)

**Location**: In `ExecuteAsync`, after ranking:

```csharp
var context = string.Join("\n\n", topChunks.Select((c, i) =>
    $"[Source {i + 1}: {c.Title ?? "Unknown"}]({c.SourceUrl})\n{c.Content}"));
return context;
```

**Format**:

```
[Source 1: Article Title](https://example.com/article)
Chunk content text...

[Source 2: Another Title](https://example.com/another)
Chunk content text...

[Source 3: Third Title](https://example.com/third)
Chunk content text...
```

**Features**:

- Each source numbered 1, 2, 3...
(matches order of `topChunks` = descending relevance)
- Title, or "Unknown" if null
- Title is a markdown link to the original URL
- Chunk content as plain text (may contain its own formatting)
- Double newline between sources

**Rationale**:

- Markdown links allow copy-pasting into browsers
- Numbers allow the LLM to cite `[Source 1]` in its answer
- The original title helps the user recognize the source

**Potential Issues**:

- The LLM might misinterpret "Source 1" as a literal citation requirement
- If chunks contain markdown, it may conflict with the wrapper format (no escaping)
- Some titles may contain markdown special characters (unlikely but possible)

**Alternative**: Could use XML-style tags or a more robust citation format.

## Error Handling & Edge Cases

### Empty Results Handling

At the end of `ExecuteAsync`:

```csharp
if (searchResults.Count == 0)
    return "No search results found.";
if (chunks.Count == 0)
    return "Found search results but could not extract readable content.";
```

These messages appear in the final answer (the LLM will respond to these contexts).

### Partial Failures

- Some search queries fail → proceed with the others
- Some articles fail to fetch → continue
- Some embedding batches fail → those chunks are filtered out
- Ranking proceeds with whatever valid embeddings exist

### Verbose vs Compact Progress

The `verbose` parameter affects what's passed to the phases:

- **Article fetching**: errors shown only if `verbose`
- **Embeddings**: always shows batch progress via `onProgress` (from `EmbeddingService`)
- **Searches**: warnings go to the Console (gated by `verbose`), not through the callback

### Progress Callback Pattern

`onProgress` is invoked at major milestones:

- Searching: `[Searching web for '{query}'...]`
- Article fetch: `[Fetching article X/Y: domain]`
- Embeddings: `[Generating embeddings: batch X/Y]`
- Final: `[Found top X most relevant chunks overall. Generating answer...]`

Each phase may invoke the callback many times (e.g., once per embedding batch). `StatusReporter` handles these appropriately.
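The callback fan-in described above - many parallel producers, one UI consumer - can be sketched with a bounded channel that drops messages rather than blocking producers. This is a hypothetical wiring under the names used in this document, not the actual `StatusReporter` implementation:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Sketch of a drop-on-full progress sink; the real StatusReporter's API may differ.
public sealed class ProgressSink
{
    private readonly Channel<string> _channel =
        Channel.CreateBounded<string>(new BoundedChannelOptions(capacity: 64)
        {
            // Drop messages when the buffer is full instead of blocking
            // the (possibly many) parallel producer tasks.
            FullMode = BoundedChannelFullMode.DropWrite
        });

    // Safe to call from any thread; never blocks the caller.
    public void Report(string message) => _channel.Writer.TryWrite(message);

    // Single consumer drains messages for the CLI UI.
    public async Task ConsumeAsync(CancellationToken ct = default)
    {
        await foreach (var message in _channel.Reader.ReadAllAsync(ct))
            Console.WriteLine(message);
    }
}
```

Passing `sink.Report` as the `Action<string>? onProgress` argument would give every phase a non-blocking way to emit milestones, at the cost of occasionally dropping messages under load - the trade-off the document accepts for a CLI UI.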
## Performance Characteristics

### Time Estimate per Phase

For a typical run (3 queries, 5 results each, ~15 articles):

| Phase | Time | Dominated By |
|-------|------|--------------|
| Searches | 3-8s | Network latency to SearxNG |
| Article Fetching | 5-15s | Network + SmartReader CPU |
| Embeddings | 2-4s | OpenRouter API latency (4 concurrent batches) |
| Ranking | <0.1s | CPU (O(n log n) sort, n ≈ 100) |
| **Total Pipeline** | **10-30s** | Articles + Searches |

### Concurrency Limits Effect

**Article Fetching** (`MaxConcurrentArticleFetches` = 10):

- 15 articles → 2 waves (10, then 5)
- If each takes 2s → ~4s total (vs 30s sequential)

**Embedding Batching** (`MaxConcurrentEmbeddingRequests` = 4, `EmbeddingBatchSize` = 300):

- 80 chunks fit in a single batch of 300, so a typical run issues one embedding request with no batch-level parallelism
- The limit of 4 concurrent batches only matters when chunk counts exceed the batch size (or multiple embedding calls overlap)

### Memory Usage

- `searchResults` (15 items) → ~30KB
- `chunks` (80 items × 500 chars) → ~40KB of text + ~400KB of embeddings (80 × 1536 × 4 bytes)
- Total ≈ 500KB, excluding temporary HTTP buffers

## Design Decisions

### Why Use ConcurrentBag for Results/Chunks?

A thread-safe collection lets parallel tasks add items without locks. `ConcurrentBag` does not guarantee enumeration order, so the order seen by `ToList()` and `DistinctBy` (which keeps the first occurrence it encounters) is nondeterministic. This is acceptable because order doesn't matter here - ranking is semantic. If order mattered, a `ConcurrentQueue` or an explicit sort by source would be needed.

### Why Not Use Parallel.ForEach for Article Fetching?

We use `Task.WhenAll` with `Select` + a semaphore. `Parallel.ForEachAsync` could also work but requires .NET 6+, and we want the same pattern as the other phases. The semaphore gives explicit concurrency control.

### Why Separate Query Embedding from Chunk Embeddings?
`GetEmbeddingAsync` is called directly (not batched) because there's only one query. It could be batched with the chunks, but:

- The query is small (a single string)
- The batch API has overhead (request structure)
- A separate call lets the query embedding start immediately, without waiting on the chunk pipeline

### Why Two Different Embedding Methods?

`EmbeddingService` has:

- `GetEmbeddingsWithRateLimitAsync` (used in `SearchTool`)
- `GetEmbeddingsAsync` (similar, but a different implementation)

Probably a legacy/refactor artifact. Could be consolidated.

### Why Not Deduplicate URLs Earlier?

Deduplication happens after search aggregation. Results could also be deduplicated within each search (SearxNG may already dedupe across engines), but the global dedupe is necessary either way.

### Why Not Early Filtering (e.g., by Domain, Length)?

Filtering could improve quality:

- Filter by domain reputation
- Filter articles that are too short (<200 chars) or too long (>50KB)
- Not implemented (keeping it simple)

## Testing Considerations

**Unit Testability**: `SearchTool` is fairly testable with mocks:

- Mock `SearxngClient` to return predetermined results
- Mock `EmbeddingService` (note: `ArticleService` and `ChunkingService` are called statically, which makes them harder to mock)
- Verify progress callback invocations
- Verify the final context format

**Integration Testing**:

- End-to-end with real or mocked external services
- Needs a test SearxNG instance and a test OpenRouter key (or mocked responses)

**Performance Testing**:

- Benchmark with different concurrency settings
- Profile memory for large result sets (1,000+ articles)
- Measure embedding API latency impact

## Known Issues

### Suspected Bug in ExecuteParallelEmbeddingsAsync

The actual source of `ExecuteParallelEmbeddingsAsync` builds `chunkTexts` from chunk content:

```csharp
var chunkTexts = chunks.Select(c => c.Content).ToList();
var chunkEmbeddingsTask = _embeddingService.GetEmbeddingsWithRateLimitAsync(
    chunkTexts, onProgress);
```

This is correct.
### Potential Race Condition in GetEmbeddingsWithRateLimitAsync

```csharp
results[batchIndex] = batchResults;
```

This writes to an array index from multiple parallel tasks, but it is safe: each task writes a distinct index, the writes never overlap, and reference-type writes are atomic. No synchronization is needed.

### Progress Callback May Overwhelm

If invoked synchronously from many parallel tasks, the callback could saturate the channel. `Channel.TryWrite` returns `false` when the buffer is full; the return value is ignored, so messages may be dropped under heavy load. This is acceptable for a CLI UI - some messages may be lost, but overall progress stays visible.

## Related Components

- **[OpenQueryApp](openquery-app.md)** - calls this
- **[SearxngClient](../../services/SearxngClient.md)** - Phase 1
- **[ArticleService](../../services/ArticleService.md)** - Phase 2a
- **[ChunkingService](../../services/ChunkingService.md)** - Phase 2b
- **[EmbeddingService](../../services/EmbeddingService.md)** - Phase 3
- **[Ranking](../../services/EmbeddingService.md#cosinesimilarity)** - cosine similarity

---

## Next Steps

- [Services Overview](../services/overview.md) - See supporting services
- [CLI Reference](../../api/cli.md) - How users trigger this pipeline
- [Performance](../performance.md) - Optimize pipeline settings