
A Guide to LLM Application Development: RAG and Agent in Practice

Published: 2024-10-25
Author: AI Technology Researcher
Tags: application development, RAG, Agent, knowledge base, intelligent assistant, application architecture

Preface

If the large language model is the "brain", then RAG and Agent techniques are the "hands and feet" that let that brain perceive the world, acquire knowledge, and carry out tasks. As an architect deeply involved in LLM application development, I have watched the field evolve from simple question-answering systems to complex intelligent assistants.

I still remember the excitement of my first RAG system: with retrieval augmentation, the model could finally answer questions about our internal company documents. And my first Agent system was even more striking: the model could not only understand a task, but also plan autonomously, invoke tools, and complete complex multi-step work.

From simple document Q&A to automated business workflows, and from plain retrieval augmentation to multimodal interaction, RAG and Agent techniques are redrawing the boundary of human-computer interaction.

In this post we dig into the core techniques of LLM application development: RAG system design, Agent architecture, knowledge base construction, and tool integration, with an eye toward building applications that actually work.

Deep Dive into RAG Techniques

RAG System Architecture

python
class RAGSystem:
    def __init__(self, embedding_model, llm_model, vector_store):
        self.embedding_model = embedding_model
        self.llm_model = llm_model
        self.vector_store = vector_store
        self.document_processor = DocumentProcessor()
        self.retriever = AdvancedRetriever(vector_store, embedding_model)
        self.generator = ResponseGenerator(llm_model)

    def ingest_documents(self, documents):
        """Document ingestion pipeline."""
        processed_docs = []

        for doc in documents:
            # Document pre-processing
            cleaned_doc = self.document_processor.clean_document(doc)

            # Split the document into chunks
            chunks = self.document_processor.chunk_document(
                cleaned_doc,
                chunk_size=512,
                overlap=50
            )

            # Compute an embedding for each chunk
            for chunk in chunks:
                embedding = self.embedding_model.encode(chunk['text'])
                chunk['embedding'] = embedding
                processed_docs.append(chunk)

        # Persist the chunks to the vector store
        self.vector_store.add_documents(processed_docs)

        return len(processed_docs)

    def query(self, question, top_k=5, rerank=True):
        """RAG query pipeline."""
        # 1. Query understanding and rewriting
        refined_query = self.refine_query(question)

        # 2. Retrieve candidate documents (over-fetch when reranking)
        retrieved_docs = self.retriever.retrieve(
            refined_query,
            top_k=top_k * 2 if rerank else top_k
        )

        # 3. Rerank (optional)
        if rerank:
            retrieved_docs = self.retriever.rerank(
                refined_query,
                retrieved_docs,
                top_k=top_k
            )

        # 4. Generate the answer
        response = self.generator.generate_response(
            question=question,
            context_docs=retrieved_docs,
            refined_query=refined_query
        )

        return {
            'answer': response['answer'],
            'sources': retrieved_docs,
            'confidence': response['confidence'],
            'refined_query': refined_query
        }

    def refine_query(self, query):
        """Query optimization and rewriting."""
        # Query expansion
        expanded_query = self.expand_query(query)

        # Query rewriting
        rewritten_query = self.rewrite_query(expanded_query)

        return rewritten_query

    def expand_query(self, query):
        """Query expansion."""
        # Expand with synonyms
        synonyms = self.get_synonyms(query)

        # Expand with related terms
        related_terms = self.get_related_terms(query)

        expanded = f"{query} {' '.join(synonyms)} {' '.join(related_terms)}"

        return expanded

    def rewrite_query(self, query):
        """Query rewriting (the prompt stays in Chinese: the system serves Chinese queries)."""
        rewrite_prompt = f"""
        请将以下查询重写为更适合检索的形式,保持原意但使用更精确的关键词:

        原查询:{query}

        重写后的查询:
        """

        rewritten = self.llm_model.generate(rewrite_prompt, max_tokens=100)
        return rewritten.strip()

    def get_synonyms(self, query):
        """Look up synonyms."""
        # Simplified; a real system would use a thesaurus or a model
        synonym_dict = {
            '问题': ['疑问', '困惑', '难题'],
            '解决': ['处理', '解答', '应对'],
            '方法': ['方式', '途径', '手段']
        }

        synonyms = []
        for word in query.split():
            if word in synonym_dict:
                synonyms.extend(synonym_dict[word])

        return synonyms

    def get_related_terms(self, query):
        """Look up related terms."""
        # Simplified; a real system would use a knowledge graph or a model
        return []
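
The ingest-and-query loop above depends on external embedding and LLM components, but its retrieval core can be sketched end to end with a toy bag-of-words "embedding". The helper names below (embed, cosine, retrieve) are illustrative, not part of the classes above:

```python
from collections import Counter
import math

def embed(text):
    """Toy bag-of-words 'embedding': a term-frequency Counter."""
    return Counter(text.lower().split())

def cosine(a, b):
    """Cosine similarity between two sparse term-frequency vectors."""
    dot = sum(a[t] * b[t] for t in a)
    na = math.sqrt(sum(v * v for v in a.values()))
    nb = math.sqrt(sum(v * v for v in b.values()))
    return dot / (na * nb) if na and nb else 0.0

def retrieve(query, docs, top_k=2):
    """Rank documents by similarity to the query, highest first."""
    q = embed(query)
    scored = sorted(docs, key=lambda d: cosine(q, embed(d)), reverse=True)
    return scored[:top_k]

docs = [
    "the vector store holds document embeddings",
    "agents plan tasks and call tools",
    "retrieval augmented generation grounds answers in documents",
]
top = retrieve("how does retrieval over documents work", docs)
```

A production system swaps the Counter for a dense embedding model and the sort for an approximate nearest-neighbor index, but the contract is the same: encode, score, rank, take the top k.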

class DocumentProcessor:
    def __init__(self):
        self.text_extractors = {
            '.pdf': self.extract_pdf_text,
            '.docx': self.extract_docx_text,
            '.txt': self.extract_txt_text,
            '.html': self.extract_html_text
        }

    def clean_document(self, document):
        """Clean a raw document."""
        # Extract the text
        text = self.extract_text(document)

        # Clean the text
        cleaned_text = self.clean_text(text)

        # Extract metadata
        metadata = self.extract_metadata(document)

        return {
            'text': cleaned_text,
            'metadata': metadata,
            'source': document.get('source', 'unknown')
        }

    def extract_text(self, document):
        """Extract text according to the document type."""
        file_type = document.get('type', '.txt')
        extractor = self.text_extractors.get(file_type, self.extract_txt_text)

        return extractor(document['content'])

    def clean_text(self, text):
        """Text cleaning."""
        import re

        # Collapse runs of spaces and tabs, but keep newlines so the
        # line filter below still has lines to work with
        text = re.sub(r'[ \t]+', ' ', text)

        # Strip special characters (keep word chars, whitespace, CJK, and basic punctuation)
        text = re.sub(r'[^\w\s\u4e00-\u9fff.,!?;:]', '', text)

        # Drop lines too short to be meaningful
        lines = text.split('\n')
        cleaned_lines = [line.strip() for line in lines if len(line.strip()) > 10]

        return '\n'.join(cleaned_lines)

    def chunk_document(self, document, chunk_size=512, overlap=50):
        """Split a document into chunks."""
        text = document['text']
        metadata = document['metadata']

        # Split into sentences
        sentences = self.split_into_sentences(text)

        chunks = []
        current_chunk = ""

        for sentence in sentences:
            if current_chunk and len(current_chunk) + len(sentence) > chunk_size:
                # Emit the current chunk
                chunk = {
                    'text': current_chunk.strip(),
                    'metadata': metadata.copy(),
                    'chunk_id': len(chunks),
                    'source': document['source']
                }
                chunks.append(chunk)

                # Start a new chunk, carrying over some overlap
                overlap_text = self.get_overlap_text(current_chunk, overlap)
                current_chunk = overlap_text + " " + sentence
            else:
                current_chunk += " " + sentence

        # Append the final chunk
        if current_chunk.strip():
            chunk = {
                'text': current_chunk.strip(),
                'metadata': metadata.copy(),
                'chunk_id': len(chunks),
                'source': document['source']
            }
            chunks.append(chunk)

        return chunks

    def split_into_sentences(self, text):
        """Sentence splitting."""
        import re

        # Simplified sentence splitter
        sentences = re.split(r'[.!?。!?]', text)
        return [s.strip() for s in sentences if s.strip()]

    def get_overlap_text(self, text, overlap_chars):
        """Take the trailing overlap text, snapped to a word boundary."""
        if len(text) <= overlap_chars:
            return text

        # Take the tail, then drop the leading partial word
        overlap_text = text[-overlap_chars:]
        space_index = overlap_text.find(' ')

        if space_index != -1:
            return overlap_text[space_index:].strip()

        return overlap_text

    def extract_metadata(self, document):
        """Extract document metadata."""
        return {
            'title': document.get('title', ''),
            'author': document.get('author', ''),
            'created_date': document.get('created_date', ''),
            'file_type': document.get('type', ''),
            'file_size': len(document.get('content', ''))
        }

    def extract_pdf_text(self, content):
        """Extract text from a PDF."""
        # A real implementation would use PyPDF2 or pdfplumber
        return "PDF text extraction not implemented"

    def extract_docx_text(self, content):
        """Extract text from a DOCX file."""
        # A real implementation would use python-docx
        return "DOCX text extraction not implemented"

    def extract_txt_text(self, content):
        """Extract text from a plain-text file."""
        return content

    def extract_html_text(self, content):
        """Extract text from HTML."""
        # A real implementation would use BeautifulSoup; this regex strip is a crude fallback
        import re
        return re.sub(r'<[^>]+>', '', content)
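
The sliding-window chunker above can be condensed to a few lines to show the overlap mechanic in isolation; chunk_sentences and the tiny sizes below are illustrative:

```python
def chunk_sentences(sentences, chunk_size=40, overlap=10):
    """Greedy sentence packing with character-level overlap between chunks."""
    chunks, current = [], ""
    for s in sentences:
        if current and len(current) + len(s) > chunk_size:
            chunks.append(current.strip())
            current = current[-overlap:] + " " + s   # carry a short tail as overlap
        else:
            current += " " + s
    if current.strip():
        chunks.append(current.strip())
    return chunks

sents = ["Alpha beta gamma.", "Delta epsilon.", "Zeta eta theta iota.", "Kappa."]
chunks = chunk_sentences(sents, chunk_size=40, overlap=10)
```

The overlap means each chunk repeats the tail of its predecessor, so a fact straddling a chunk boundary is still retrievable from at least one chunk.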

class AdvancedRetriever:
    def __init__(self, vector_store, embedding_model):
        self.vector_store = vector_store
        self.embedding_model = embedding_model
        self.reranker = CrossEncoderReranker()

    def retrieve(self, query, top_k=5, retrieval_strategy='hybrid'):
        """Advanced retrieval with pluggable strategies."""
        if retrieval_strategy == 'semantic':
            return self.semantic_search(query, top_k)
        elif retrieval_strategy == 'keyword':
            return self.keyword_search(query, top_k)
        elif retrieval_strategy == 'hybrid':
            return self.hybrid_search(query, top_k)
        else:
            raise ValueError(f"Unknown retrieval strategy: {retrieval_strategy}")

    def semantic_search(self, query, top_k):
        """Semantic (embedding) search."""
        query_embedding = self.embedding_model.encode(query)

        results = self.vector_store.similarity_search(
            query_embedding,
            top_k=top_k
        )

        return results

    def keyword_search(self, query, top_k):
        """Keyword search."""
        # Use BM25 or another keyword-retrieval algorithm
        keywords = self.extract_keywords(query)

        results = self.vector_store.keyword_search(
            keywords,
            top_k=top_k
        )

        return results

    def hybrid_search(self, query, top_k):
        """Hybrid search: semantic plus keyword."""
        # Semantic results
        semantic_results = self.semantic_search(query, top_k)

        # Keyword results
        keyword_results = self.keyword_search(query, top_k)

        # Merge and re-rank
        combined_results = self.combine_results(
            semantic_results,
            keyword_results,
            semantic_weight=0.7,
            keyword_weight=0.3
        )

        return combined_results[:top_k]

    def combine_results(self, semantic_results, keyword_results, semantic_weight, keyword_weight):
        """Merge results from both retrievers via weighted rank fusion."""
        # Map document id -> fused score
        doc_scores = {}

        # Add semantic scores
        for i, doc in enumerate(semantic_results):
            doc_id = doc['id']
            semantic_score = 1.0 - (i / len(semantic_results))  # rank-normalized score
            doc_scores[doc_id] = doc_scores.get(doc_id, 0) + semantic_score * semantic_weight

        # Add keyword scores
        for i, doc in enumerate(keyword_results):
            doc_id = doc['id']
            keyword_score = 1.0 - (i / len(keyword_results))  # rank-normalized score
            doc_scores[doc_id] = doc_scores.get(doc_id, 0) + keyword_score * keyword_weight

        # Sort by fused score
        sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)

        # Build the result list
        doc_map = {doc['id']: doc for doc in semantic_results + keyword_results}
        combined_results = []

        for doc_id, score in sorted_docs:
            if doc_id in doc_map:
                doc = doc_map[doc_id].copy()
                doc['combined_score'] = score
                combined_results.append(doc)

        return combined_results

    def rerank(self, query, documents, top_k):
        """Rerank retrieved documents."""
        return self.reranker.rerank(query, documents, top_k)

    def extract_keywords(self, query):
        """Extract keywords from the query."""
        # Simplified; a real system would use a proper keyword extractor
        import re

        # Remove (Chinese) stop words
        stop_words = {'的', '是', '在', '有', '和', '与', '或', '但', '然而', '因为', '所以'}

        words = re.findall(r'\w+', query.lower())
        keywords = [word for word in words if word not in stop_words and len(word) > 1]

        return keywords
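
combine_results above is weighted rank fusion: each retriever contributes a rank-decayed score, and documents appearing in both lists accumulate both contributions. A minimal sketch (the function name fuse and the doc ids are made up):

```python
def fuse(semantic_ids, keyword_ids, w_sem=0.7, w_kw=0.3):
    """Combine two ranked id lists into one weighted score per id."""
    scores = {}
    for ids, w in ((semantic_ids, w_sem), (keyword_ids, w_kw)):
        for rank, doc_id in enumerate(ids):
            # Rank 0 scores 1.0, decaying linearly down the list
            scores[doc_id] = scores.get(doc_id, 0.0) + w * (1.0 - rank / len(ids))
    return sorted(scores, key=scores.get, reverse=True)

order = fuse(["a", "b", "c"], ["c", "a", "d"])
```

Here "a" wins (strong in both lists) even though "c" tops the keyword list, which is exactly the behavior you want from fusion: agreement across retrievers beats a single strong signal.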

class CrossEncoderReranker:
    def __init__(self):
        # A real implementation would load a pretrained cross-encoder model
        self.model = None

    def rerank(self, query, documents, top_k):
        """Rerank with a cross-encoder (keyword-overlap fallback here)."""
        if not documents:
            return documents

        # Score query-document relevance
        scored_docs = []

        for doc in documents:
            relevance_score = self.calculate_relevance(query, doc['text'])
            doc_with_score = doc.copy()
            doc_with_score['relevance_score'] = relevance_score
            scored_docs.append(doc_with_score)

        # Sort by relevance
        scored_docs.sort(key=lambda x: x['relevance_score'], reverse=True)

        return scored_docs[:top_k]

    def calculate_relevance(self, query, document_text):
        """Relevance score: Jaccard similarity over token sets (simplified stand-in)."""
        query_words = set(query.lower().split())
        doc_words = set(document_text.lower().split())

        intersection = query_words.intersection(doc_words)
        union = query_words.union(doc_words)

        if len(union) == 0:
            return 0.0

        jaccard_similarity = len(intersection) / len(union)

        return jaccard_similarity
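
The keyword-overlap fallback in calculate_relevance is Jaccard similarity over token sets; standalone, it looks like this (docs and query are illustrative):

```python
def jaccard(a, b):
    """Jaccard similarity between the token sets of two strings."""
    sa, sb = set(a.lower().split()), set(b.lower().split())
    return len(sa & sb) / len(sa | sb) if sa | sb else 0.0

docs = ["install the package manager", "the cat sat", "package install guide"]
query = "install package"
ranked = sorted(docs, key=lambda d: jaccard(query, d), reverse=True)
```

Note that Jaccard penalizes long documents (a big union shrinks the score), which is one reason real systems prefer a learned cross-encoder for this step.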

class ResponseGenerator:
    def __init__(self, llm_model):
        self.llm_model = llm_model
        self.prompt_templates = self.load_prompt_templates()

    def generate_response(self, question, context_docs, refined_query=None):
        """Generate an answer from the retrieved context."""
        # Build the context string
        context = self.build_context(context_docs)

        # Pick an appropriate prompt template
        template = self.select_prompt_template(question, context_docs)

        # Build the prompt
        prompt = template.format(
            question=question,
            context=context,
            refined_query=refined_query or question
        )

        # Generate the answer
        response = self.llm_model.generate(
            prompt,
            max_tokens=500,
            temperature=0.1
        )

        # Estimate answer quality
        confidence = self.evaluate_response_quality(question, response, context_docs)

        return {
            'answer': response,
            'confidence': confidence,
            'prompt_used': template
        }

    def build_context(self, documents):
        """Assemble the context string passed to the LLM."""
        context_parts = []

        for i, doc in enumerate(documents):
            source_info = f"来源: {doc.get('source', '未知')}"
            text = doc['text']

            context_part = f"文档 {i+1}:\n{source_info}\n{text}\n"
            context_parts.append(context_part)

        return "\n".join(context_parts)

    def select_prompt_template(self, question, context_docs):
        """Choose a prompt template by question type."""
        if self.is_factual_question(question):
            return self.prompt_templates['factual']
        elif self.is_analytical_question(question):
            return self.prompt_templates['analytical']
        elif self.is_comparison_question(question):
            return self.prompt_templates['comparison']
        else:
            return self.prompt_templates['general']

    def is_factual_question(self, question):
        """Is this a factual question?"""
        factual_keywords = ['什么是', '谁是', '何时', '哪里', '多少']
        return any(keyword in question for keyword in factual_keywords)

    def is_analytical_question(self, question):
        """Is this an analytical question?"""
        analytical_keywords = ['为什么', '如何', '怎样', '原因', '分析']
        return any(keyword in question for keyword in analytical_keywords)

    def is_comparison_question(self, question):
        """Is this a comparison question?"""
        comparison_keywords = ['比较', '区别', '差异', '相同', '不同']
        return any(keyword in question for keyword in comparison_keywords)

    def load_prompt_templates(self):
        """Load the prompt templates (kept in Chinese: the system serves Chinese queries)."""
        return {
            'general': """
基于以下上下文信息,请回答用户的问题。如果上下文中没有相关信息,请明确说明。

上下文:
{context}

问题:{question}

请提供准确、详细的回答:
""",
            'factual': """
基于以下上下文信息,请回答这个事实性问题。请直接提供准确的事实信息。

上下文:
{context}

问题:{question}

回答:
""",
            'analytical': """
基于以下上下文信息,请分析并回答这个问题。请提供深入的分析和解释。

上下文:
{context}

问题:{question}

分析和回答:
""",
            'comparison': """
基于以下上下文信息,请比较并回答这个问题。请突出关键的相似点和差异点。

上下文:
{context}

问题:{question}

比较分析:
"""
        }

    def evaluate_response_quality(self, question, response, context_docs):
        """Heuristic answer-quality score."""
        # Simplified quality estimate
        quality_score = 0.5  # base score

        # Answer length in a sensible range
        if 50 <= len(response) <= 1000:
            quality_score += 0.1

        # Does the answer draw on the context?
        context_text = " ".join([doc['text'] for doc in context_docs])
        common_words = set(response.lower().split()) & set(context_text.lower().split())

        if len(common_words) > 5:
            quality_score += 0.2

        # Does the answer address the question directly?
        question_words = set(question.lower().split())
        response_words = set(response.lower().split())

        if len(question_words & response_words) > 2:
            quality_score += 0.2

        return min(quality_score, 1.0)
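
Template routing in select_prompt_template reduces to substring checks over keyword classes; a self-contained sketch using the same Chinese keyword lists (classify_question is an illustrative name):

```python
def classify_question(q):
    """Route a question to a prompt-template key by keyword class."""
    if any(k in q for k in ('什么是', '谁是', '何时', '哪里', '多少')):
        return 'factual'
    if any(k in q for k in ('为什么', '如何', '怎样', '原因', '分析')):
        return 'analytical'
    if any(k in q for k in ('比较', '区别', '差异', '相同', '不同')):
        return 'comparison'
    return 'general'

kinds = [classify_question(q)
         for q in ('什么是RAG?', '为什么需要重排序?', 'RAG与微调的区别?', '讲讲RAG')]
```

Substring routing is brittle (a question can match several classes, and the first match wins), so in practice you would back it with an LLM classifier once the template set grows.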

Agent System Architecture

Designing an Intelligent Agent

python
import time

class IntelligentAgent:
    def __init__(self, llm_model, tool_registry, memory_system):
        self.llm_model = llm_model
        self.tool_registry = tool_registry
        self.memory_system = memory_system
        self.planner = TaskPlanner(llm_model)
        self.executor = TaskExecutor(tool_registry)
        self.monitor = ExecutionMonitor()

    def process_request(self, user_request, context=None):
        """Handle a user request end to end."""
        # 1. Understand the user's intent
        intent = self.understand_intent(user_request, context)

        # 2. Draw up an execution plan
        plan = self.planner.create_plan(intent, self.tool_registry.get_available_tools())

        # 3. Execute the plan
        execution_result = self.execute_plan(plan, user_request)

        # 4. Generate the final answer
        final_response = self.generate_final_response(
            user_request,
            execution_result,
            plan
        )

        # 5. Update memory
        self.memory_system.store_interaction(
            user_request,
            final_response,
            plan,
            execution_result
        )

        return final_response

    def understand_intent(self, user_request, context):
        """Understand the user's intent."""
        intent_prompt = f"""
分析以下用户请求,识别用户的意图和所需的操作:

用户请求:{user_request}
上下文:{context or '无'}

请分析:
1. 用户的主要意图是什么?
2. 需要执行哪些具体操作?
3. 需要哪些信息或工具?
4. 预期的输出格式是什么?

分析结果:
"""

        intent_analysis = self.llm_model.generate(intent_prompt, max_tokens=300)

        return {
            'original_request': user_request,
            'intent_analysis': intent_analysis,
            'context': context,
            'timestamp': time.time()
        }

    def execute_plan(self, plan, original_request):
        """Execute the plan step by step."""
        execution_results = []
        current_context = {}

        for step in plan['steps']:
            try:
                # Mark the step as started
                self.monitor.start_step(step)

                # Run the step
                step_result = self.executor.execute_step(step, current_context)

                # Update the shared context
                current_context.update(step_result.get('context_updates', {}))

                # Record the result
                execution_results.append({
                    'step': step,
                    'result': step_result,
                    'status': 'success',
                    'timestamp': time.time()
                })

                # Mark the step as completed
                self.monitor.complete_step(step, step_result)

            except Exception as e:
                # Handle the execution error
                error_result = {
                    'step': step,
                    'error': str(e),
                    'status': 'failed',
                    'timestamp': time.time()
                }
                execution_results.append(error_result)

                # Attempt recovery
                recovery_action = self.handle_execution_error(step, e, current_context)
                if recovery_action:
                    execution_results.append(recovery_action)
                else:
                    break  # unrecoverable; stop executing

        return {
            'plan': plan,
            'execution_results': execution_results,
            'final_context': current_context,
            'overall_status': 'success' if all(r['status'] == 'success' for r in execution_results) else 'partial_failure'
        }

    def handle_execution_error(self, failed_step, error, context):
        """Handle an execution error."""
        recovery_prompt = f"""
执行步骤时发生错误:

失败的步骤:{failed_step}
错误信息:{error}
当前上下文:{context}

请分析:
1. 错误的可能原因
2. 是否可以恢复
3. 如果可以恢复,建议的恢复策略

分析和建议:
"""

        recovery_analysis = self.llm_model.generate(recovery_prompt, max_tokens=200)

        # Simplified recovery logic
        if "重试" in recovery_analysis:
            try:
                # Retry the step
                retry_result = self.executor.execute_step(failed_step, context)
                return {
                    'step': failed_step,
                    'result': retry_result,
                    'status': 'recovered',
                    'recovery_method': 'retry',
                    'timestamp': time.time()
                }
            except Exception:
                pass

        return None

    def generate_final_response(self, original_request, execution_result, plan):
        """Generate the final answer for the user."""
        response_prompt = f"""
基于以下执行结果,为用户生成最终回答:

原始请求:{original_request}
执行计划:{plan}
执行结果:{execution_result}

请生成一个清晰、有用的回答,包括:
1. 直接回答用户的问题
2. 总结执行的主要步骤
3. 提供相关的结果和数据
4. 如果有错误,说明原因和建议

最终回答:
"""

        final_response = self.llm_model.generate(response_prompt, max_tokens=500)

        return {
            'response': final_response,
            'execution_summary': execution_result,
            'plan_used': plan,
            'timestamp': time.time()
        }

class TaskPlanner:
    def __init__(self, llm_model):
        self.llm_model = llm_model
        self.planning_strategies = {
            'sequential': self.create_sequential_plan,
            'parallel': self.create_parallel_plan,
            'conditional': self.create_conditional_plan
        }

    def create_plan(self, intent, available_tools):
        """Create an execution plan."""
        # Gauge task complexity
        complexity = self.analyze_task_complexity(intent)

        # Pick a planning strategy
        strategy = self.select_planning_strategy(complexity, available_tools)

        # Build the plan
        plan = self.planning_strategies[strategy](intent, available_tools)

        # Validate it
        validated_plan = self.validate_plan(plan, available_tools)

        return validated_plan

    def analyze_task_complexity(self, intent):
        """Assess how complex the task is."""
        complexity_prompt = f"""
分析以下任务的复杂度:

任务意图:{intent['intent_analysis']}

请评估:
1. 任务是否需要多个步骤?
2. 步骤之间是否有依赖关系?
3. 是否需要条件判断?
4. 是否可以并行执行?

复杂度评估(简单/中等/复杂):
"""

        complexity_analysis = self.llm_model.generate(complexity_prompt, max_tokens=200)

        if "复杂" in complexity_analysis:
            return "complex"
        elif "中等" in complexity_analysis:
            return "medium"
        else:
            return "simple"

    def select_planning_strategy(self, complexity, available_tools):
        """Pick a planning strategy for the given complexity."""
        if complexity == "simple":
            return "sequential"
        elif complexity == "medium":
            return "conditional"
        else:
            return "parallel"

    def create_sequential_plan(self, intent, available_tools):
        """Create a sequential plan."""
        planning_prompt = f"""
为以下任务创建顺序执行计划:

任务:{intent['intent_analysis']}
可用工具:{[tool['name'] for tool in available_tools]}

请创建一个步骤清单,每个步骤包括:
1. 步骤描述
2. 使用的工具
3. 输入参数
4. 预期输出

执行计划:
"""

        plan_text = self.llm_model.generate(planning_prompt, max_tokens=400)

        # Parse the plan text into structured steps
        steps = self.parse_plan_text(plan_text, available_tools)

        return {
            'type': 'sequential',
            'steps': steps,
            'estimated_duration': len(steps) * 30,  # rough estimate
            'complexity': 'simple'
        }

    def create_parallel_plan(self, intent, available_tools):
        """Create a parallel plan."""
        # Simplified implementation
        sequential_plan = self.create_sequential_plan(intent, available_tools)

        # Identify steps that can run in parallel
        parallel_groups = self.identify_parallel_groups(sequential_plan['steps'])

        return {
            'type': 'parallel',
            'parallel_groups': parallel_groups,
            'estimated_duration': max(len(group) for group in parallel_groups) * 30,
            'complexity': 'complex'
        }

    def create_conditional_plan(self, intent, available_tools):
        """Create a conditional plan."""
        # Simplified implementation
        base_plan = self.create_sequential_plan(intent, available_tools)

        # Attach condition branches
        conditional_steps = []
        for step in base_plan['steps']:
            conditional_step = step.copy()
            conditional_step['conditions'] = self.generate_step_conditions(step)
            conditional_steps.append(conditional_step)

        return {
            'type': 'conditional',
            'steps': conditional_steps,
            'estimated_duration': len(conditional_steps) * 40,
            'complexity': 'medium'
        }

    def parse_plan_text(self, plan_text, available_tools):
        """Parse the plan text emitted by the LLM."""
        # Simplified parser
        steps = []
        lines = plan_text.split('\n')

        current_step = {}
        for line in lines:
            line = line.strip()
            if line.startswith('步骤') or line.startswith('Step'):
                if current_step:
                    steps.append(current_step)
                current_step = {'description': line}
            elif '工具' in line or 'Tool' in line:
                tool_name = self.extract_tool_name(line, available_tools)
                current_step['tool'] = tool_name
            elif '参数' in line or 'Parameter' in line:
                current_step['parameters'] = self.extract_parameters(line)

        if current_step:
            steps.append(current_step)

        return steps

    def extract_tool_name(self, line, available_tools):
        """Pull a known tool name out of a plan line."""
        for tool in available_tools:
            if tool['name'] in line:
                return tool['name']
        return 'unknown'

    def extract_parameters(self, line):
        """Pull parameters out of a plan line."""
        # Simplified implementation
        return {}

    def identify_parallel_groups(self, steps):
        """Group steps that can run in parallel."""
        # Simplified: assume steps without dependencies can run together
        return [steps]  # a single group

    def generate_step_conditions(self, step):
        """Generate conditions for a step."""
        # Simplified implementation
        return []

    def validate_plan(self, plan, available_tools):
        """Validate the plan against the registered tools."""
        # Check tool availability
        tool_names = {tool['name'] for tool in available_tools}

        for step in plan.get('steps', []):
            if step.get('tool') not in tool_names:
                step['tool'] = 'fallback_tool'

        return plan
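
parse_plan_text relies on the LLM emitting lines that start with 步骤/Step and mention a registered tool; the same idea in a compact, testable form (parse_plan is an illustrative name):

```python
def parse_plan(plan_text, tool_names):
    """Parse 'Step'/'步骤' and 'Tool'/'工具' lines from LLM output into structured steps."""
    steps, current = [], None
    for line in (l.strip() for l in plan_text.splitlines()):
        if line.startswith(('步骤', 'Step')):
            if current:
                steps.append(current)          # close out the previous step
            current = {'description': line, 'tool': 'unknown'}
        elif current and ('工具' in line or 'Tool' in line):
            # Bind the first registered tool name that appears on the line
            current['tool'] = next((t for t in tool_names if t in line), 'unknown')
    if current:
        steps.append(current)
    return steps

plan = parse_plan(
    "Step 1: search the web\nTool: web_search\nStep 2: add numbers\nTool: calculator",
    ["web_search", "calculator"],
)
```

This line-oriented parsing is fragile against free-form LLM output; a more robust design asks the model for JSON and validates it against a schema.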

class TaskExecutor:
    def __init__(self, tool_registry):
        self.tool_registry = tool_registry

    def execute_step(self, step, context):
        """Execute a single step."""
        tool_name = step.get('tool')
        parameters = step.get('parameters', {})

        # Look up the tool
        tool = self.tool_registry.get_tool(tool_name)
        if not tool:
            raise Exception(f"Tool {tool_name} not found")

        # Prepare the parameters
        prepared_params = self.prepare_parameters(parameters, context)

        # Run the tool
        result = tool.execute(prepared_params)

        return {
            'tool_used': tool_name,
            'parameters': prepared_params,
            'result': result,
            'context_updates': self.extract_context_updates(result)
        }

    def prepare_parameters(self, parameters, context):
        """Resolve parameter values."""
        prepared = {}

        for key, value in parameters.items():
            if isinstance(value, str) and value.startswith('${'):
                # Pull the value from the context
                context_key = value[2:-1]  # strip the ${ and }
                prepared[key] = context.get(context_key, value)
            else:
                prepared[key] = value

        return prepared

    def extract_context_updates(self, result):
        """Extract context updates from a tool result."""
        # Simplified implementation
        if isinstance(result, dict):
            return {k: v for k, v in result.items() if k.startswith('context_')}

        return {}
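
prepare_parameters wires steps together through ${key} placeholders resolved from the shared context; a minimal sketch (substitute is an illustrative name, and it also checks for the closing brace, which the class above skips):

```python
def substitute(params, context):
    """Resolve '${key}' placeholder values in parameters from the execution context."""
    out = {}
    for k, v in params.items():
        if isinstance(v, str) and v.startswith('${') and v.endswith('}'):
            out[k] = context.get(v[2:-1], v)   # fall back to the literal if the key is missing
        else:
            out[k] = v
    return out

resolved = substitute({'query': '${search_query}', 'limit': 5},
                      {'search_query': 'rag systems'})
```

This is how a later step can consume an earlier step's output (e.g. a context_search_results entry) without the planner knowing concrete values up front.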

class ToolRegistry:
    def __init__(self):
        self.tools = {}
        self.register_default_tools()

    def register_tool(self, tool):
        """Register a tool."""
        self.tools[tool.name] = tool

    def get_tool(self, name):
        """Look up a tool by name."""
        return self.tools.get(name)

    def get_available_tools(self):
        """List the available tools."""
        return [
            {
                'name': tool.name,
                'description': tool.description,
                'parameters': tool.parameters
            }
            for tool in self.tools.values()
        ]

    def register_default_tools(self):
        """Register the default tools."""
        self.register_tool(WebSearchTool())
        self.register_tool(CalculatorTool())
        self.register_tool(FileOperationTool())
        self.register_tool(DatabaseQueryTool())

class BaseTool:
    def __init__(self, name, description, parameters):
        self.name = name
        self.description = description
        self.parameters = parameters

    def execute(self, parameters):
        """Run the tool."""
        raise NotImplementedError

class WebSearchTool(BaseTool):
    def __init__(self):
        super().__init__(
            name="web_search",
            description="搜索网络信息",
            parameters={
                "query": {"type": "string", "description": "搜索查询"},
                "num_results": {"type": "integer", "description": "结果数量", "default": 5}
            }
        )

    def execute(self, parameters):
        """Run a web search (mocked)."""
        query = parameters.get('query', '')
        num_results = parameters.get('num_results', 5)

        # Mock search results
        results = [
            {
                'title': f'搜索结果 {i+1}',
                'url': f'https://example.com/result{i+1}',
                'snippet': f'关于 "{query}" 的搜索结果 {i+1}'
            }
            for i in range(num_results)
        ]

        return {
            'query': query,
            'results': results,
            'context_search_results': results
        }

class CalculatorTool(BaseTool):
    def __init__(self):
        super().__init__(
            name="calculator",
            description="执行数学计算",
            parameters={
                "expression": {"type": "string", "description": "数学表达式"}
            }
        )
    
    def execute(self, parameters):
        """
        Evaluate a mathematical expression.
        """
        expression = parameters.get('expression', '')
        
        try:
            # Restricted eval; note that clearing __builtins__ alone does NOT
            # make eval safe against hostile input
            result = eval(expression, {"__builtins__": {}}, {})
            return {
                'expression': expression,
                'result': result,
                'context_calculation_result': result
            }
        except Exception as e:
            return {
                'expression': expression,
                'error': str(e),
                'result': None
            }
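
Even with `__builtins__` cleared, `eval` can be escaped through attribute chains (for example via `().__class__`), so a production calculator tool should parse the expression instead of evaluating it directly. A minimal sketch of an AST-based evaluator, using only the standard library:

```python
import ast
import operator

# Whitelist of arithmetic operators; anything else is rejected
_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub,
    ast.Mult: operator.mul, ast.Div: operator.truediv,
    ast.Pow: operator.pow, ast.Mod: operator.mod,
    ast.USub: operator.neg, ast.UAdd: operator.pos,
}

def safe_eval(expression):
    """Evaluate an arithmetic expression by walking its AST."""
    def _eval(node):
        # Allow only numeric literals and whitelisted (unary) binary operators
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](_eval(node.operand))
        raise ValueError(f'Disallowed expression node: {type(node).__name__}')
    return _eval(ast.parse(expression, mode='eval').body)

print(safe_eval('2 + 3 * 4'))  # → 14
```

Function calls, attribute access, and names never match any of the whitelisted node types, so injection attempts fail with `ValueError` instead of executing.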

class FileOperationTool(BaseTool):
    def __init__(self):
        super().__init__(
            name="file_operation",
            description="File operations",
            parameters={
                "operation": {"type": "string", "description": "Operation type: read/write/list"},
                "path": {"type": "string", "description": "File path"},
                "content": {"type": "string", "description": "Content to write (write operations only)"}
            }
        )
    
    def execute(self, parameters):
        """
        Execute a file operation (mocked in this example).
        """
        operation = parameters.get('operation', 'read')
        path = parameters.get('path', '')
        content = parameters.get('content', '')
        
        # Mock file operations; replace with real I/O in production
        if operation == 'read':
            return {
                'operation': 'read',
                'path': path,
                'content': f'Mock contents of file {path}',
                'context_file_content': f'File content: {path}'
            }
        elif operation == 'write':
            return {
                'operation': 'write',
                'path': path,
                'content': content,
                'success': True,
                'context_file_written': path
            }
        elif operation == 'list':
            return {
                'operation': 'list',
                'path': path,
                'files': [f'file{i}.txt' for i in range(3)],
                'context_file_list': [f'file{i}.txt' for i in range(3)]
            }
        
        return {'error': f'Unknown operation: {operation}'}
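
The tool above only mocks file access. A real backend should at minimum confine every path to a sandbox directory so the agent cannot read or write arbitrary locations. A sketch under that assumption (class and method names here are illustrative, not from the original design):

```python
import tempfile
from pathlib import Path

class SandboxedFileOps:
    """File backend confined to a single base directory."""

    def __init__(self, base_dir):
        self.base = Path(base_dir).resolve()

    def _resolve(self, path):
        # Resolve the target and refuse anything that escapes the sandbox
        target = (self.base / path).resolve()
        if target != self.base and self.base not in target.parents:
            raise PermissionError(f'Path escapes sandbox: {path}')
        return target

    def read(self, path):
        return self._resolve(path).read_text(encoding='utf-8')

    def write(self, path, content):
        target = self._resolve(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding='utf-8')
        return True

    def list(self, path='.'):
        return sorted(p.name for p in self._resolve(path).iterdir())

box = SandboxedFileOps(tempfile.mkdtemp())
box.write('notes/todo.txt', 'hello')
print(box.read('notes/todo.txt'))  # → hello
```

Resolving both the base and the target before comparing them is what defeats `../` traversal; a simple string-prefix check on unresolved paths would not.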

class DatabaseQueryTool(BaseTool):
    def __init__(self):
        super().__init__(
            name="database_query",
            description="Query a database",
            parameters={
                "query": {"type": "string", "description": "SQL query statement"},
                "database": {"type": "string", "description": "Database name"}
            }
        )
    
    def execute(self, parameters):
        """
        Execute a database query (mocked in this example).
        """
        query = parameters.get('query', '')
        database = parameters.get('database', 'default')
        
        # Mock database query results; replace with a real database driver in production
        results = [
            {'id': 1, 'name': '张三', 'age': 25},
            {'id': 2, 'name': '李四', 'age': 30},
            {'id': 3, 'name': '王五', 'age': 28}
        ]
        
        return {
            'query': query,
            'database': database,
            'results': results,
            'context_query_results': results
        }
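
For a real backend, the stdlib `sqlite3` module is enough to illustrate the two safety rules that matter for an agent-facing query tool: whitelist read-only statements, and bind values as parameters instead of formatting them into the SQL string. A sketch (the class name and table schema are illustrative):

```python
import sqlite3

class SqliteQueryBackend:
    """Read-only SQL backend with parameter binding (illustrative sketch)."""

    def __init__(self, db_path=':memory:'):
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row  # rows become dict-convertible

    def query(self, sql, params=()):
        # Reject anything that is not a SELECT before touching the database
        if not sql.lstrip().lower().startswith('select'):
            raise ValueError('Only SELECT statements are allowed')
        cursor = self.conn.execute(sql, params)
        return [dict(row) for row in cursor.fetchall()]

backend = SqliteQueryBackend()
backend.conn.execute('CREATE TABLE users (id INTEGER, name TEXT, age INTEGER)')
backend.conn.executemany(
    'INSERT INTO users VALUES (?, ?, ?)',
    [(1, '张三', 25), (2, '李四', 30)],
)
rows = backend.query('SELECT name FROM users WHERE age > ?', (26,))
print(rows)  # → [{'name': '李四'}]
```

The `?` placeholders ensure model-supplied values can never change the structure of the statement, which closes off SQL injection even when the query text itself comes from an LLM.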

Summary

RAG and Agents represent the two major directions of LLM application development; together they evolve the model from a "knowledge base" into an "intelligent assistant":

The value of RAG

  • Knowledge augmentation: gives the model access to up-to-date, domain-specific knowledge
  • Explainability: cites information sources, improving trustworthiness
  • Cost efficiency: knowledge can be updated without retraining
  • Domain adaptation: quickly adapts to specific domains and scenarios

The value of Agents

  • Task automation: autonomously plans and executes complex tasks
  • Tool integration: connects to external systems and services
  • Intelligent decision-making: makes sound decisions based on context
  • Continuous learning: learns and improves through interaction

Key implementation points

  • Document processing: high-quality document parsing and chunking
  • Retrieval optimization: semantic, keyword, and hybrid retrieval
  • Generation quality: context construction, prompt engineering, quality evaluation
  • Agent design: intent understanding, task planning, tool execution

Key takeaways

  1. Scenario first: choose the technical approach that fits the concrete application
  2. Quality is king: data quality sets the upper bound on application performance
  3. User experience: technology serves user needs; experience is what matters
  4. Continuous optimization: keep improving the system based on user feedback
  5. Ecosystem building: build out a complete tool ecosystem and knowledge base

RAG and Agent technology are still evolving rapidly, with new directions such as multimodal RAG, autonomous agents, and tool learning emerging all the time. Mastering these core techniques is the foundation for building practical AI applications.



Want to learn more about application development techniques? Follow our technical blog!


🎉 The LLM technology blog series is complete!

With this post, our LLM technology blog series comes to a close. The series covers every aspect of LLM technology:

📚 Full series recap

Phase 1: Evolution

  1. A Brief History of Large Language Models - the arc of technical evolution
  2. Transformer Architecture Deep Dive - technical foundations
  3. The Evolution of Pre-training Paradigms - training strategies
  4. Open-source vs. Closed-source LLMs - competing technical routes

Phase 2: Core Techniques

  5. LLM Training Techniques in Detail - distributed training
  6. A Complete Guide to Inference Optimization - performance tuning
  7. LLM Safety and Alignment - RLHF
  8. Multimodal LLM Technology - vision-language models

Phase 3: Production Practice

  9. LLM Deployment Architecture Design - high-concurrency serving
  10. LLM Cost Optimization in Practice - cutting cost, raising efficiency
  11. LLM Monitoring and Operations - reliability assurance
  12. A Guide to LLM Application Development - RAG and Agents

🎯 Series highlights and value

  • Technical depth: every post dives into the details and provides complete code implementations
  • Comprehensive coverage: from historical development to real-world application, a complete body of knowledge
  • Practical value: oriented toward real engineering needs, with actionable methods and hard-won experience
  • Forward-looking perspective: covers the latest technical developments and industry trends

This series offers a comprehensive LLM technology guide for AI practitioners, researchers, and technology enthusiasts. We hope it helps you better understand and apply large language models!