# Article 4: Generating and Storing Embeddings in pgvector

## Introduction

After the chunker produces a list of text segments, each segment needs to be converted to a vector — a list of floating-point numbers that represents its semantic meaning in a high-dimensional space. That vector is what gets stored in pgvector and queried during retrieval.

This article covers the two embedding providers I use (`sentence-transformers` for local inference and the GitHub Models API for API-based inference), the batching strategy that makes bulk ingestion practical, and how the vectors are stored in the database.

***

## Table of Contents

1. [What an Embedding Is](#what-an-embedding-is)
2. [Embedding Provider Design](#embedding-provider-design)
3. [Local Embeddings with sentence-transformers](#local-embeddings)
4. [API Embeddings with GitHub Models](#api-embeddings)
5. [Batching for Bulk Ingestion](#batching)
6. [Storing Embeddings in pgvector](#storing-embeddings)
7. [The Embedding Background Worker](#background-worker)
8. [What I Learned](#what-i-learned)

***

## What an Embedding Is

An embedding model takes text and outputs a fixed-length vector of floats. Models are trained so that semantically similar texts produce vectors that are close together in vector space (measured by cosine distance), while unrelated texts are far apart.

For example:

* "configure TLS on Kubernetes ingress" and "set up HTTPS certificate for ingress controller" should have high cosine similarity
* "configure TLS on Kubernetes ingress" and "Python list comprehension syntax" should have low cosine similarity

The model learns this structure from large text corpora — it's the same training objective as language model pre-training but the output is a dense vector rather than a next-token prediction.

The embedding is not interpretable directly. You can't look at position 142 of a 384-dimensional vector and know what it means. What matters is the relative distances, not the absolute values.

***

## Embedding Provider Design

I use an abstract base class so the ingestion pipeline and query pipeline are decoupled from the specific embedding model:

```python
# src/embeddings/base.py
from abc import ABC, abstractmethod

class EmbeddingProvider(ABC):
    
    @property
    @abstractmethod
    def dimensions(self) -> int:
        """Return the vector dimension this provider produces."""
        ...
    
    @property
    @abstractmethod
    def model_name(self) -> str:
        """Return the canonical model identifier."""
        ...
    
    @abstractmethod
    async def embed(self, texts: list[str]) -> list[list[float]]:
        """
        Embed a list of texts.
        Returns a list of vectors, one per input text.
        """
        ...
    
    async def embed_one(self, text: str) -> list[float]:
        """Convenience wrapper for single-text embedding."""
        results = await self.embed([text])
        return results[0]
```

Both the local and API provider implement this interface, and the ingestion pipeline only knows about `EmbeddingProvider`. Switching providers requires changing one environment variable.

***

## Local Embeddings with sentence-transformers <a href="#local-embeddings" id="local-embeddings"></a>

`all-MiniLM-L6-v2` is the model I use for local inference. It produces 384-dimensional vectors, is fast (\~10ms per batch on CPU), has an 256-token input limit, and its quality is good enough for sentence and short-paragraph similarity.

```python
# src/embeddings/local.py
import asyncio
from functools import lru_cache
from sentence_transformers import SentenceTransformer
from src.embeddings.base import EmbeddingProvider
import structlog

log = structlog.get_logger()

class LocalEmbeddingProvider(EmbeddingProvider):
    """
    sentence-transformers inference running in the same process.
    Runs in a thread pool to avoid blocking the event loop during
    model inference.
    """
    
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self._model_name = model_name
        self._model: SentenceTransformer | None = None
    
    def _load_model(self) -> SentenceTransformer:
        if self._model is None:
            log.info("embedding.model.load", model=self._model_name)
            self._model = SentenceTransformer(self._model_name)
        return self._model
    
    @property
    def dimensions(self) -> int:
        return 384
    
    @property
    def model_name(self) -> str:
        return self._model_name
    
    async def embed(self, texts: list[str]) -> list[list[float]]:
        loop = asyncio.get_event_loop()
        # Run blocking model inference in a thread pool
        vectors = await loop.run_in_executor(
            None,
            lambda: self._load_model().encode(
                texts,
                batch_size=64,
                normalize_embeddings=True,  # L2 normalize for cosine similarity
                show_progress_bar=False,
            ).tolist()
        )
        return vectors
```

`normalize_embeddings=True` is important. It L2-normalizes each vector before returning it, which means cosine similarity is equivalent to dot product, and the `<=>` pgvector operator gives meaningful results.

Running model inference in `run_in_executor` is necessary because `SentenceTransformer.encode()` is synchronous and CPU-bound — calling it directly from an async function would block the event loop.

***

## API Embeddings with GitHub Models <a href="#api-embeddings" id="api-embeddings"></a>

When I want higher-quality embeddings (particularly for longer documents), I use `text-embedding-3-small` via the GitHub Models API. The same OpenAI SDK client works because GitHub Models uses the OpenAI API format.

```python
# src/embeddings/github_models.py
import asyncio
from openai import AsyncOpenAI
import os
from src.embeddings.base import EmbeddingProvider
import structlog

log = structlog.get_logger()

class GitHubModelsEmbeddingProvider(EmbeddingProvider):
    """
    Embedding via GitHub Models API (text-embedding-3-small or text-embedding-3-large).
    Requires GITHUB_TOKEN environment variable.
    """
    
    _DIMENSIONS = {
        "text-embedding-3-small": 1536,
        "text-embedding-3-large": 3072,
    }
    
    def __init__(self, model: str = "text-embedding-3-small"):
        self._model = model
        self._client = AsyncOpenAI(
            base_url="https://models.inference.ai.azure.com",
            api_key=os.environ["GITHUB_TOKEN"],
        )
    
    @property
    def dimensions(self) -> int:
        return self._DIMENSIONS[self._model]
    
    @property
    def model_name(self) -> str:
        return self._model
    
    async def embed(self, texts: list[str]) -> list[list[float]]:
        response = await self._client.embeddings.create(
            model=self._model,
            input=texts,
            encoding_format="float",
        )
        # Sort by index to preserve input order (API may reorder)
        sorted_data = sorted(response.data, key=lambda d: d.index)
        return [item.embedding for item in sorted_data]
```

The GitHub Models API has a batch limit of 2,048 input items per call. In practice my batches are much smaller (32–64 texts), so this is not an issue.

***

## Batching for Bulk Ingestion <a href="#batching" id="batching"></a>

Embedding models — both local and API — are much more efficient in batch mode than one-at-a-time. A batch of 64 texts takes roughly the same time as a batch of 1 on the local model (because the GPU/CPU can parallelize the matrix multiplications), and using API batches reduces the number of HTTP round-trips.

```python
# src/ingestion/pipeline.py
from itertools import islice
from src.embeddings.base import EmbeddingProvider
from src.db.models import Chunk
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
import structlog

log = structlog.get_logger()

def _batched(iterable, n: int):
    """Yield successive n-sized batches from an iterable."""
    it = iter(iterable)
    while batch := list(islice(it, n)):
        yield batch

async def embed_pending_chunks(
    db: AsyncSession,
    embedder: EmbeddingProvider,
    batch_size: int = 64,
) -> int:
    """
    Find chunks with embedding=NULL and embed them in batches.
    Returns the total number of chunks embedded.
    """
    # Fetch all chunk IDs and content where embedding is NULL
    result = await db.execute(
        select(Chunk.id, Chunk.content)
        .where(Chunk.embedding == None)
        .order_by(Chunk.id)
    )
    pending = result.all()
    
    if not pending:
        return 0
    
    log.info("embedding.start", pending_count=len(pending), batch_size=batch_size)
    total_embedded = 0
    
    for batch in _batched(pending, batch_size):
        ids    = [row.id for row in batch]
        texts  = [row.content for row in batch]
        
        vectors = await embedder.embed(texts)
        
        # Bulk update the embedding column for this batch
        for chunk_id, vector in zip(ids, vectors):
            await db.execute(
                update(Chunk)
                .where(Chunk.id == chunk_id)
                .values(
                    embedding=vector,
                    embedding_model=embedder.model_name,
                )
            )
        
        await db.commit()
        total_embedded += len(batch)
        log.info("embedding.batch.done", embedded=total_embedded, remaining=len(pending) - total_embedded)
    
    return total_embedded
```

The bulk update uses per-row `UPDATE` statements. For very large corpora, this could be improved with `executemany` or a temporary table approach, but for a personal knowledge base of a few thousand chunks it's fast enough.

***

## Storing Embeddings in pgvector <a href="#storing-embeddings" id="storing-embeddings"></a>

The `pgvector` Python package provides `Vector` for SQLAlchemy and handles serialization automatically. When writing a Python `list[float]` to a `Vector(384)` column, the package converts it to the binary wire format that PostgreSQL expects.

```python
# How it works under the hood (you don't write this yourself)
# pgvector.sqlalchemy.Vector serializes list[float] → PostgreSQL binary vector
# and deserializes PostgreSQL vector → list[float] on reads

# Writing:
chunk.embedding = [0.021, -0.043, 0.117, ...]   # list[float]
await db.commit()   # pgvector handles conversion

# Reading:
result = await db.execute(select(Chunk).where(Chunk.id == 1))
chunk = result.scalar_one()
print(chunk.embedding)  # → [0.021, -0.043, 0.117, ...]
```

For the raw SQL path (used in retrieval), vectors are cast to the PostgreSQL type using `::vector`:

```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def similarity_search_raw(
    db: AsyncSession,
    query_vector: list[float],
    limit: int = 5,
) -> list[dict]:
    result = await db.execute(
        text("""
            SELECT
                c.id,
                c.content,
                c.tokens,
                d.file_path,
                d.title,
                1 - (c.embedding <=> :query_vec::vector) AS similarity
            FROM chunks c
            JOIN documents d ON d.id = c.document_id
            WHERE c.embedding IS NOT NULL
            ORDER BY c.embedding <=> :query_vec::vector
            LIMIT :limit
        """),
        {"query_vec": str(query_vector), "limit": limit},
    )
    return [dict(row._mapping) for row in result]
```

The `:query_vec::vector` cast in the raw SQL query is the pattern that pgvector uses when you pass a Python list as a string representation of the vector.

***

## The Embedding Background Worker <a href="#background-worker" id="background-worker"></a>

The ingestion pipeline creates chunks with `embedding=NULL` and then triggers a background task to embed them asynchronously. This keeps the ingestion HTTP response fast (the file is loaded and chunked immediately) while the embedding computation happens in the background.

```python
# src/ingestion/pipeline.py
import asyncio
from src.ingestion.pipeline import embed_pending_chunks
from src.embeddings.base import EmbeddingProvider
from src.db.base import AsyncSessionLocal
import structlog

log = structlog.get_logger()

async def embedding_worker(embedder: EmbeddingProvider, interval_seconds: int = 30) -> None:
    """
    Background task that periodically finds unembedded chunks and embeds them.
    Runs until cancelled.
    """
    log.info("embedding_worker.start", interval=interval_seconds)
    
    while True:
        try:
            async with AsyncSessionLocal() as db:
                count = await embed_pending_chunks(db, embedder)
                if count > 0:
                    log.info("embedding_worker.cycle", embedded=count)
        except asyncio.CancelledError:
            log.info("embedding_worker.stopped")
            raise
        except Exception as exc:
            # Log and continue — don't let a transient error kill the worker
            log.error("embedding_worker.error", error=str(exc))
        
        await asyncio.sleep(interval_seconds)
```

This worker is started as a background task in FastAPI's lifespan. If the process restarts mid-embedding (e.g., power outage, container restart), the worker will find the partially-embedded chunks on next startup and resume from where it left off — because the chunk `embedding` column is `NULL` until it's successfully committed.

***

## What I Learned

**`normalize_embeddings=True` is easy to forget and hard to diagnose.** If you forget to normalize, cosine similarity doesn't work correctly — you're measuring the angle between vectors, but unnormalized vectors have magnitudes that pollute the distance calculation. The symptom is "retrieval returns random-looking results". Always normalize.

**Batch size of 64 is a reasonable default but test on your hardware.** On my laptop with an Apple M2, the local model processes 64 texts in about 60ms. On a CPU-only x86 machine in a cloud VM, the same batch takes \~350ms. There's no universal optimal batch size — measure on your target hardware.

**Embedding after ingestion, not during, keeps the API responsive.** My first version embedded synchronously inside the HTTP handler. Ingesting 200 files took 40 seconds and the HTTP connection timed out. The background worker pattern eliminates this — the ingestion call returns in under 2 seconds, and embedding happens asynchronously over the following minutes.

**Track which model embedded each chunk.** I store `embedding_model` on each `Chunk` row. When I switched from `all-MiniLM-L6-v2` to `text-embedding-3-small`, I needed to know which chunks were embedded with which model — mixing models in the same similarity search produces garbage results because they live in different vector spaces. The `embedding_model` column made it easy to find and re-embed the old chunks.

***

**Next**: [Article 5 — Semantic Search, Cosine Similarity, and Hybrid Retrieval](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/rag-101/rag-101-retrieval)
