Relevant source files
The following files were used as context for generating this wiki page:

- [src/docs2db/ingest.py](https://github.com/b08x/docs2db/blob/main/src/docs2db/ingest.py)
- [src/docs2db/chunks.py](https://github.com/b08x/docs2db/blob/main/src/docs2db/chunks.py)
- [src/docs2db/docs2db.py](https://github.com/b08x/docs2db/blob/main/src/docs2db/docs2db.py)
- [src/docs2db/multiproc.py](https://github.com/b08x/docs2db/blob/main/src/docs2db/multiproc.py)
- [README.md](https://github.com/b08x/docs2db/blob/main/README.md)

Processing Pipeline
Introduction
The docs2db processing pipeline transforms source documents (PDFs, HTML, Markdown, Office files) into a queryable RAG (Retrieval-Augmented Generation) database. The system operates as a multi-stage pipeline where each stage produces intermediate artifacts that subsequent stages consume. The architecture demonstrates a clear separation of concerns: ingestion converts documents to Docling JSON format, chunking splits documents into manageable segments with optional LLM-generated context, embedding converts text to vector representations, and database operations handle loading and querying.
The pipeline supports incremental processing through staleness detection, automatically skipping files that have not changed since last processing. Parallel execution via multiprocessing enables efficient handling of large document collections, with configurable worker counts and batch sizes.
Pipeline Architecture
Overview
The processing pipeline consists of five primary stages, each implemented as a discrete CLI command that can be executed independently or as part of a unified pipeline:
- Ingest - Convert source documents to Docling JSON format
- Chunk - Split documents into text chunks with optional LLM-generated contextual enrichment
- Embed - Generate vector embeddings for each chunk
- Load - Import processed data into PostgreSQL database
- Dump - Export database contents to portable SQL format
The content directory (docs2db_content/) serves as the persistent storage for all intermediate processing artifacts. Each source document receives its own subdirectory containing the generated files from each pipeline stage.
Stage Dependencies
Each pipeline stage reads artifacts from the previous stage and produces outputs consumed by subsequent stages. This creates a clear dependency chain where skipping stages requires manual intervention:
| Stage | Input | Output | Dependencies |
|---|---|---|---|
| Ingest | Source files (PDF, HTML, etc.) | `source.json` | None |
| Chunk | `source.json` | `chunks.json` | Ingest |
| Embed | `chunks.json` | `gran.json` (model-specific) | Chunk |
| Load | `gran.json`, `chunks.json` | Database tables | Embed |
| Dump | Database tables | SQL file | Load |
The pipeline command (docs2db pipeline) executes all stages sequentially with automatic database lifecycle management.
Ingestion Stage
Purpose and Scope
The ingestion stage converts source documents into a standardized Docling JSON format. Docling is a document understanding library that extracts structured content from various file formats while preserving semantic information such as headings, tables, and page layouts.
Core Components
The ingestion functionality centers on the ingest function in src/docs2db/ingest.py:
def ingest(
source_path: str,
dry_run: bool = False,
force: bool = False,
pipeline: Optional[str] = None,
model: Optional[str] = None,
device: Optional[str] = None,
batch_size: Optional[int] = None,
workers: Optional[int] = None,
) -> bool:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `source_path` | `str` | Path to directory or file to ingest |
| `dry_run` | `bool` | Show what would be processed without executing |
| `force` | `bool` | Force reprocessing even if files are up-to-date |
| `pipeline` | `Optional[str]` | Docling pipeline: 'standard' or 'vlm' |
| `model` | `Optional[str]` | Docling model specific to the pipeline |
| `device` | `Optional[str]` | Device for Docling: 'auto', 'cpu', 'cuda', 'mps' |
| `batch_size` | `Optional[int]` | Docling batch size per worker |
| `workers` | `Optional[int]` | Number of parallel workers |
Sources: src/docs2db/ingest.py#L1-L100
Processing Flow
The ingestion process follows a structured workflow:
File Discovery
The system discovers ingestible files using find_ingestible_files(), which collects files matching supported extensions:
def find_ingestible_files(source_root: Path) -> list[Path]:
    """Find all files that can be ingested."""
    patterns = [
        "**/*.pdf",
        "**/*.html",
        "**/*.htm",
        "**/*.md",
        "**/*.docx",
        "**/*.doc",
    ]
    # Collect matches for each pattern and return a stable ordering
    # (sketch of the elided implementation)
    files: list[Path] = []
    for pattern in patterns:
        files.extend(source_root.glob(pattern))
    return sorted(files)
Sources: src/docs2db/ingest.py#L50-L80
Content Path Generation
Each source file maps to a corresponding content directory structure that preserves the original hierarchy:
def generate_content_path(source_file: Path, source_root: Path) -> Path:
"""Generate content path preserving directory structure."""
relative_path = source_file.relative_to(source_root)
content_path = CONTENT_DIR / relative_path.parent / relative_path.stem
return content_path
Sources: src/docs2db/ingest.py#L80-L90
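As a concrete illustration, the function above can be exercised standalone. `CONTENT_DIR` is assumed here to be `docs2db_content/`, matching the content directory described later:

```python
from pathlib import Path

CONTENT_DIR = Path("docs2db_content")  # assumed content root

def generate_content_path(source_file: Path, source_root: Path) -> Path:
    """Mirror the source tree under the content directory, keyed by file stem."""
    relative_path = source_file.relative_to(source_root)
    return CONTENT_DIR / relative_path.parent / relative_path.stem

# A PDF one level below the source root maps to a matching subdirectory:
print(generate_content_path(Path("corpus/manuals/intro.pdf"), Path("corpus")))
# → docs2db_content/manuals/intro
```

Dropping the file extension (`.stem`) means each document gets a directory named after itself, into which all later stage artifacts are written.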
Chunking Stage
Purpose and Scope
The chunking stage splits documents into smaller text segments suitable for embedding and retrieval. It supports two chunking strategies: basic text splitting using a HybridChunker from Docling, and optional contextual enrichment using LLM providers to improve retrieval accuracy.
Core Components
The chunking functionality is implemented in src/docs2db/chunks.py, which contains:
- LLM Providers - Abstraction layer for multiple LLM backends
- Chunk Generation - Document-to-chunk conversion with structural context
- Contextual Enrichment - Optional LLM-generated context for each chunk
LLM Provider Architecture
The system implements a provider abstraction pattern supporting multiple LLM backends:
Each provider implements a common interface:
class LLMProvider(ABC):
@abstractmethod
def get_chunk_context(self, chunk_prompt: str) -> str:
"""Get context for a chunk using LLM."""
pass
@abstractmethod
def summarize_text(self, text: str) -> str:
"""Summarize text using LLM."""
pass
@abstractmethod
def set_document(self, doc_text: str):
"""Set document context."""
pass
Sources: src/docs2db/chunks.py#L1-L50
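Any new backend only needs to implement these three methods. A minimal stub shows the shape (`EchoProvider` is hypothetical, useful for offline testing, and not part of docs2db):

```python
from abc import ABC, abstractmethod

class LLMProvider(ABC):
    @abstractmethod
    def get_chunk_context(self, chunk_prompt: str) -> str: ...

    @abstractmethod
    def summarize_text(self, text: str) -> str: ...

    @abstractmethod
    def set_document(self, doc_text: str): ...

class EchoProvider(LLMProvider):
    """Stub backend: returns canned strings instead of calling an LLM."""

    def set_document(self, doc_text: str):
        self.doc_text = doc_text

    def get_chunk_context(self, chunk_prompt: str) -> str:
        return f"[context for: {chunk_prompt[:40]}]"

    def summarize_text(self, text: str) -> str:
        return text[:80]
```

Because callers depend only on the abstract interface, swapping providers is a configuration change rather than a code change.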
Provider Implementations
WatsonX Provider
The WatsonX provider interfaces with IBM WatsonX for contextual generation:
class WatsonXProvider(LLMProvider):
def __init__(
self,
api_key: str,
project_id: str,
url: str,
model: str,
shared_state: dict | None = None,
):
credentials = Credentials(api_key=api_key, url=url)
self.api_client = APIClient(credentials=credentials, project_id=project_id)
self.model_inference = ModelInference(
model_id=model,
api_client=self.api_client,
)
Sources: src/docs2db/chunks.py#L100-L120
Mistral Provider
The Mistral provider uses the Mistral AI API for text generation:
class MistralProvider(LLMProvider):
def __init__(
self,
base_url: str,
model: str,
api_key: str,
shared_state: dict | None = None,
):
if not api_key:
raise ValueError(
"Mistral API key required. "
"Set MISTRAL_API_KEY environment variable or get one from https://console.mistral.ai/"
)
# ... initialization
Sources: src/docs2db/chunks.py#L200-L220
OpenAI and OpenRouter Providers
Both providers use HTTP-based communication with OpenAI-compatible APIs:
class OpenAIProvider(LLMProvider):
def __init__(
self,
base_url: str,
model: str,
api_key: str,
shared_state: dict | None = None,
):
self.base_url = base_url
self.model = model
self.client = httpx.Client(
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
timeout=60.0,
)
Sources: src/docs2db/chunks.py#L150-L180
Chunk Generation Process
The chunk generation process combines document parsing with optional LLM enrichment:
Token Estimation
The system uses character-based token estimation for chunk size management:
def estimate_tokens(text: str) -> int:
"""Estimate token count for text.
Uses a conservative approximation based on character count.
Formula: chars / 3.0
"""
char_count = len(text)
return int(char_count / 3.0)
Sources: src/docs2db/chunks.py#L350-L380
This estimation method accounts for diverse content types:
- Regular English prose: ~4-5 chars/token (conservative at 3)
- Code/data/numbers: ~2-3 chars/token
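The effect of the conservative 3 chars/token divisor can be seen directly (a small sketch reusing the formula above):

```python
def estimate_tokens(text: str) -> int:
    """Conservative estimate: assume ~3 characters per token."""
    return int(len(text) / 3.0)

prose = "The quick brown fox jumps over the lazy dog."  # 44 characters
print(estimate_tokens(prose))  # → 14; a typical BPE tokenizer reports ~10
```

Overestimating token counts for prose is deliberate: underestimating would risk building prompts that overflow the model context.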
Context Safety Margins
The system applies a safety margin to model context limits:
CONTEXT_SAFETY_MARGIN = 0.75 # Use 75% of max context
This ensures generated prompts fit within model limits while accounting for tokenization variance.
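For example, with a hypothetical 8,192-token context window the margin leaves this much room for the prompt (`usable_context` is an illustrative helper, not a docs2db function):

```python
CONTEXT_SAFETY_MARGIN = 0.75  # use 75% of max context

def usable_context(max_context_tokens: int) -> int:
    """Tokens left for the prompt after reserving a 25% safety margin."""
    return int(max_context_tokens * CONTEXT_SAFETY_MARGIN)

print(usable_context(8192))  # → 6144
```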
Embedding Stage
Purpose and Scope
The embedding stage converts text chunks into vector representations suitable for semantic similarity search. The system supports multiple embedding models and stores vectors in model-specific files within each document’s content directory.
Supported Models
Based on the file naming conventions observed in the content directory structure:
| Model | Output File | Description |
|---|---|---|
| granite-30m-english | `gran.json` | IBM Granite 30M English embeddings |
| e5-small-v2 | `e5sm.json` | E5 small v2 embeddings |
| slate-125m | `slate.json` | Slate 125M embeddings |
| noinstruct-small | varies | No-instruct small model |
The embedding generation is triggered via:
@app.command()
def embed(
    content_dir: Annotated[str | None, typer.Option(help="Path to content directory")] = None,
    model: Annotated[str | None, typer.Option(help="Embedding model")] = None,
    # ... additional parameters
) -> None:
Sources: src/docs2db/docs2db.py#L1-L50
Database Operations
Loading Data
The load stage imports processed chunks and embeddings into PostgreSQL:
def load_documents(
content_dir: Path,
pattern: str = "**",
# ... connection parameters
) -> bool:
The system supports:
- Vector similarity search via pgvector extension
- Full-text search via PostgreSQL tsvector with GIN indexing
- Hybrid search combining vector and BM25 approaches
Sources: src/docs2db/database.py#L1-L50
Database Lifecycle
The system provides commands for database management:
| Command | Function |
|---|---|
| `db-start` | Start PostgreSQL via Docker/Podman compose |
| `db-stop` | Stop the running database |
| `db-destroy` | Remove database containers and volumes |
| `db-status` | Check database connection status |
| `db-logs` | Retrieve database logs |
Pipeline Orchestration
Complete Pipeline Command
The docs2db pipeline command orchestrates all stages:
@app.command()
def pipeline(
source_path: Annotated[str | None, typer.Argument(help="Source path")],
# ... many options
) -> None:
"""Run complete docs2db pipeline: start DB → ingest → chunk → embed → load → dump → stop."""
# Step 1: Start database
if not start_database():
raise Docs2DBException("Failed to start database")
# Step 2: Ingest
if not ingest_command(...):
raise Docs2DBException("Failed to ingest documents")
# Step 3: Chunk
if not generate_chunks(...):
raise Docs2DBException("Failed to generate chunks")
# Step 4: Embed
if not generate_embeddings(...):
raise Docs2DBException("Failed to generate embeddings")
# Step 5: Load
if not load_documents(...):
raise Docs2DBException("Failed to load documents")
# Step 6: Dump
if not dump_database(...):
raise Docs2DBException("Failed to dump database")
# Step 7: Stop
stop_database()
Sources: src/docs2db/docs2db.py#L100-L150
Parallel Processing
The BatchProcessor class handles parallel execution across pipeline stages:
class BatchProcessor:
def __init__(
self,
worker_function: callable,
worker_args: tuple,
progress_message: str,
batch_size: int,
mem_threshold_mb: int,
max_workers: int,
use_shared_state: bool = False,
):
Sources: src/docs2db/multiproc.py#L1-L50
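The underlying pattern is a batched fan-out: split the work into fixed-size batches and hand each batch to a worker. A self-contained sketch of that pattern follows (using a thread pool for brevity; docs2db's BatchProcessor uses OS processes and additionally enforces memory thresholds):

```python
from concurrent.futures import ThreadPoolExecutor

def process_in_batches(items, worker_function, batch_size=4, max_workers=2):
    """Split items into batches and run worker_function on each batch in parallel."""
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves batch order, so results stay in input order
        for batch_result in pool.map(worker_function, batches):
            results.extend(batch_result)
    return results

print(process_in_batches(list(range(10)), sorted, batch_size=3))
# → [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Batch size trades throughput against memory: larger batches amortize worker startup cost, while smaller batches keep per-worker memory use below the configured threshold.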
Configuration parameters control parallelism:
| Parameter | Default | Description |
|---|---|---|
| `chunking_workers` | CPU count | Number of parallel chunking workers |
| `docling_workers` | CPU count | Number of parallel ingestion workers |
| `docling_batch_size` | 1 | Files processed per batch |
| `mem_threshold_mb` (chunking) | 2000 | Memory threshold for chunking |
| `mem_threshold_mb` (ingestion) | 1500 | Memory threshold for ingestion |
Content Directory Structure
The content directory stores all intermediate artifacts:
docs2db_content/
├── path/
│ └── to/
│ └── document/
│ ├── source.json # Docling ingested document
│ ├── chunks.json # Text chunks with context
│ ├── gran.json # Vector embeddings (model-specific)
│ └── meta.json # Processing metadata
Each stage reads from and writes to this directory, enabling:
- Incremental processing (stale file detection)
- Debugging and inspection at each stage
- Manual intervention between stages
Sources: README.md#L50-L70
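A small helper (hypothetical, not part of docs2db) makes the per-document layout easy to inspect by reporting which stage artifacts already exist:

```python
from pathlib import Path

# Artifact names taken from the content directory layout above
ARTIFACTS = ("source.json", "chunks.json", "gran.json", "meta.json")

def artifacts_for(doc_dir: Path) -> dict[str, bool]:
    """Report which pipeline artifacts exist in one document's content directory."""
    return {name: (doc_dir / name).exists() for name in ARTIFACTS}
```

A document directory containing only `source.json` has completed ingestion but not yet been chunked or embedded.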
Staleness Detection
The pipeline implements automatic staleness detection to skip unchanged files:
def is_chunks_stale(chunks_file: Path, source_file: Path) -> bool:
"""Check if chunks need regeneration."""
if not chunks_file.exists():
return True
chunks_mtime = chunks_file.stat().st_mtime
source_mtime = source_file.stat().st_mtime
return chunks_mtime < source_mtime
This check compares modification times between source and output files, avoiding unnecessary reprocessing.
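A quick sanity check of the rule, as a self-contained sketch reusing the function above:

```python
import os
import tempfile
import time
from pathlib import Path

def is_chunks_stale(chunks_file: Path, source_file: Path) -> bool:
    """Chunks are stale if missing or older than their source."""
    if not chunks_file.exists():
        return True
    return chunks_file.stat().st_mtime < source_file.stat().st_mtime

with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "source.json"
    chunks = Path(tmp) / "chunks.json"
    source.write_text("{}")
    assert is_chunks_stale(chunks, source)      # missing output → stale
    chunks.write_text("[]")                     # output written after source → fresh
    assert not is_chunks_stale(chunks, source)
    future = time.time() + 10
    os.utime(source, (future, future))          # simulate an edited source
    assert is_chunks_stale(chunks, source)
```

Note that mtime comparison is a heuristic: it misses content-identical rewrites and can be fooled by clock skew, which is why the `force` flag exists to override it.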
Configuration
Environment Variables
The system uses environment variables for configuration:
| Variable | Description |
|---|---|
| `WATSONX_API_KEY` | IBM WatsonX API key |
| `WATSONX_PROJECT_ID` | WatsonX project ID |
| `OPENAI_API_KEY` | OpenAI API key |
| `MISTRAL_API_KEY` | Mistral AI API key |
| `OPENROUTER_API_KEY` | OpenRouter API key |
CLI Options
Each pipeline command accepts extensive CLI options for configuration. Run docs2db <command> --help for complete option lists.
Conclusion
The docs2db processing pipeline implements a staged architecture for transforming source documents into a searchable RAG database. Each stage produces intermediate artifacts stored in a hierarchical content directory, enabling incremental processing and manual inspection. The system demonstrates several architectural patterns:
- Provider Abstraction - Multiple LLM backends (WatsonX, OpenAI, Mistral, OpenRouter) implement a common interface
- Parallel Processing - BatchProcessor enables concurrent execution with configurable worker counts
- Incremental Processing - Staleness detection avoids unnecessary reprocessing
- Pipeline Orchestration - The unified pipeline command coordinates all stages with automatic database lifecycle management
The separation between stages allows independent execution and debugging, while the content directory structure provides visibility into each transformation step. Note that the database loading and embedding stages are referenced throughout this page, but their implementations (a database.py and embed.py) were not included in the provided file set, so those stages are described here only at the level of their CLI entry points.