Relevant source files
The following files were used as context for generating this wiki page:

- [src/docs2db/ingest.py](https://github.com/b08x/docs2db/blob/main/src/docs2db/ingest.py)
- [src/docs2db/config.py](https://github.com/b08x/docs2db/blob/main/src/docs2db/config.py)
- [src/docs2db/docs2db.py](https://github.com/b08x/docs2db/blob/main/src/docs2db/docs2db.py)
- [src/docs2db/chunks.py](https://github.com/b08x/docs2db/blob/main/src/docs2db/chunks.py)
- [src/docs2db/multiproc.py](https://github.com/b08x/docs2db/blob/main/src/docs2db/multiproc.py)
- [README.md](https://github.com/b08x/docs2db/blob/main/README.md)

Document Ingestion
Introduction
Document ingestion in docs2db represents the initial phase of the RAG (Retrieval-Augmented Generation) pipeline, responsible for converting source documents from various formats into a standardized Docling JSON representation. The ingestion system accepts multiple input formats (PDF, HTML, Markdown, DOCX), processes them through the Docling library, and outputs structured JSON files that subsequent pipeline stages (chunking, embedding, loading) consume.
The ingestion mechanism operates as a batch processing system with parallel workers, supporting configuration through CLI options, environment variables, and .env files. It maintains the source directory structure in the output content directory, enabling traceability between original and processed files.
Architecture Overview
Core Components
The document ingestion system consists of three primary functional areas:
- File Discovery - Locating ingestible files within a source directory
- Format Conversion - Transforming source documents to Docling JSON format
- Batch Processing - Parallel execution with progress tracking and error handling
Processing Flow
The ingestion pipeline follows a sequential pattern: source discovery → validation → batch processing → file output. Each source file receives its own subdirectory within the content directory, containing the Docling JSON representation and processing metadata.
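These stages compose from the module's public functions. A hedged end-to-end sketch is shown below; the real CLI path adds validation, dry-run handling, and parallel batch processing:

```python
from pathlib import Path

from docs2db.ingest import find_ingestible_files, ingest_file

source_root = Path("/path/to/documents")
content_root = Path("docs2db_content")

# Discover files, sort for a deterministic order, and create one
# content subdirectory per source file, mirroring the source tree.
for src in sorted(find_ingestible_files(source_root)):
    rel = src.relative_to(source_root)
    ingest_file(source_file=src, content_path=content_root / rel.parent / rel.stem)
```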
File Discovery Mechanism
Ingestible File Types
The system identifies ingestible files by file extension. The supported input formats are:
| Format | Extensions | Source |
|---|---|---|
| PDF | .pdf | ingest.py#L45-L55 |
| HTML | .html, .htm | ingest.py#L45-L55 |
| Markdown | .md, .markdown | ingest.py#L45-L55 |
| DOCX | .docx | ingest.py#L45-L55 |
File Discovery Function
# src/docs2db/ingest.py#L45-L55
def find_ingestible_files(root: Path) -> Generator[Path, None, None]:
    """Find all ingestible files in a directory.

    Args:
        root: Root directory to search

    Yields:
        Path objects for ingestible files
    """
    extensions = {".pdf", ".html", ".htm", ".md", ".markdown", ".docx"}
    for ext in extensions:
        yield from root.rglob(f"*{ext}")
The function uses Path.rglob() for recursive globbing, enabling deep directory traversal. Files are yielded in arbitrary order and subsequently sorted to ensure deterministic processing sequence.
Batch Processing Infrastructure
BatchProcessor Class
The BatchProcessor class in multiproc.py manages parallel worker execution and is configured with the following parameters:
| Parameter | Type | Purpose |
|---|---|---|
| worker_function | callable | Function executed per file |
| worker_args | tuple | Arguments passed to worker |
| progress_message | str | Display text in progress bar |
| batch_size | int | Files processed per worker batch |
| mem_threshold_mb | int | Memory threshold triggering worker recycling |
| max_workers | int | Maximum parallel processes |
The processor implements memory-aware worker recycling. When cumulative memory usage exceeds the threshold, workers are restarted to prevent memory leaks from Docling processes.
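A minimal sketch of such a check, assuming the psutil package is available; the actual recycling logic in multiproc.py may differ in detail:

```python
import os

import psutil

def should_recycle(mem_threshold_mb: int) -> bool:
    """Illustrative only: report whether this worker's resident
    memory exceeds the recycling threshold."""
    rss_mb = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
    return rss_mb > mem_threshold_mb
```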
# src/docs2db/multiproc.py - BatchProcessor initialization
processor = BatchProcessor(
    worker_function=ingest_batch,
    worker_args=(str(source_root), force, pipeline, model, device, batch_size),
    progress_message="Ingesting files...",
    batch_size=settings.docling_batch_size,
    mem_threshold_mb=1500,  # Lower threshold for docling processes
    max_workers=settings.docling_workers,
)
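The batch_size parameter groups files so that each worker invocation amortizes Docling start-up cost over several documents. A generic partitioning sketch, illustrative rather than the actual multiproc.py internals:

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch
```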
Ingestion Functions
Main Entry Point
The ingest() function serves as the primary CLI command handler:
# src/docs2db/ingest.py#L89-L159
def ingest(
    source_path: str,
    dry_run: bool = False,
    force: bool = False,
    pipeline: Optional[str] = None,
    model: Optional[str] = None,
    device: Optional[str] = None,
    batch_size: Optional[int] = None,
    workers: Optional[int] = None,
) -> bool:
Configuration override precedence:
- CLI arguments (highest priority)
- Environment variables
- .env file settings
- Default values (lowest priority)
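A sketch of this precedence as plain Python; the project itself delegates resolution to its settings layer, so the helper below is illustrative only:

```python
import os

def resolve(cli_value, env_var: str, dotenv: dict, default):
    """Illustrative precedence: CLI > environment > .env > default."""
    if cli_value is not None:
        return cli_value
    if env_var in os.environ:
        return os.environ[env_var]
    if env_var in dotenv:
        return dotenv[env_var]
    return default

# With no CLI flag and no environment override, the .env value wins.
workers = resolve(None, "DOCLING_WORKERS", {"DOCLING_WORKERS": "8"}, 4)
```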
Single File Ingestion
# src/docs2db/ingest.py#L161-L205
def ingest_file(
    source_file: Path,
    content_path: Path,
    source_metadata: dict | None = None,
) -> bool:
The function performs the following operations:
- Validates source file existence
- Creates target content directory structure
- Invokes Docling converter
- Serializes DoclingDocument to JSON
- Generates processing metadata
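A hedged sketch of the conversion core using Docling's public DocumentConverter API; paths and metadata handling are simplified relative to the real ingest_file:

```python
from pathlib import Path

from docling.document_converter import DocumentConverter

def convert_to_docling_json(source_file: Path, content_path: Path) -> None:
    """Simplified illustration: convert one file and persist source.json."""
    content_path.mkdir(parents=True, exist_ok=True)
    result = DocumentConverter().convert(source_file)
    # DoclingDocument is a pydantic model and serializes directly to JSON.
    (content_path / "source.json").write_text(
        result.document.model_dump_json(), encoding="utf-8"
    )
```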
Content-Based Ingestion
# src/docs2db/ingest.py#L207-L260
def ingest_from_content(
    content: str,
    content_path: Path,
    stream_name: str,
    source_metadata: dict | None = None,
    content_encoding: str = "utf-8",
) -> bool:
This function enables in-memory content ingestion without disk-based source files. It accepts raw content (HTML, markdown, etc.), converts it through Docling, and writes to the content directory.
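A sketch of routing in-memory content through Docling, assuming Docling's DocumentStream wrapper; the real ingest_from_content adds metadata handling and error reporting:

```python
from io import BytesIO

from docling.datamodel.base_models import DocumentStream
from docling.document_converter import DocumentConverter

html = "<html><body><h1>Title</h1></body></html>"
stream = DocumentStream(name="page.html", stream=BytesIO(html.encode("utf-8")))
result = DocumentConverter().convert(stream)
print(result.document.export_to_markdown())
```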
Configuration System
Settings Management
Configuration is centralized in config.py with environment variable support:
| Setting | Environment Variable | Default | Purpose |
|---|---|---|---|
| docling_pipeline | DOCLING_PIPELINE | "standard" | Processing pipeline |
| docling_model | DOCLING_MODEL | None | Model selection |
| docling_device | DOCLING_DEVICE | "auto" | Compute device |
| docling_batch_size | DOCLING_BATCH_SIZE | 1 | Files per batch |
| docling_workers | DOCLING_WORKERS | 4 | Parallel workers |
# src/docs2db/ingest.py#L131-L144
if pipeline:
    settings.docling_pipeline = pipeline
if model:
    settings.docling_model = model
if device:
    settings.docling_device = device
if batch_size:
    settings.docling_batch_size = batch_size
if workers:
    settings.docling_workers = workers
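The same settings can be supplied via a .env file. An illustrative example using the environment variable names from the table above (values are examples, not project defaults):

```
DOCLING_PIPELINE=standard
DOCLING_DEVICE=cpu
DOCLING_BATCH_SIZE=2
DOCLING_WORKERS=8
```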
Output Structure
Content Directory Organization
Ingestion creates the following directory structure within docs2db_content/:
docs2db_content/
├── path/
│ └── to/
│ └── document/
│ ├── source.json # Docling JSON representation
│ └── meta.json # Processing metadata
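The mapping from a source file to its content subdirectory mirrors the file's relative path under the source root. A hedged sketch of that mapping; the helper name is illustrative, not the exact function in ingest.py:

```python
from pathlib import Path

def content_dir_for(source_file: Path, source_root: Path, content_root: Path) -> Path:
    """Illustrative: mirror the source's relative path under the content
    root, one subdirectory per document (stem strips the extension)."""
    rel = source_file.relative_to(source_root)
    return content_root / rel.parent / rel.stem

# /data/docs/path/to/document.pdf -> docs2db_content/path/to/document
print(content_dir_for(
    Path("/data/docs/path/to/document.pdf"),
    Path("/data/docs"),
    Path("docs2db_content"),
))
```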
Source File Format
The output source.json contains a complete DoclingDocument model with:
- Document text in Markdown format
- Bounding box coordinates for layout analysis
- Table structures
- Image references
- Metadata including page numbers and source file information
# src/docs2db/ingest.py#L280-L288
dl_doc = DoclingDocument.model_validate_json(
    json_data=source_file.read_text(encoding="utf-8")
)
# ... processing ...
dl_doc.model_dump_json().encode("utf-8")
CLI Integration
Command Definition
The ingestion command integrates with the Typer CLI framework:
# src/docs2db/docs2db.py#L44-L92
@app.command()
def ingest(
    source_path: Annotated[
        Optional[str], typer.Argument(help="Path to directory or file to ingest")
    ],
    dry_run: Annotated[bool, typer.Option(help="...")] = False,
    force: Annotated[bool, typer.Option(help="...")] = False,
    pipeline: Annotated[Optional[str], typer.Option("--pipeline", help="...")] = None,
    model: Annotated[Optional[str], typer.Option("--model", help="...")] = None,
    device: Annotated[Optional[str], typer.Option("--device", help="...")] = None,
    batch_size: Annotated[Optional[int], typer.Option("--batch-size", help="...")] = None,
    workers: Annotated[Optional[int], typer.Option("--workers", help="...")] = None,
) -> None:
Usage Patterns
# Basic ingestion
docs2db ingest /path/to/documents
# Dry run (preview without processing)
docs2db ingest /path/to/documents --dry-run
# Force reprocessing
docs2db ingest /path/to/documents --force
# Custom pipeline and model
docs2db ingest /path/to/documents --pipeline vlm --model docling-gm
# CPU-only processing
docs2db ingest /path/to/documents --device cpu
# Parallel workers
docs2db ingest /path/to/documents --workers 8
Error Handling and Resilience
Failure Modes
The batch processor tracks errors and continues processing:
# src/docs2db/ingest.py#L326-L330
if errors > 0:
    logger.error(f"Ingestion completed with {errors} errors")
logger.info(f"{processed} files ingested in {end - start:.2f} seconds")
return errors == 0
Staleness Detection
The system implements incremental processing through file modification timestamps:
# Checks if chunks.json exists and is newer than source.json
# Used in chunking stage - same pattern applies to ingestion output
if not force and not is_chunks_stale(chunks_file, source_file):
    return chunks_file
This prevents unnecessary reprocessing when source files remain unchanged.
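The underlying check reduces to a modification-time comparison. A minimal sketch of such a staleness test, under the assumption that the project's helper compares mtimes in this way:

```python
from pathlib import Path

def is_stale(derived: Path, source: Path) -> bool:
    """Illustrative staleness test: the derived file must exist and be
    at least as new as its source, otherwise it needs regeneration."""
    if not derived.exists():
        return True
    return derived.stat().st_mtime < source.stat().st_mtime
```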
Library Usage
Programmatic Access
The ingestion module exposes public functions for programmatic use:
from pathlib import Path

from docs2db.ingest import ingest_file, ingest_from_content

# Ingest from file system
ingest_file(
    source_file=Path("document.pdf"),
    content_path=Path("docs2db_content/my_docs/document"),
    source_metadata={"source": "my_system", "retrieved_at": "2024-01-01"},
)

# Ingest from memory
ingest_from_content(
    content="<html>...</html>",
    content_path=Path("docs2db_content/my_docs/page"),
    stream_name="page.html",
    source_metadata={"url": "https://example.com"},
    content_encoding="utf-8",
)
Observed Structural Gaps
- No retry mechanism: Failed file ingestion does not trigger automatic retry, potentially requiring manual re-execution of the entire batch.
- Limited validation: The system does not validate Docling conversion results beyond basic serialization, meaning corrupted or unreadable source files may produce empty source.json files without raising errors.
- No progress persistence: If the batch processor is interrupted, progress is lost and the entire job must restart from the beginning.
- Memory threshold fixed: The mem_threshold_mb=1500 is hardcoded in the CLI invocation, not configurable through settings.
Conclusion
Document ingestion in docs2db functions as the foundational pipeline stage, converting heterogeneous source documents into a uniform Docling JSON representation. The architecture employs parallel batch processing with memory-aware worker recycling, supporting configuration through CLI options and environment variables. The output maintains source directory structure within a dedicated content directory, enabling subsequent chunking, embedding, and loading stages to operate on standardized inputs. The system’s incremental processing capability via staleness detection reduces redundant work, though the absence of retry logic and progress persistence represents limitations in failure resilience.