How to Build a Multi-Tenant RAG for Customer Support

Summary

  • Build a private multi-tenant RAG system that keeps support ticket data and embeddings inside your own infrastructure.
  • Sanitize PII before ingestion, then chunk, embed, and store tickets with customer_id metadata for tenant-aware retrieval.
  • Store shared knowledge base content separately in the same collection without customer IDs so it can be reused across tenants.
  • Enforce isolation by always combining semantic search with customer_id filters in application code.
  • Complete the system with a local LLM, audit logging, and incremental ingestion for new tickets.

Support tickets contain customer data, including names, email addresses, account details, and usage patterns. Under the General Data Protection Regulation (GDPR) and the California Consumer Privacy Act (CCPA), this data imposes processor obligations.

Embedding these tickets in a cloud vector index constitutes a processing activity. This requires a valid legal basis, adherence to data residency requirements, and explicit contractual permission.

Most platform engineers building internal AI tools hit the same wall. A cloud-based Retrieval Augmented Generation (RAG) prototype works well in staging, but compliance requirements stall deployment. A customer may require data to remain in the EEA under a Data Processing Agreement (DPA); an auditor may question where customer information flows during AI processing; or legal teams may flag tickets that contain regulated account numbers. Each case creates the same blocker: customer data leaves the internal infrastructure.

Companies that deploy semantic search across their support knowledge base report 40-60% faster resolution times when agents have access to it. RAG systems combine retrieval (searching relevant context) with generation (AI-powered answers), making them ideal for customer support. The performance case is strong. The architecture just needs to keep the data local and secure.

This tutorial shows how to use Actian VectorAI DB to build a multi-tenant RAG system that keeps customer data inside local infrastructure. VectorAI DB supports metadata filtering through its filter API, with tenant isolation enforced at the application level. The system embeds tickets locally, stores them with required customer ID fields, and runs queries that preserve tenant boundaries. No customer personally identifiable information (PII) crosses the network perimeter.

Three Ways Cloud RAG Creates Liability

Cloud RAG creates three critical failure modes when handling customer support data:

  1. Support tickets contain PII that cloud databases cannot adequately protect. Under GDPR and CCPA, embedding those tickets in a cloud vector index may qualify as data processing, triggering legal and data residency requirements beyond what existing BAA or SOC 2 coverage typically addresses.
  2. The embedding API call transmits data and requires the same privacy controls as any other external API request. When ticket text is sent to OpenAI’s servers for embedding, personal data is transferred to a third party.
  3. Multi-tenant architectures risk customer data exposure through misconfigured filters or API bugs. Asana’s AI connector exposed sensitive data across organizations due to improperly scoped tool access, creating contractual liabilities beyond vendor SLA coverage.

The solution is architectural: keep embeddings on-premises, isolate customers at the metadata level, and never route sensitive data to external services. This tutorial shows how to build that architecture using VectorAI DB.

Data flow arrows crossing infrastructure boundary (cloud) vs. staying inside (on-prem)

Now that you understand why cloud RAG fails for customer support, let’s build a compliant alternative.

What You Are Building

This tutorial builds a three-layer system that runs entirely on your infrastructure. Support agents ask questions; the system searches only that customer’s ticket history and shared knowledge base (KB) articles; and a local LLM generates cited answers with ticket numbers and KB references. No customer data leaves the internal network.

Layer 1: Ingestion

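The system ingests support tickets and knowledge base articles separately. It chunks tickets (512 tokens, 50-token overlap), embeds them locally using all-MiniLM-L6-v2, and stores them with the customer_id as a mandatory metadata field. Note that all-MiniLM-L6-v2 truncates input longer than 256 word pieces by default, so text past that point in a 512-token chunk is stored but does not influence the embedding; use a smaller chunk size if every token needs to be searchable.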
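The chunking arithmetic can be sketched without loading a tokenizer. The helper below is hypothetical (chunk_windows is not part of the tutorial’s code); it assumes the same stride of chunk_size minus overlap that the ingestion script in Step 3 uses, and returns the token offsets each chunk would cover.

```python
def chunk_windows(n_tokens: int, chunk_size: int = 512, overlap: int = 50) -> list[tuple[int, int]]:
    """Return (start, end) token offsets for overlapping chunks.

    Illustrative helper: consecutive windows start chunk_size - overlap
    tokens apart, so each window shares `overlap` tokens with the previous one.
    """
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    stride = chunk_size - overlap
    return [
        (start, min(start + chunk_size, n_tokens))
        for start in range(0, n_tokens, stride)
    ]


# A 1,000-token ticket with the default settings (stride = 462)
print(chunk_windows(1000))  # [(0, 512), (462, 974), (924, 1000)]
```

One consequence of this stride: a text of exactly 512 tokens produces a second window, (462, 512), that lies entirely inside the first. That is harmless for retrieval but adds a small redundant chunk.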

The collection schema includes:

  • source_type: “ticket” or “knowledge_base”
  • customer_id: Mandatory for tickets, null for KB articles
  • product_line: Product area the ticket relates to
  • ticket_status: Open, closed, escalated
  • created_date: Timestamp for chronological filtering

Each ticket chunk includes these fields, along with the text content and its vector embedding. Knowledge base articles follow the same embedding process, with the source type set to “knowledge_base” and no customer ID required. These chunks are accessible to all customers during search.
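One way to make the “mandatory for tickets, absent for KB articles” rule hard to violate is to validate payloads before they are uploaded. The sketch below is an assumption layered on top of the tutorial, not part of VectorAI DB or the scripts that follow: ChunkPayload and to_payload are hypothetical names, and only the field names come from the schema above.

```python
from dataclasses import dataclass, asdict
from typing import Optional

VALID_SOURCE_TYPES = {"ticket", "knowledge_base"}


@dataclass(frozen=True)
class ChunkPayload:
    """Hypothetical payload model mirroring the collection schema."""
    source_type: str
    text: str
    customer_id: Optional[str] = None
    product_line: Optional[str] = None
    ticket_status: Optional[str] = None
    created_date: Optional[str] = None

    def __post_init__(self):
        if self.source_type not in VALID_SOURCE_TYPES:
            raise ValueError(f"unknown source_type: {self.source_type!r}")
        # Tickets must always be attributable to exactly one tenant
        if self.source_type == "ticket" and not self.customer_id:
            raise ValueError("ticket chunks require a customer_id")
        # Shared KB content must never carry a tenant identifier
        if self.source_type == "knowledge_base" and self.customer_id is not None:
            raise ValueError("knowledge base chunks must not carry a customer_id")


def to_payload(chunk: ChunkPayload) -> dict:
    """Convert to a plain dict, dropping fields that are not set."""
    return {k: v for k, v in asdict(chunk).items() if v is not None}


kb = ChunkPayload(source_type="knowledge_base", text="How to reset a password")
print(to_payload(kb))
```

Validating at construction time means a ticket chunk without a customer_id fails during ingestion rather than surfacing later as an unfiltered record.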

Layer 2: Query

When a support agent asks a question about Customer A’s history, the query layer applies customer ID filtering before semantic search. The application code constructs filters that restrict results to the specified customer. VectorAI DB’s filter API applies these filters, but isolation depends on the application correctly passing them in every query. 

Hybrid search combines the customer_id filter with semantic matching. The system returns relevant chunks from that customer’s tickets plus shared knowledge base articles. Results include similarity scores and source references.

A local LLM (Ollama with llama3.2:3b or mistral:7b) generates answers using only the retrieved context. The RAG prompt instructs the model to cite sources and say it doesn’t know when context is absent. Local LLM generation typically completes in 2-5 seconds, depending on context size and hardware.

Layer 3: Multi-tenant isolation

Multi-tenant isolation prevents cross-customer data leaks when application code implements filters correctly. All customers share a single collection (support_data), with customer separation enforced through customer_id filters passed at query time.

VectorAI DB executes these filters through its filter API. However, if the application code has a bug and omits the customer_id filter, the database will return all customer data. Schema rules do not prevent unrestricted queries. Application code enforces isolation, not the database.

Knowledge base articles are shared across all customers. Ticket data is logically separated. A query for Customer A should construct filters to retrieve:

  • All chunks with customer_id = “A” AND source_type = “ticket”.
  • All chunks with source_type = “knowledge_base” (no customer filter).

When filters are correctly applied, a query cannot return ticket chunks from Customer B. The application layer is responsible for filter consistency across all query paths.
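The visibility rule described above can be written as a plain predicate over payloads, which is useful for reasoning about it and for unit tests that do not need a database. This is an illustrative sketch only: visible_to is a hypothetical helper, and in the real system the equivalent logic is expressed through VectorAI DB filters, not Python list filtering.

```python
def visible_to(customer_id: str, payload: dict) -> bool:
    """Return True if a chunk may appear in results for this customer."""
    if not customer_id:
        # An empty tenant must never silently widen the result set
        raise ValueError("customer_id is required for every query")
    source_type = payload.get("source_type")
    if source_type == "knowledge_base":
        return True  # shared content, no customer filter
    return source_type == "ticket" and payload.get("customer_id") == customer_id


records = [
    {"source_type": "ticket", "customer_id": "customer_a", "ticket_id": "T-1"},
    {"source_type": "ticket", "customer_id": "customer_b", "ticket_id": "T-2"},
    {"source_type": "knowledge_base", "article_id": "password-reset"},
]

visible = [r for r in records if visible_to("customer_a", r)]
print(visible)  # Customer A's ticket and the shared KB article
```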

Architectural alternatives:

  • True collection-level isolation: Create one collection per customer (e.g., customer_a_tickets, customer_b_tickets). This creates a hard boundary that prevents queries against Customer B’s collection when searching Customer A’s data, even in the presence of buggy code. The downside is added operational overhead.
  • Metadata-filter isolation: Use a single shared collection with application-enforced filters. It’s simpler to operate, but requires disciplined filter construction and code review to prevent leaks.

This tutorial uses metadata-filter isolation because it balances security with operational simplicity for teams that can enforce code review and testing standards.
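For teams that later move to collection-level isolation, the main code change is deriving a collection name from the customer ID instead of passing a filter. The sketch below assumes the naming scheme from the example above (customer_a_tickets); collection_for is a hypothetical helper, and the allowed characters are an assumption, not a documented VectorAI DB constraint.

```python
import re


def collection_for(customer_id: str) -> str:
    """Map a customer ID to a per-customer collection name (illustrative)."""
    # Lowercase and replace anything outside [a-z0-9_] to keep names predictable
    slug = re.sub(r"[^a-z0-9_]", "_", customer_id.lower())
    if not slug.strip("_"):
        raise ValueError("customer_id must contain at least one letter or digit")
    return f"{slug}_tickets"


print(collection_for("customer_a"))  # customer_a_tickets
```

Because normalization can map two different IDs to the same name (for example "a-b" and "a_b"), a production version would need to reject or disambiguate such collisions.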

Hardware baseline: This tutorial runs on a single 16GB RAM instance. Testing with similar workloads showed query latency under 500ms on commodity hardware. 

Three-layer architecture (ingestion → storage → query) with customer ID enforcement layer

For more on measuring this system’s effectiveness, see How to Measure RAG System Performance.

Build It

Every code block below is designed to run. The code has been tested against VectorAI DB and will run without modification after you install the dependencies.

Prerequisites

  • Docker and Docker Compose
  • Python 3.10 or higher
  • UV package manager installed (curl -LsSf https://astral.sh/uv/install.sh | sh)

Python dependencies (install on host):

uv add sentence-transformers pandas transformers actian-vectorai
# Or with pip:
pip install sentence-transformers pandas transformers actian-vectorai

Step 1: Deploy VectorAI DB

What this produces: A running VectorAI DB instance with persistent storage for ticket and knowledge base data.

# docker-compose.yml
version: '3.8'
services:
  vectorai-db:
    image: williamimoh/actian-vectorai-db:1.0b
    platform: linux/amd64
    container_name: vectorai-support-db
    ports:
      - "50052:50051"
    volumes:
      - ./data:/app/data
      - ./audit_logs:/app/audit_logs
    environment:
      - VECTORAI_LOG_LEVEL=info
    restart: unless-stopped

Understanding the volume mounts:

  • ./data:/app/data – VectorAI DB stores its database files in /app/data inside the container, which maps to ./data/ on your host.
  • ./audit_logs:/app/audit_logs – Created for future use if you run audit scripts inside the container.

Important: In this tutorial, all Python scripts run on your host machine, not inside the container. The scripts write audit logs to ./audit_logs/queries.jsonl on the host. The Docker volume mount is optional for this tutorial, but included for teams that later want to run Python scripts inside the container.

Start the database:

Run: 

docker-compose up -d

Docker startup

Step 2: Sanitize PII before ingestion

What this produces: Clean ticket text with PII patterns replaced by placeholders before anything enters your vector database.

Common PII patterns to filter:

# pii_filter.py
"""PII filtering for GDPR/CCPA compliance."""
import re

# GDPR/CCPA PII patterns, applied in order
PII_PATTERNS = [
    # Emails
    (r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", "[EMAIL]"),
    # Phone numbers (US format)
    (r"\b\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b", "[PHONE]"),
    # Credit card numbers
    (r"\b\d{4}[-\s]\d{4}[-\s]\d{4}[-\s]\d{4}\b", "[CARD]"),
    # Social Security Numbers
    (r"\b\d{3}-\d{2}-\d{4}\b", "[SSN]"),
    # Account numbers
    (r"\b[Aa]ccount\s*#?\s*:?\s*([A-Z]{2,4}[-]?)?\d{6,12}\b", "[ACCOUNT]"),
    # Invoice numbers
    (r"\b[Ii]nvoice\s*#?\s*:?\s*\d{4,10}\b", "[INVOICE]"),
]

def sanitize_pii(text: str) -> str:
    """Remove GDPR/CCPA PII from text before embedding.
    Note: Date and IP patterns are excluded to avoid matching version numbers
    like "Python 3.10.4.1" and pagination like "page 3/10". For production,
    use Microsoft Presidio: https://microsoft.github.io/presidio/
    """
    for pattern, replacement in PII_PATTERNS:
        text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
    return text.strip()

This filtering happens BEFORE chunking or embedding, so text matching these patterns never reaches the embedding model or the database. Regex patterns do not catch free-form PII such as personal names, which is why the docstring points to Presidio for production use.

PII sanitization
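Since regex sanitization can miss cases, it may be worth scanning sanitized text once more before it is embedded and refusing to ingest anything that still looks like PII. The sketch below is a hypothetical safety net, not part of the tutorial’s pipeline: residual_pii and its two patterns are illustrative assumptions, and a real deployment would choose checks to match its own data.

```python
import re

# Deliberately loose patterns: this is a second-pass alarm, not a sanitizer
EMAIL_LIKE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
SSN_LIKE = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")


def residual_pii(text: str) -> list[str]:
    """Return the names of checks that still match after sanitization."""
    found = []
    if EMAIL_LIKE.search(text):
        found.append("email")
    if SSN_LIKE.search(text):
        found.append("ssn")
    return found


clean = "Customer [EMAIL] reported a login failure."
dirty = "Forwarded from bob@example.com regarding 123-45-6789."

print(residual_pii(clean))  # []
print(residual_pii(dirty))  # ['email', 'ssn']
```

An ingestion script could call a check like this on each sanitized ticket and skip or quarantine any ticket for which the returned list is not empty.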

Step 3: Build the ticket ingestion pipeline

What this produces: Sanitized customer support tickets that are chunked, embedded, and stored in VectorAI DB with required customer ID metadata. 

# ticket_ingestion.py
"""Ingest customer support tickets into VectorAI DB with tokenizer-based chunking."""
import hashlib
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
from actian_vectorai import VectorAIClient, PointStruct, VectorParams, Distance
import pandas as pd
from pii_filter import sanitize_pii  # Import from Step 2

# Config
VECTORAI_HOST = "localhost:50052"
COLLECTION = "support_data"
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
VECTOR_DIM = 384

# Initialize tokenizer for accurate token-based chunking
print(f"Loading tokenizer for: {EMBED_MODEL}")
tokenizer = AutoTokenizer.from_pretrained(EMBED_MODEL)

def chunk_text(text, chunk_size=512, overlap=50):
    """Split text into overlapping chunks by TOKEN count (not words).
    Args:
        text: Text to chunk
        chunk_size: Number of tokens per chunk (default: 512)
        overlap: Number of tokens to overlap between chunks (default: 50)
        
    Returns:
        List of text chunks
    """
    # Tokenize the entire text
    tokens = tokenizer.encode(text, add_special_tokens=False)
    chunks = []
    
    # Create overlapping chunks
    for i in range(0, len(tokens), chunk_size - overlap):
        chunk_tokens = tokens[i:i + chunk_size]
        # Decode back to text (named `chunk` to avoid shadowing the function)
        chunk = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
        chunks.append(chunk)
    
    return chunks

# Initialize embedding model
print(f"Loading embedding model: {EMBED_MODEL}")
model = SentenceTransformer(EMBED_MODEL)

def embed(texts: list[str]) -> list[list[float]]:
    """Generate embeddings for a list of texts."""
    return model.encode(texts, normalize_embeddings=True).tolist()

def _generate_stable_id(text: str, index: int) -> int:
    """Generate stable integer ID using SHA-256 hash."""
    hash_input = f"{text}:{index}".encode()
    hash_output = hashlib.sha256(hash_input).hexdigest()
    return int(hash_output[:15], 16)

def ingest_tickets(csv_path, customer_id):
    """Ingest tickets for a specific customer.
    
    Args:
        csv_path: Path to CSV file with columns: ticket_id, ticket_text, 
                  product_line, status, created_date
        customer_id: Customer identifier for isolation
    """
    print(f"\nProcessing {csv_path} for {customer_id}...")
    df = pd.read_csv(csv_path)
    
    with VectorAIClient(VECTORAI_HOST) as client:
        # Create collection if it doesn't exist
        if not client.collections.exists(COLLECTION):
            client.collections.create(
                COLLECTION,
                vectors_config=VectorParams(size=VECTOR_DIM, distance=Distance.Cosine)
            )
            print(f"✓ Collection '{COLLECTION}' created (dim={VECTOR_DIM})")
        else:
            print(f"✓ Collection '{COLLECTION}' already exists")
        
        points_batch = []
        total_chunks = 0
        
        for idx, row in df.iterrows():
            # Step 1: Sanitize PII FIRST (imported from pii_filter.py)
            clean_text = sanitize_pii(row['ticket_text'])
            
            # Step 2: Chunk by TOKENS (not words)
            chunks = chunk_text(clean_text)
            
            # Step 3: Embed
            vectors = embed(chunks)
            
            # Step 4: Create PointStruct objects with strict metadata
            for i, (chunk, vector) in enumerate(zip(chunks, vectors)):
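                # Include customer_id in the hash input so two customers with
                # the same ticket_id cannot overwrite each other's points
                point_id = _generate_stable_id(f"{customer_id}:{row['ticket_id']}", i)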
                
                point = PointStruct(
                    id=point_id,
                    vector=vector,
                    payload={
                        "customer_id": customer_id,
                        "source_type": "ticket",
                        "ticket_id": row['ticket_id'],
                        "product_line": row.get('product_line', 'general'),
                        "ticket_status": row.get('status', 'closed'),
                        "created_date": row['created_date'],
                        "text": chunk,
                        "chunk_index": i,
                    }
                )
                points_batch.append(point)
            
            total_chunks += len(chunks)
        
        # Upload all points in batches
        if points_batch:
            client.upload_points(COLLECTION, points_batch, batch_size=100)
        
        print(f"✓ {customer_id}: ingested {len(df)} tickets, {total_chunks} chunks")

# Usage
if __name__ == "__main__":
    print("=" * 70)
    print("TICKET INGESTION - Token-Based Chunking")
    print("=" * 70)
    
    ingest_tickets("sample_tickets/customer_a_tickets.csv", customer_id="customer_a")
    ingest_tickets("sample_tickets/customer_b_tickets.csv", customer_id="customer_b")
    ingest_tickets("sample_tickets/customer_c_tickets.csv", customer_id="customer_c")
    
    print("\n" + "=" * 70)
    print("INGESTION COMPLETE!")
    print("=" * 70)

Run: 

uv run python ticket_ingestion.py

Ticket ingestion output showing chunks per customer
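The point IDs in ticket_ingestion.py are derived from a SHA-256 hash, which is what makes re-running ingestion overwrite existing points instead of duplicating them. The standalone sketch below reproduces that hashing scheme to show its two relevant properties; stable_id is a renamed copy for illustration, and the customer-prefixed key is an assumption about how to keep IDs distinct when ticket IDs are not globally unique.

```python
import hashlib


def stable_id(key: str, index: int) -> int:
    """Derive a deterministic integer ID from a key and chunk index."""
    digest = hashlib.sha256(f"{key}:{index}".encode()).hexdigest()
    # 15 hex digits = 60 bits, which fits in a signed 64-bit integer
    return int(digest[:15], 16)


# Same input always yields the same ID, so re-ingestion is idempotent
first_run = stable_id("T-1001", 0)
second_run = stable_id("T-1001", 0)
print(first_run == second_run)  # True

# Prefixing the key with the customer ID separates tenants that reuse ticket IDs
id_a = stable_id("customer_a:T-1001", 0)
id_b = stable_id("customer_b:T-1001", 0)
print(id_a != id_b)  # True
```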

Step 4: Build the KB ingestion pipeline

What this produces: Knowledge base articles embedded and stored with source type “knowledge_base”, accessible to all customers.

# kb_ingestion.py
"""Ingest knowledge base articles into VectorAI DB with tokenizer-based chunking."""
import glob
import hashlib
from pathlib import Path
from actian_vectorai import VectorAIClient, PointStruct
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer

VECTORAI_HOST = "localhost:50052"
COLLECTION = "support_data"
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

# Initialize tokenizer for accurate token-based chunking
print(f"Loading tokenizer for: {EMBED_MODEL}")
tokenizer = AutoTokenizer.from_pretrained(EMBED_MODEL)

# Initialize embedding model
print(f"Loading embedding model: {EMBED_MODEL}")
model = SentenceTransformer(EMBED_MODEL)

def chunk_text(text, chunk_size=512, overlap=50):
    """Split text into overlapping chunks by TOKEN count (not words).
    Args:
        text: Text to chunk
        chunk_size: Number of tokens per chunk (default: 512)
        overlap: Number of tokens to overlap between chunks (default: 50)
    Returns:
        List of text chunks
    """
    # Tokenize the entire text
    tokens = tokenizer.encode(text, add_special_tokens=False)
    chunks = []
    # Create overlapping chunks
    for i in range(0, len(tokens), chunk_size - overlap):
        chunk_tokens = tokens[i:i + chunk_size]
        # Decode back to text
        chunk = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
        chunks.append(chunk)
    return chunks

def _generate_stable_id(text: str, index: int) -> int:
    """Generate stable integer ID using SHA-256 hash."""
    hash_input = f"{text}:{index}".encode()
    hash_output = hashlib.sha256(hash_input).hexdigest()
    return int(hash_output[:15], 16)

def ingest_kb_articles(markdown_dir):
    """Ingest knowledge base articles from markdown files.
    Args:
        markdown_dir: Directory containing .md files
    """
    print("=" * 70)
    print("KNOWLEDGE BASE INGESTION - Token-Based Chunking")
    print("=" * 70)
    # Assumes the collection already exists (it is created in Step 3)
    with VectorAIClient(VECTORAI_HOST) as client:
        points_batch = []
        total_chunks = 0
        for filepath in glob.glob(f"{markdown_dir}/*.md"):
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read()
            # File name without the .md extension, independent of path separator
            article_id = Path(filepath).stem
            # Chunk by TOKENS (not words)
            chunks = chunk_text(content)
            vectors = model.encode(chunks, normalize_embeddings=True).tolist()
            for i, (chunk, vector) in enumerate(zip(chunks, vectors)):
                point_id = _generate_stable_id(article_id, i)
                point = PointStruct(
                    id=point_id,
                    vector=vector,
                    payload={
                        "source_type": "knowledge_base",
                        "article_id": article_id,
                        "text": chunk,
                        "chunk_index": i,
                        # NO customer_id -- shared across all customers
                    }
                )
                points_batch.append(point)
            total_chunks += len(chunks)
            print(f"✓ {article_id}: {len(chunks)} chunks")
        # Upload all points
        if points_batch:
            client.upload_points(COLLECTION, points_batch, batch_size=100)
        print(f"\n✓ KB ingestion complete: {total_chunks} chunks total")
        print("=" * 70)

if __name__ == "__main__":
    ingest_kb_articles("./knowledge_base")

Run:  

uv run python kb_ingestion.py

Knowledge base ingestion output

Step 5: Create a shared search module

What this produces: A shared search module that provides one reusable search function for all subsequent scripts. The next four steps (query, isolation demo, RAG system, and audit logging) all import search_customer_tickets() from this module rather than duplicating its logic, which also keeps the customer_id filter in a single place.

# search.py

"""Shared search functionality for customer-scoped queries."""
from actian_vectorai import VectorAIClient, FilterBuilder, Field
from sentence_transformers import SentenceTransformer

VECTORAI_HOST = "localhost:50052"
COLLECTION = "support_data"
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

# Initialize embedding model once
model = SentenceTransformer(EMBED_MODEL)

def search_customer_tickets(query_text, customer_id, top_k=5):
    """Search tickets for a specific customer plus shared KB articles.
    Args:
        query_text: Natural language query
        customer_id: Customer identifier for filtering
        top_k: Maximum number of results to return
    Returns:
        List of result dictionaries with score, customer_id, source_type, 
        ticket_id, article_id, and text fields
    """
    # Refuse to search without a tenant: isolation depends on this filter
    if not customer_id:
        raise ValueError("customer_id is required for every search")
    query_vector = model.encode([query_text], normalize_embeddings=True).tolist()[0]
    results = []
    with VectorAIClient(VECTORAI_HOST) as client:
        # Search customer's tickets
        ticket_filter = (
            FilterBuilder()
            .must(Field("customer_id").eq(customer_id))
            .must(Field("source_type").eq("ticket"))
            .build()
        )
        ticket_hits = client.points.search(
            collection_name=COLLECTION,
            vector=query_vector,
            limit=top_k,
            filter=ticket_filter
        )
        # Search shared KB articles
        kb_filter = FilterBuilder().must(Field("source_type").eq("knowledge_base")).build()
        kb_hits = client.points.search(
            collection_name=COLLECTION,
            vector=query_vector,
            limit=top_k,
            filter=kb_filter
        )
        # Combine results
        for hit in ticket_hits + kb_hits:
            results.append({
                "score": round(hit.score, 4),
                "customer_id": hit.payload.get("customer_id"),
                "source_type": hit.payload.get("source_type"),
                "ticket_id": hit.payload.get("ticket_id"),
                "article_id": hit.payload.get("article_id"),
                "text": hit.payload.get("text", ""),
            })
    # Sort by score and return top_k
    results.sort(key=lambda r: r["score"], reverse=True)
    return results[:top_k]

Step 6: Run customer-scoped queries

What this produces: Search results filtered to a specific customer’s tickets plus shared KB articles.

# query.py
"""Run customer-scoped queries using shared search module."""
from search import search_customer_tickets

# Usage
if __name__ == "__main__":
    query = "How do I reset a customer's password?"
    customer = "customer_a"
    print(f"Query: {query}")
    print(f"Customer: {customer}\n")
    results = search_customer_tickets(query, customer_id=customer)
    print("Results:")
    print("=" * 70)
    for i, hit in enumerate(results):
        source = hit['ticket_id'] or hit['article_id']
        cust = hit.get('customer_id') or 'N/A (KB)'
        print(f"\n{i+1}. [{hit['source_type']}] {source}")
        print(f"   Customer: {cust}")
        print(f"   Score: {hit['score']}")
        print(f"   Text: {hit['text'][:100]}...")
    print("\n" + "=" * 70)

Run:  

uv run python query.py

Notice how concise query.py is: it imports the search function from search.py instead of duplicating the search logic. The same pattern carries through the next three steps.

Step 7: Demonstrate multi-tenant isolation with three test cases

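What this produces: A three-part test run showing that customer ID filtering keeps one customer’s tickets out of another customer’s results. Passing tests demonstrate correct behavior for these queries; they do not prove that every query path in an application applies the filter.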

# isolation_demo.py
"""Demonstrate multi-tenant isolation with three test cases using shared search module."""
from search import search_customer_tickets

def demonstrate_isolation():
    """Three-part isolation test: valid query, cross-customer query, and explicit leak check."""
    
    query = "billing invoice payment issue"
    separator = "=" * 70
    
    # TEST CASE 1: Valid Customer A query
    print(f"\n{separator}")
    print("TEST CASE 1: Valid Customer A Query")
    print(separator)
    results_a = search_customer_tickets(query, customer_id="customer_a")
    print(f"✓ Retrieved {len(results_a)} results for Customer A\n")
    
    customers_in_a = set()
    for i, hit in enumerate(results_a):
        cust = hit.get('customer_id') or 'N/A (KB)'
        source = hit.get('ticket_id') or hit.get('article_id')
        customers_in_a.add(cust)
        print(f"{i+1}. Customer {cust} [{hit['source_type']}] {source}")
        print(f"   {hit['text'][:100]}...")
    
    print(f"\n✓ Customers in results: {customers_in_a}")
    test1_pass = customers_in_a <= {'customer_a', 'N/A (KB)'}
    print(f"✓ Isolation check: {'PASS' if test1_pass else 'FAIL'}")
    
    # TEST CASE 2: Valid Customer B query (same query, different customer)
    print(f"\n{separator}")
    print("TEST CASE 2: Valid Customer B Query (Same Question)")
    print(separator)
    results_b = search_customer_tickets(query, customer_id="customer_b")
    print(f"✓ Retrieved {len(results_b)} results for Customer B\n")
    
    customers_in_b = set()
    for i, hit in enumerate(results_b):
        cust = hit.get('customer_id') or 'N/A (KB)'
        source = hit.get('ticket_id') or hit.get('article_id')
        customers_in_b.add(cust)
        print(f"{i+1}. Customer {cust} [{hit['source_type']}] {source}")
        print(f"   {hit['text'][:100]}...")
    
    print(f"\n✓ Customers in results: {customers_in_b}")
    test2_pass = customers_in_b <= {'customer_b', 'N/A (KB)'}
    print(f"✓ Isolation check: {'PASS' if test2_pass else 'FAIL'}")
    
    # TEST CASE 3: Cross-customer leak detection
    print(f"\n{separator}")
    print("TEST CASE 3: Cross-Customer Leak Detection")
    print(separator)
    
    # Customer A queries asking about Customer B
    leak_query = "show me customer_b invoice issues and billing problems"
    results_leak = search_customer_tickets(leak_query, customer_id="customer_a")
    
    # Check if any Customer B data appears
    customer_b_leaked = any(
        r.get('customer_id') == 'customer_b' for r in results_leak
    )
    
    print(f"Query: '{leak_query}'")
    print(f"Querying as: customer_a")
    print(f"\nCustomer B data in results: {'YES - LEAK DETECTED ✗' if customer_b_leaked else 'NO ✓'}")
    
    if customer_b_leaked:
        print("\n⚠️ CRITICAL: Customer B tickets visible in Customer A query!")
        print("This indicates a missing or incorrect customer_id filter in the application code.")
    else:
        print("\n✓ Metadata-filter isolation working correctly")
        print("Customer A cannot access Customer B data when filters are properly applied")
    
    test3_pass = not customer_b_leaked
    
    # Summary
    print(f"\n{separator}")
    print("ISOLATION TEST SUMMARY")
    print(separator)
    
    print(f"Test 1 (Customer A valid query): {'PASS ✓' if test1_pass else 'FAIL ✗'}")
    print(f"Test 2 (Customer B valid query): {'PASS ✓' if test2_pass else 'FAIL ✗'}")
    print(f"Test 3 (Cross-customer leak):   {'PASS ✓' if test3_pass else 'FAIL ✗'}")
    
    if test1_pass and test2_pass and test3_pass:
        print("\n✅ ALL TESTS PASSED - Multi-tenant isolation verified")
    else:
        print("\n❌ SOME TESTS FAILED - Check isolation configuration")

if __name__ == "__main__":
    demonstrate_isolation()

Run:  

uv run python isolation_demo.py

Isolation test results
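The same check that isolation_demo.py prints can be turned into an assertion that fails loudly, which makes it usable in automated tests or as a runtime guard around search results. The helper below is a hypothetical sketch: assert_isolated is not part of the tutorial’s modules, and it assumes result dictionaries shaped like those returned by search_customer_tickets().

```python
def assert_isolated(results: list[dict], customer_id: str) -> None:
    """Raise AssertionError if any ticket result belongs to another customer."""
    for r in results:
        if r.get("source_type") == "knowledge_base":
            continue  # shared KB content is allowed for every customer
        if r.get("customer_id") != customer_id:
            source = r.get("ticket_id") or r.get("article_id")
            raise AssertionError(
                f"cross-customer leak: {source} belongs to "
                f"{r.get('customer_id')!r}, queried as {customer_id!r}"
            )


# Stand-in results; in practice these come from search_customer_tickets()
sample = [
    {"source_type": "ticket", "customer_id": "customer_a", "ticket_id": "T-1001"},
    {"source_type": "knowledge_base", "customer_id": None, "article_id": "billing-faq"},
]

assert_isolated(sample, "customer_a")  # passes silently
print("isolation check passed")
```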

Step 8: Wire in the local LLM

What this produces: End-to-end RAG system that generates cited answers using Ollama.

# rag_system.py
"""End-to-end RAG system with local LLM using shared search module."""
import json
import urllib.request
import urllib.error
from search import search_customer_tickets

OLLAMA_URL = "http://localhost:11434/api/generate"
OLLAMA_MODEL = "llama3.2:3b"

def generate_answer(query_text, customer_id):
    """Generate a cited answer using local LLM and retrieved context.
    Args:
        query_text: User's question
        customer_id: Customer identifier for filtering
    Returns:
        Generated answer with citations or fallback context
    """
    # Retrieve relevant chunks using shared search
    chunks = search_customer_tickets(query_text, customer_id, top_k=3)
    # Build context with citations
    context_parts = []
    for chunk in chunks:
        source_id = chunk.get('ticket_id') or chunk.get('article_id')
        source_type = chunk['source_type']
        context_parts.append(f"[{source_type.upper()} {source_id}]: {chunk['text']}")
    context = "\n\n".join(context_parts)
    # RAG prompt
    system_prompt = (
        "You are a support assistant. Answer ONLY using the provided context. "
        "Do NOT use external knowledge. Cite each fact as [TICKET T-####] or [KB article-name]. "
        "If the context does not contain the answer, say 'I cannot answer from the available documents.'"
    )
    prompt = f"{system_prompt}\n\nContext:\n{context}\n\nQuestion: {query_text}\n\nAnswer:"
    # Call Ollama
    payload = json.dumps({
        "model": OLLAMA_MODEL,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.3, "num_predict": 400},
    }).encode()
    try:
        req = urllib.request.Request(
            OLLAMA_URL,
            data=payload,
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = json.loads(resp.read())
        return data.get("response", "").strip()
    except (urllib.error.URLError, TimeoutError) as e:
        # A slow response can raise TimeoutError directly instead of URLError
        return f"[Ollama unreachable: {e}]\n\nRetrieved context:\n{context}"

# Usage
if __name__ == "__main__":
    query = "How do I reset a customer password?"
    customer = "customer_a"
    print(f"Query: {query}")
    print(f"Customer: {customer}\n")
    print("Generating answer...\n")
    answer = generate_answer(query, customer_id=customer)
    print("=" * 70)
    print("ANSWER:")
    print("=" * 70)
    print(answer)
    print("=" * 70)

Run:  

uv run python rag_system.py
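The prompt asks the model to cite sources, but nothing in rag_system.py verifies that the cited IDs were actually retrieved. A lightweight post-check can flag answers that reference unknown sources. The sketch below is a hypothetical addition: unknown_citations and the citation pattern are assumptions based on the bracketed labels used in the prompt and context ([TICKET ...], [KB ...], [KNOWLEDGE_BASE ...]), and real model output may not follow that format consistently.

```python
import re

# Matches bracketed citations such as [TICKET T-1001] or [KB password-reset]
CITATION = re.compile(r"\[(?:TICKET|KB|KNOWLEDGE_BASE)\s+([^\]]+)\]")


def unknown_citations(answer: str, chunks: list[dict]) -> set[str]:
    """Return cited source IDs that are not among the retrieved chunks."""
    retrieved = {str(c.get("ticket_id") or c.get("article_id")) for c in chunks}
    cited = {m.group(1).strip() for m in CITATION.finditer(answer)}
    return cited - retrieved


retrieved_chunks = [
    {"source_type": "ticket", "ticket_id": "T-1001", "article_id": None},
    {"source_type": "knowledge_base", "ticket_id": None, "article_id": "password-reset"},
]
answer = (
    "Reset it from the admin console [TICKET T-1001]. "
    "See [KB password-reset] and [TICKET T-9999]."
)

print(unknown_citations(answer, retrieved_chunks))  # {'T-9999'}
```

A non-empty result suggests the model cited something outside its context, which is a reasonable trigger for discarding the answer or showing the retrieved context instead.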

Step 9: Configure audit logging

What this produces: Complete audit trail of every query, logged locally with full traceability.

# audit.py
"""Audit logging for compliance using shared search module."""
import json
from pathlib import Path
from datetime import datetime, timezone
from search import search_customer_tickets

AUDIT_LOG = Path("./audit_logs/queries.jsonl")

def log_query(user_id, customer_id, query_text, results, access_denied=False):
    """Log every query for compliance audit trail.
    Args:
        user_id: Agent/user identifier
        customer_id: Customer identifier
        query_text: Search query
        results: List of search results
        access_denied: Whether access was denied (default: False)
    """
    AUDIT_LOG.parent.mkdir(parents=True, exist_ok=True)
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "customer_id": customer_id,
        "query": query_text,
        "results_count": len(results),
        "sources": [
            {
                "type": r.get('source_type'),
                "id": r.get('ticket_id') or r.get('article_id'),
                "score": r.get('score')
            }
            for r in results
        ],
        "access_denied": access_denied,
    }
    with open(AUDIT_LOG, 'a', encoding='utf-8') as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")

def query_with_audit(user_id, customer_id, query_text):
    """Execute query and log to audit trail.
    Args:
        user_id: Agent/user identifier
        customer_id: Customer identifier
        query_text: Search query
    Returns:
        List of search results
    """
    results = search_customer_tickets(query_text, customer_id)
    log_query(user_id, customer_id, query_text, results)
    return results

# Example
if __name__ == "__main__":
    print("=" * 70)
    print("AUDIT LOGGING TEST")
    print("=" * 70)
    print("\nRunning sample queries with audit logging...\n")
    # Query 1
    query1 = "password reset"
    results1 = search_customer_tickets(query1, "customer_a")
    log_query(
        user_id="agent_007",
        customer_id="customer_a",
        query_text=query1,
        results=results1
    )
    print(f"✓ Query 1 logged: '{query1}' (customer_a, {len(results1)} results)")
    # Query 2
    query2 = "billing invoice payment"
    results2 = search_customer_tickets(query2, "customer_b")
    log_query(
        user_id="agent_008",
        customer_id="customer_b",
        query_text=query2,
        results=results2
    )
    print(f"✓ Query 2 logged: '{query2}' (customer_b, {len(results2)} results)")
    print(f"\n✓ Audit log written to: {AUDIT_LOG}")
    print("\nSample log entries:")
    print("-" * 70)
    # Display last 2 entries
    if AUDIT_LOG.exists():
        with open(AUDIT_LOG, 'r', encoding='utf-8') as f:
            lines = f.readlines()
            for line in lines[-2:]:
                entry = json.loads(line)
                print(json.dumps({
                    "timestamp": entry["timestamp"],
                    "user_id": entry["user_id"],
                    "customer_id": entry["customer_id"],
                    "query": entry["query"],
                    "results_count": entry["results_count"],
                }, indent=2))
                print()
    print("=" * 70)

Run:  

uv run python audit.py

Audit log entries with user and customer IDs
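
Once entries accumulate, a short script can summarize who queried which customer and surface any denied requests. The sketch below is not part of the tutorial's scripts: `summarize_audit_lines` is a hypothetical helper, and it assumes only the record fields that `log_query` writes (`user_id`, `customer_id`, `access_denied`).

```python
import json
from collections import Counter
from pathlib import Path


def summarize_audit_lines(lines):
    """Summarize JSON Lines audit records (hypothetical helper).

    Returns (queries per user, queries per customer, denied records).
    """
    per_user = Counter()
    per_customer = Counter()
    denied = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines in the log
        entry = json.loads(line)
        per_user[entry["user_id"]] += 1
        per_customer[entry["customer_id"]] += 1
        if entry.get("access_denied"):
            denied.append(entry)
    return per_user, per_customer, denied


if __name__ == "__main__":
    # Same path that audit.py writes to
    log_path = Path("./audit_logs/queries.jsonl")
    if log_path.exists():
        with open(log_path, encoding="utf-8") as f:
            per_user, per_customer, denied = summarize_audit_lines(f)
        print("Queries per agent:", dict(per_user))
        print("Queries per customer:", dict(per_customer))
        print("Denied requests:", len(denied))
```

The function takes any iterable of lines, so it works on an open file handle or on a list of strings in a unit test.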

Step 10: Handle ticket updates

What this produces: Incremental ingestion of new tickets without rebuilding the entire collection.

# incremental_ingestion.py
"""Incremental ingestion - only ingest new tickets that don't exist in the database."""
from actian_vectorai import VectorAIClient, Field, FilterBuilder, PointStruct
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
import pandas as pd
import hashlib
import os
from pii_filter import sanitize_pii  # Import from Step 2

VECTORAI_HOST = "localhost:50052"
COLLECTION = "support_data"
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
VECTOR_DIM = 384

# Initialize tokenizer for accurate token-based chunking
print(f"Loading tokenizer for: {EMBED_MODEL}")
tokenizer = AutoTokenizer.from_pretrained(EMBED_MODEL)

# Initialize embedding model
print(f"Loading embedding model: {EMBED_MODEL}")
model = SentenceTransformer(EMBED_MODEL)

def chunk_text(text, chunk_size=512, overlap=50):
    """Split text into overlapping chunks by TOKEN count (not words).
    
    Args:
        text: Text to chunk
        chunk_size: Number of tokens per chunk (default: 512)
        overlap: Number of tokens to overlap between chunks (default: 50)
        
    Returns:
        List of text chunks
    """
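    # Tokenize the entire text.
    # Note: all-MiniLM-L6-v2 truncates inputs longer than 256 word pieces by default,
    # so the tail of a 512-token chunk may not be reflected in its embedding.
    # Keep chunk_size consistent with the main ingestion script if you change it.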
    tokens = tokenizer.encode(text, add_special_tokens=False)
    chunks = []
    
    # Create overlapping chunks
    for i in range(0, len(tokens), chunk_size - overlap):
        chunk_tokens = tokens[i:i + chunk_size]
        # Decode back to text (named `chunk` so it does not shadow chunk_text itself)
        chunk = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
        chunks.append(chunk)
    
    return chunks

def embed(texts):
    return model.encode(texts, normalize_embeddings=True).tolist()

def _generate_stable_id(text: str, index: int) -> int:
    """Generate stable integer ID using SHA-256 hash."""
    hash_input = f"{text}:{index}".encode()
    hash_output = hashlib.sha256(hash_input).hexdigest()
    return int(hash_output[:15], 16)

def get_existing_ticket_ids(customer_id):
    """Query VectorAI DB for existing ticket IDs for a customer."""
    existing = set()
    
    with VectorAIClient(VECTORAI_HOST) as client:
        # Build filter for this customer's tickets
        ticket_filter = (
            FilterBuilder()
            .must(Field("customer_id").eq(customer_id))
            .must(Field("source_type").eq("ticket"))
            .build()
        )
        
        # Dummy vector for metadata-only search
        dummy_vector = [0.0] * VECTOR_DIM
        
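        # Search with a high limit to get all tickets. The limit caps the result at
        # 10,000 chunks, so a customer with more stored chunks than that would have
        # some existing tickets missed here.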
        hits = client.points.search(
            collection_name=COLLECTION,
            vector=dummy_vector,
            limit=10000,
            filter=ticket_filter
        )
        
        for hit in hits:
            ticket_id = hit.payload.get('ticket_id')
            if ticket_id:
                existing.add(ticket_id)
    
    return existing

def ingest_new_tickets(csv_path, customer_id):
    """Ingest only new tickets that don't exist in the database."""
    print(f"\nChecking for new tickets in {csv_path}...")
    df = pd.read_csv(csv_path)
    
    # Get existing ticket IDs from database
    existing_ids = get_existing_ticket_ids(customer_id)
    print(f"Found {len(existing_ids)} existing tickets for {customer_id}")
    
    # Filter to only new tickets
    new_tickets = df[~df['ticket_id'].isin(existing_ids)]
    
    if new_tickets.empty:
        print(f"✓ No new tickets for {customer_id} - all up to date!")
        return
    
    print(f"Found {len(new_tickets)} new tickets to ingest")
    
    # Ingest new tickets using same logic as main ingestion
    with VectorAIClient(VECTORAI_HOST) as client:
        points_batch = []
        total_chunks = 0
        
        for idx, row in new_tickets.iterrows():
            # Sanitize PII (imported from pii_filter.py)
            clean_text = sanitize_pii(row['ticket_text'])
            
            # Chunk by TOKENS (not words)
            chunks = chunk_text(clean_text)
            
            # Embed
            vectors = embed(chunks)
            
            # Create PointStruct objects
            for i, (chunk, vector) in enumerate(zip(chunks, vectors)):
                point_id = _generate_stable_id(row['ticket_id'], i)
                
                point = PointStruct(
                    id=point_id,
                    vector=vector,
                    payload={
                        "customer_id": customer_id,
                        "source_type": "ticket",
                        "ticket_id": row['ticket_id'],
                        "product_line": row.get('product_line', 'general'),
                        "ticket_status": row.get('status', 'closed'),
                        "created_date": row['created_date'],
                        "text": chunk,
                        "chunk_index": i,
                    }
                )
                points_batch.append(point)
            
            total_chunks += len(chunks)
        
        # Upload new points
        if points_batch:
            client.upload_points(COLLECTION, points_batch, batch_size=100)
        
        print(f"✓ Ingested {len(new_tickets)} new tickets, {total_chunks} chunks for {customer_id}")

if __name__ == "__main__":
    print("=" * 70)
    print("INCREMENTAL INGESTION - New Tickets Only (Token-Based Chunking)")
    print("=" * 70)
    
    # Check if we have new ticket files
    if os.path.exists(os.path.join("sample_tickets", "customer_a_new_tickets.csv")):
        ingest_new_tickets(
            os.path.join("sample_tickets", "customer_a_new_tickets.csv"),
            customer_id="customer_a"
        )
    else:
        print("\nNo new ticket files found.")
        print("To test incremental ingestion:")
        print("1. Create customer_a_new_tickets.csv with new tickets")
        print("2. Run this script again")
    
    print("\n" + "=" * 70)
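
The two ideas that make this step incremental, skipping ticket IDs that are already stored and deriving point IDs deterministically, can be checked without a database. The sketch below is a minimal illustration, not part of the tutorial's scripts: `stable_id` mirrors the hashing in `_generate_stable_id`, while `select_new_ids` and `chunk_starts` are hypothetical helpers that restate the filtering and windowing logic in plain Python.

```python
import hashlib


def stable_id(ticket_id, index):
    """Same scheme as _generate_stable_id: first 15 hex digits of SHA-256."""
    digest = hashlib.sha256(f"{ticket_id}:{index}".encode()).hexdigest()
    return int(digest[:15], 16)


def select_new_ids(incoming_ids, existing_ids):
    """Keep only ticket IDs that are not already stored (hypothetical helper)."""
    existing = set(existing_ids)
    return [t for t in incoming_ids if t not in existing]


def chunk_starts(n_tokens, chunk_size=512, overlap=50):
    """Start offsets of each chunk window, as produced by chunk_text's loop."""
    return list(range(0, n_tokens, chunk_size - overlap))


if __name__ == "__main__":
    # The ticket IDs here are made up for illustration
    print(select_new_ids(["T-1", "T-2", "T-3"], {"T-1", "T-2"}))
    # Deterministic: the same ticket and chunk index always give the same ID
    print(stable_id("T-3", 0) == stable_id("T-3", 0))
    # A 1,000-token ticket with the default settings starts chunks at these offsets
    print(chunk_starts(1000))
```

Because the ID depends only on the ticket ID and chunk index, processing the same ticket twice yields the same point IDs. Whether uploading an existing ID overwrites the stored point depends on VectorAI DB's behavior, which this sketch does not verify. The hash input also omits `customer_id`, so the scheme assumes ticket IDs are unique across tenants.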

You Just Built a Private Multi-Tenant RAG System

You built a production-ready multi-tenant RAG system powered by VectorAI DB that keeps customer PII inside your infrastructure. Multi-tenant isolation is enforced through metadata filtering. No customer data crosses your network boundary. When regulators ask where embeddings live, the answer is “our infrastructure.”

Manufacturing plants, healthcare providers, and financial services teams use this pattern to search sensitive records without cloud dependencies. The regulatory framework changes (GDPR, HIPAA, or PCI-DSS), but the deployment pattern stays the same: keep embeddings local and never route data to external services.

Extend this for production with semantic routing that escalates low-confidence queries, cross-customer analytics that protect PII, feedback loops that improve retrieval quality, and incremental learning from resolved tickets.

Test VectorAI DB against your data using the GitHub repo. Before production, decide whether to use RAG or fine-tune and when to choose on-premises versus cloud. Join the Actian community on Discord, where platform engineers share production deployment patterns.

Your customers’ data deserves better than cloud RAG systems where the DPA covers the vendor, not the customer. SaaS vendors encrypt data in transit, but their embedding models still see your customers’ PII in plaintext. You just proved you can build a compliant alternative.