Case Study: End-to-End RAG Platform for Customer Support

Simor Consulting | 05 Dec, 2025 | 05 Mins read

A SaaS company with 200 support agents and 10,000+ knowledge base articles had an 18-hour average response time and 23% first-contact resolution. Their largest enterprise client threatened to cancel a $2M contract. The support team couldn’t find information fast enough, answers varied between agents, and the knowledge base had become a graveyard of outdated information no one trusted.

Building a RAG system that actually works in production required solving hard problems: intelligent document processing, hybrid retrieval, contextual generation, and continuous learning from feedback.

The Problem Space

Customer support at scale creates specific challenges:

Information Overload:

  • 10,000+ help articles across 12 products
  • 50,000+ resolved support tickets
  • 2,000+ pages of internal documentation
  • 500+ product release notes
  • 100+ training videos with transcripts

Consistency Crisis: Different agents gave different answers. A study found 40% accuracy variation between agents on the same queries.

Speed vs. Quality: Agents faced an impossible trade-off between finding the perfect answer (hurting response time) and giving a quick but potentially wrong one.

Knowledge Decay: An estimated 30% of the knowledge base contained incorrect or obsolete information.

Architecture

Data Pipeline

Intelligent Document Processing

from datetime import datetime

class IntelligentDocumentProcessor:
    def __init__(self, config):
        self.config = config
        self.processors = {
            'html': HTMLProcessor(),
            'pdf': PDFProcessor(),
            'docx': DocxProcessor(),
            'video': VideoTranscriptProcessor(),
            'structured': StructuredDataProcessor()
        }
        self.quality_checker = DocumentQualityChecker()

    def process_document(self, document):
        """Process document with type-specific handling"""

        doc_type = self.detect_document_type(document)
        metadata = self.extract_metadata(document)

        if not self.should_process(document, metadata):
            return None

        processor = self.processors[doc_type]
        processed_content = processor.process(document)

        structure = self.extract_semantic_structure(processed_content)

        quality_score = self.quality_checker.assess(processed_content)
        if quality_score < self.config['min_quality_score']:
            processed_content = self.enhance_content(processed_content)

        entities = self.extract_entities(processed_content)
        concepts = self.extract_concepts(processed_content)

        enhanced_metadata = {
            **metadata,
            'doc_type': doc_type,
            'quality_score': quality_score,
            'entities': entities,
            'concepts': concepts,
            'structure': structure,
            'processing_timestamp': datetime.now(),
            'content_hash': self.generate_content_hash(processed_content)
        }

        return {
            'content': processed_content,
            'metadata': enhanced_metadata,
            'chunks': self.intelligent_chunking(processed_content, structure)
        }

    def intelligent_chunking(self, content, structure):
        """Chunk content while preserving semantic boundaries"""

        chunks = []

        sections = self.identify_sections(content, structure)

        for section in sections:
            if len(section['content']) > self.config['max_chunk_size']:
                sub_chunks = self.semantic_split(
                    section['content'],
                    max_size=self.config['max_chunk_size'],
                    overlap=self.config['chunk_overlap']
                )

                for i, sub_chunk in enumerate(sub_chunks):
                    chunks.append({
                        'content': sub_chunk,
                        'metadata': {
                            **section['metadata'],
                            'chunk_index': i,
                            'total_chunks': len(sub_chunks),
                            'parent_section': section['title']
                        }
                    })
            else:
                chunks.append(section)

        chunks = self.add_chunk_context(chunks)

        return chunks

    def add_chunk_context(self, chunks):
        """Add surrounding context to each chunk"""

        enhanced_chunks = []

        for i, chunk in enumerate(chunks):
            prev_context = ""
            if i > 0:
                prev_context = self.summarize_chunk(chunks[i-1])

            next_context = ""
            if i < len(chunks) - 1:
                next_context = self.summarize_chunk(chunks[i+1])

            enhanced_chunks.append({
                **chunk,
                'context': {
                    'previous': prev_context,
                    'next': next_context,
                    'document_summary': self.generate_document_summary(chunks),
                    'position': f"{i+1}/{len(chunks)}"
                }
            })

        return enhanced_chunks
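
The semantic_split helper referenced above is not shown. A minimal stand-in sketch (greedy sentence packing with a character-tail overlap; the regex and thresholds are illustrative, not the production logic):

```python
import re

def semantic_split(text, max_size=800, overlap=100):
    """Greedily pack sentences into chunks, carrying a character tail forward."""
    sentences = re.split(r'(?<=[.!?])\s+', text.strip())
    chunks, current = [], ""
    for sentence in sentences:
        if current and len(current) + len(sentence) + 1 > max_size:
            chunks.append(current)
            # carry the tail of the finished chunk forward as overlap
            current = (current[-overlap:] + " " + sentence).strip()
        else:
            current = (current + " " + sentence).strip()
    if current:
        chunks.append(current)
    return chunks
```

A production version would split on headings or embedding similarity rather than punctuation alone, but the shape of the output (ordered chunks with overlapping tails) is the same.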

Multi-Modal Embedding

import numpy as np
from datetime import datetime

class MultiModalEmbeddingPipeline:
    def __init__(self):
        self.version = "v1"  # embedding version recorded in the metadata below
        self.text_encoder = self.load_text_encoder()
        self.table_encoder = self.load_table_encoder()
        self.code_encoder = self.load_code_encoder()
        self.image_encoder = self.load_image_encoder()

    def generate_embeddings(self, chunk):
        """Generate multi-faceted embeddings for chunk"""

        embeddings = {}

        embeddings['text'] = self.text_encoder.encode(chunk['content'])

        if 'title' in chunk['metadata']:
            embeddings['title'] = self.text_encoder.encode(chunk['metadata']['title'])

        if self.contains_table(chunk):
            table_data = self.extract_table(chunk)
            embeddings['table'] = self.table_encoder.encode(table_data)

        if self.contains_code(chunk):
            code_blocks = self.extract_code(chunk)
            embeddings['code'] = self.code_encoder.encode(code_blocks)

        if 'images' in chunk:
            embeddings['images'] = [
                self.image_encoder.encode(img)
                for img in chunk['images']
            ]

        if 'context' in chunk:
            embeddings['context'] = self.generate_context_embedding(chunk['context'])

        if 'concepts' in chunk['metadata']:
            embeddings['concepts'] = self.encode_concepts(chunk['metadata']['concepts'])

        combined_embedding = self.weighted_combination(embeddings)

        return {
            'primary': combined_embedding,
            'secondary': embeddings,
            'metadata': {
                'embedding_version': self.version,
                'timestamp': datetime.now(),
                'dimensions': {k: len(v) for k, v in embeddings.items()}
            }
        }

    def weighted_combination(self, embeddings):
        """Combine multiple embeddings with learned weights"""

        weights = {
            'text': 0.5,
            'title': 0.2,
            'context': 0.15,
            'concepts': 0.15
        }

        combined = np.zeros_like(embeddings['text'])

        for emb_type, embedding in embeddings.items():
            if emb_type in weights and isinstance(embedding, np.ndarray):
                weight = weights[emb_type]
                normalized = embedding / np.linalg.norm(embedding)
                combined += weight * normalized

        combined = combined / np.linalg.norm(combined)

        return combined
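
The weighted-combination step can be exercised standalone. A toy run with two random embedding views (the dimension is arbitrary for the demo; the weights match the snippet above):

```python
import numpy as np

def weighted_combination(embeddings, weights=None):
    """Blend several normalized embedding views into a single unit vector."""
    weights = weights or {'text': 0.5, 'title': 0.2, 'context': 0.15, 'concepts': 0.15}
    combined = np.zeros_like(embeddings['text'], dtype=float)
    for emb_type, embedding in embeddings.items():
        if emb_type in weights:
            combined += weights[emb_type] * (embedding / np.linalg.norm(embedding))
    return combined / np.linalg.norm(combined)

rng = np.random.default_rng(0)
embs = {'text': rng.normal(size=8), 'title': rng.normal(size=8)}
vec = weighted_combination(embs)
```

Normalizing each view before blending keeps any one encoder's scale from dominating; the final normalization makes the result directly usable for cosine-similarity search.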

Retrieval System

import concurrent.futures

class HybridSearchEngine:
    def __init__(self, vector_store, keyword_store, graph_store):
        self.vector_store = vector_store
        self.keyword_store = keyword_store
        self.graph_store = graph_store
        self.query_analyzer = QueryAnalyzer()
        self.result_merger = ResultMerger()

    def search(self, query, filters=None, top_k=20):
        """Perform hybrid search across multiple indexes"""

        query_analysis = self.query_analyzer.analyze(query)

        # Defer each search as a callable so the executor below can actually
        # run them concurrently (appending the call results directly would
        # execute the searches sequentially before the pool is ever used)
        search_tasks = []

        if query_analysis['use_vector_search']:
            search_tasks.append(
                lambda: self.vector_search(
                    query,
                    filters,
                    top_k=top_k * 2
                )
            )

        if query_analysis['has_keywords']:
            search_tasks.append(
                lambda: self.keyword_search(
                    query_analysis['keywords'],
                    filters,
                    top_k=top_k
                )
            )

        if query_analysis['use_graph_search']:
            search_tasks.append(
                lambda: self.graph_search(
                    query_analysis['entities'],
                    query_analysis['concepts'],
                    filters,
                    top_k=top_k
                )
            )

        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = list(executor.map(lambda task: task(), search_tasks))

        merged_results = self.result_merger.merge(
            results,
            strategy=query_analysis['merge_strategy']
        )

        reranked_results = self.rerank_results(
            merged_results,
            query,
            query_analysis
        )

        return reranked_results[:top_k]

    def vector_search(self, query, filters, top_k):
        """Semantic similarity search"""

        query_embedding = self.generate_query_embedding(query)

        results = self.vector_store.search(
            query_embedding,
            filter=self.build_vector_filter(filters),
            top_k=top_k
        )

        for result in results:
            result['explanation'] = self.explain_similarity(
                query_embedding,
                result['embedding'],
                result['content']
            )

        return results

    def keyword_search(self, keywords, filters, top_k):
        """BM25-based keyword search"""

        keyword_query = self.build_keyword_query(keywords)

        results = self.keyword_store.search(
            query=keyword_query,
            filters=filters,
            boost_fields={
                'title': 2.0,
                'keywords': 1.5,
                'content': 1.0
            },
            top_k=top_k
        )

        return results

    def graph_search(self, entities, concepts, filters, top_k):
        """Knowledge graph traversal search"""

        start_nodes = self.graph_store.find_nodes(entities + concepts)

        subgraph = self.graph_store.traverse(
            start_nodes,
            max_depth=3,
            relationship_types=['related_to', 'part_of', 'depends_on'],
            filters=filters
        )

        doc_scores = self.calculate_document_centrality(subgraph)

        results = []
        for doc_id, score in sorted(doc_scores.items(),
                                   key=lambda x: x[1],
                                   reverse=True)[:top_k]:
            doc = self.get_document(doc_id)
            results.append({
                'document': doc,
                'score': score,
                'path': self.get_reasoning_path(start_nodes, doc_id, subgraph)
            })

        return results
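
The ResultMerger used by the search method is not shown. A common merge strategy for hybrid retrieval is reciprocal rank fusion (RRF); a sketch, with the customary k=60 smoothing constant (an assumption here, not the production value):

```python
def rrf_merge(result_lists, k=60):
    """Merge best-first ranked lists of doc ids via reciprocal rank fusion.

    Each doc id's score is the sum of 1 / (k + rank) across every list it
    appears in; documents ranked highly by multiple searchers rise to the top.
    """
    scores = {}
    for results in result_lists:
        for rank, doc_id in enumerate(results):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)
```

RRF is attractive for hybrid search because it needs no score calibration between the vector, keyword, and graph backends; only ranks matter.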

Intelligent Reranking

class IntelligentReranker:
    def __init__(self, model_path):
        self.rerank_model = self.load_rerank_model(model_path)
        self.feature_extractor = FeatureExtractor()

    def rerank_results(self, results, query, query_analysis):
        """Rerank results using learned ranking model"""

        reranked = []

        for result in results:
            features = self.extract_ranking_features(
                query,
                result,
                query_analysis
            )

            rerank_score = self.rerank_model.predict(features)

            final_score = self.combine_scores(
                original_score=result.get('score', 0),
                rerank_score=rerank_score,
                weight=0.7
            )

            reranked.append({
                **result,
                'final_score': final_score,
                'rerank_features': features,
                'rerank_explanation': self.explain_reranking(features)
            })

        reranked.sort(key=lambda x: x['final_score'], reverse=True)

        if query_analysis.get('needs_diversity', False):
            reranked = self.inject_diversity(reranked)

        return reranked

    def extract_ranking_features(self, query, result, query_analysis):
        """Extract features for ranking decision"""

        features = {}

        features['exact_match_score'] = self.calculate_exact_matches(query, result)
        features['fuzzy_match_score'] = self.calculate_fuzzy_matches(query, result)
        features['semantic_similarity'] = result.get('score', 0)

        features['title_match'] = self.title_relevance(query, result)
        features['keyword_coverage'] = self.keyword_coverage(
            query_analysis['keywords'],
            result
        )

        features['content_age_days'] = self.calculate_age(result)
        features['last_updated_days'] = self.calculate_last_update(result)

        features['document_quality_score'] = result['metadata'].get('quality_score', 0)
        features['source_authority'] = self.get_source_authority(result)

        features['product_match'] = self.check_product_relevance(
            query_analysis.get('product'),
            result
        )
        features['query_type_match'] = self.check_query_type_match(
            query_analysis['query_type'],
            result['metadata'].get('content_type')
        )

        features['historical_ctr'] = self.get_historical_ctr(result)
        features['agent_ratings'] = self.get_agent_ratings(result)

        return features
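
The combine_scores call in rerank_results is left undefined; one plausible reading is a linear blend, with weight as the share given to the reranker score (our assumption about the intended direction):

```python
def combine_scores(original_score, rerank_score, weight=0.7):
    """Linearly interpolate between the retrieval score and the reranker score.

    weight=0.7 trusts the learned reranker more than the raw retrieval score,
    while still letting a very strong retrieval signal influence the ordering.
    """
    return weight * rerank_score + (1 - weight) * original_score
```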

Generation Layer

Context-Aware Prompt Engineering

class ContextAwarePromptBuilder:
    def __init__(self, templates_path):
        self.templates = self.load_templates(templates_path)
        self.context_analyzer = ContextAnalyzer()

    def build_prompt(self, query, retrieved_docs, conversation_history=None):
        """Build optimized prompt for response generation"""

        context = self.context_analyzer.analyze(
            query,
            retrieved_docs,
            conversation_history
        )

        template = self.select_template(context)

        doc_context = self.build_document_context(
            retrieved_docs,
            max_tokens=self.calculate_context_budget(context)
        )

        conv_context = self.build_conversation_context(
            conversation_history,
            max_tokens=1000
        )

        prompt = template.format(
            query=query,
            document_context=doc_context,
            conversation_context=conv_context,
            instructions=self.get_contextual_instructions(context),
            constraints=self.get_constraints(context),
            output_format=self.get_output_format(context)
        )

        if context.get('needs_examples', False):
            prompt = self.add_few_shot_examples(prompt, context)

        return {
            'prompt': prompt,
            'metadata': {
                'template_used': template.name,
                'context_tokens': self.count_tokens(doc_context),
                'total_tokens': self.count_tokens(prompt),
                'expected_response_type': context['response_type']
            }
        }

    def get_contextual_instructions(self, context):
        """Generate context-specific instructions"""

        instructions = ["Base your response on the provided documentation."]

        if context['query_type'] == 'troubleshooting':
            instructions.append("Provide step-by-step troubleshooting instructions.")
            instructions.append("Consider multiple potential causes.")

        elif context['query_type'] == 'how_to':
            instructions.append("Provide clear, actionable steps.")
            instructions.append("Include any prerequisites or warnings.")

        elif context['query_type'] == 'conceptual':
            instructions.append("Explain the concept clearly.")
            instructions.append("Use examples where helpful.")

        if context.get('technical_level') == 'beginner':
            instructions.append("Use simple, non-technical language.")
            instructions.append("Avoid jargon or explain technical terms.")

        elif context.get('technical_level') == 'expert':
            instructions.append("Provide technical details and advanced options.")

        if context.get('needs_citations', True):
            instructions.append("Cite specific documents using [Source: title] format.")

        return "\n".join(instructions)
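
Once a template is selected, the assembly above reduces to ordinary string formatting. A minimal illustration with a hypothetical troubleshooting template (all names and contents invented for the example; real templates live in templates_path):

```python
TROUBLESHOOTING_TEMPLATE = """You are a customer support assistant.

Instructions:
{instructions}

Documentation:
{document_context}

Customer question: {query}
"""

prompt = TROUBLESHOOTING_TEMPLATE.format(
    instructions=(
        "Base your response on the provided documentation.\n"
        "Provide step-by-step troubleshooting instructions."
    ),
    document_context="[Source: Device Reset Guide] Hold the power button for 10 seconds.",
    query="My device won't restart.",
)
```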

Response Generation with Guardrails

class GuardedResponseGenerator:
    def __init__(self, llm_gateway, safety_config):
        self.llm = llm_gateway
        self.safety_checker = SafetyChecker(safety_config)
        self.fact_checker = FactChecker()
        self.citation_engine = CitationEngine()

    def generate_response(self, prompt, retrieved_docs):
        """Generate response with multiple safety checks"""

        prompt_safety = self.safety_checker.check_prompt(prompt)
        if not prompt_safety['safe']:
            return self.generate_safety_fallback(prompt_safety)

        raw_response = self.llm.generate(
            prompt,
            temperature=0.3,
            max_tokens=1000,
            stop_sequences=["</response>"]
        )

        response_safety = self.safety_checker.check_response(raw_response)
        if not response_safety['safe']:
            raw_response = self.sanitize_response(raw_response, response_safety)

        fact_check_results = self.fact_checker.verify_claims(
            raw_response,
            retrieved_docs
        )

        if fact_check_results['accuracy'] < 0.9:
            raw_response = self.correct_inaccuracies(
                raw_response,
                fact_check_results
            )

        cited_response = self.citation_engine.add_citations(
            raw_response,
            retrieved_docs,
            fact_check_results
        )

        final_response = self.format_response(cited_response)

        qa_results = self.quality_assurance(final_response, prompt)

        return {
            'response': final_response,
            'metadata': {
                'safety_checks': response_safety,
                'fact_accuracy': fact_check_results['accuracy'],
                'citations_added': len(cited_response['citations']),
                'qa_score': qa_results['score'],
                'confidence': self.calculate_confidence(qa_results)
            }
        }

Results

Business Impact:

  • $4.2M annual savings from improved efficiency
  • Retained $2M enterprise client, grew account by 50%
  • 87% of agents reported RAG made their job easier
  • Handled 3x ticket volume without adding agents

Technical Achievements:

  • 92% precision at k=5 for retrieval
  • 89% of responses rated helpful by customers
  • P95 < 2 seconds end-to-end latency
  • 99.95% availability

Decision Rules

Build production RAG when:

  • Search/answer quality directly impacts business metrics
  • Knowledge base is too large for manual searching
  • Consistency across agents is required
  • Feedback can be systematically collected and applied

Key principles:

  • Data quality is everything: invest 40% of development time in data processing
  • Retrieval quality matters more than generation quality
  • Build feedback loops from day one
  • Human-in-the-loop often beats full automation
  • Deploy incrementally to catch issues early

Ready to Implement These AI Data Engineering Solutions?

Get a comprehensive AI Readiness Assessment to determine the best approach for your organization's data infrastructure and AI implementation needs.
