A Guide to LLM Application Development: RAG and Agent in Practice
Published: 2024-10-25
Author: AI Technology Researcher
Tags: Application Development, RAG, Agent, Knowledge Base, Intelligent Assistant, Application Architecture
Preface
If the large language model is the "brain", then RAG and Agent are the "hands and feet" that let that brain perceive the world, acquire knowledge, and carry out tasks. As an architect deeply involved in LLM application development, I have watched the field evolve from simple question-answering systems to complex intelligent assistants.
I still remember the excitement of implementing my first RAG system: with retrieval augmentation, the model could finally answer questions about our internal company documents. Building my first Agent system was just as striking: the model could not only understand a task, but autonomously plan, call tools, and complete complex multi-step work.
From simple document Q&A to end-to-end business process automation, and from plain retrieval augmentation to multimodal interaction, RAG and Agent techniques are redefining the boundary of human-computer interaction.
In this article we dig into the core techniques of LLM application development: RAG system design, Agent architecture, knowledge base construction, and tool integration, with a complete view of how to build practical LLM applications.
A Deep Dive into RAG
RAG System Architecture
```python
class RAGSystem:
    def __init__(self, embedding_model, llm_model, vector_store):
        self.embedding_model = embedding_model
        self.llm_model = llm_model
        self.vector_store = vector_store
        self.document_processor = DocumentProcessor()
        self.retriever = AdvancedRetriever(vector_store, embedding_model)
        self.generator = ResponseGenerator(llm_model)

    def ingest_documents(self, documents):
        """Document ingestion pipeline."""
        processed_docs = []
        for doc in documents:
            # Document preprocessing
            cleaned_doc = self.document_processor.clean_document(doc)
            # Document chunking
            chunks = self.document_processor.chunk_document(
                cleaned_doc,
                chunk_size=512,
                overlap=50
            )
            # Generate embedding vectors
            for chunk in chunks:
                embedding = self.embedding_model.encode(chunk['text'])
                chunk['embedding'] = embedding
                processed_docs.append(chunk)
        # Store in the vector database
        self.vector_store.add_documents(processed_docs)
        return len(processed_docs)

    def query(self, question, top_k=5, rerank=True):
        """RAG query pipeline."""
        # 1. Query understanding and rewriting
        refined_query = self.refine_query(question)
        # 2. Retrieve relevant documents
        retrieved_docs = self.retriever.retrieve(
            refined_query,
            top_k=top_k * 2 if rerank else top_k
        )
        # 3. Rerank (optional)
        if rerank:
            retrieved_docs = self.retriever.rerank(
                refined_query,
                retrieved_docs,
                top_k=top_k
            )
        # 4. Generate the answer
        response = self.generator.generate_response(
            question=question,
            context_docs=retrieved_docs,
            refined_query=refined_query
        )
        return {
            'answer': response['answer'],
            'sources': retrieved_docs,
            'confidence': response['confidence'],
            'refined_query': refined_query
        }

    def refine_query(self, query):
        """Query optimization and rewriting."""
        # Query expansion
        expanded_query = self.expand_query(query)
        # Query rewriting
        rewritten_query = self.rewrite_query(expanded_query)
        return rewritten_query

    def expand_query(self, query):
        """Query expansion."""
        # Expand with synonyms
        synonyms = self.get_synonyms(query)
        # Expand with related terms
        related_terms = self.get_related_terms(query)
        expanded = f"{query} {' '.join(synonyms)} {' '.join(related_terms)}"
        return expanded.strip()

    def rewrite_query(self, query):
        """Query rewriting."""
        rewrite_prompt = f"""
Rewrite the following query into a form better suited for retrieval. Preserve the original intent but use more precise keywords.
Original query: {query}
Rewritten query:
"""
        rewritten = self.llm_model.generate(rewrite_prompt, max_tokens=100)
        return rewritten.strip()

    def get_synonyms(self, query):
        """Look up synonyms."""
        # Simplified toy dictionary; a real system would use a thesaurus or a model
        synonym_dict = {
            'problem': ['issue', 'question', 'difficulty'],
            'solve': ['resolve', 'address', 'handle'],
            'method': ['approach', 'technique', 'means']
        }
        synonyms = []
        for word in query.split():
            if word in synonym_dict:
                synonyms.extend(synonym_dict[word])
        return synonyms

    def get_related_terms(self, query):
        """Look up related terms."""
        # Simplified; a real system would use a knowledge graph or a model
        return []
class DocumentProcessor:
    def __init__(self):
        self.text_extractors = {
            '.pdf': self.extract_pdf_text,
            '.docx': self.extract_docx_text,
            '.txt': self.extract_txt_text,
            '.html': self.extract_html_text
        }

    def clean_document(self, document):
        """Document cleaning."""
        # Extract text
        text = self.extract_text(document)
        # Clean the text
        cleaned_text = self.clean_text(text)
        # Extract metadata
        metadata = self.extract_metadata(document)
        return {
            'text': cleaned_text,
            'metadata': metadata,
            'source': document.get('source', 'unknown')
        }

    def extract_text(self, document):
        """Extract text according to the document type."""
        file_type = document.get('type', '.txt')
        extractor = self.text_extractors.get(file_type, self.extract_txt_text)
        return extractor(document['content'])

    def clean_text(self, text):
        """Text cleaning."""
        import re
        # Drop lines that are too short to be meaningful, then collapse
        # whitespace runs within each remaining line. (Collapsing all
        # whitespace up front would destroy the newlines this filter
        # depends on, which was a bug in the naive ordering.)
        lines = [line.strip() for line in text.split('\n')]
        kept_lines = [re.sub(r'[ \t]+', ' ', line) for line in lines if len(line) > 10]
        text = '\n'.join(kept_lines)
        # Remove special characters, keeping word characters, whitespace,
        # CJK characters, and common punctuation
        text = re.sub(r'[^\w\s\u4e00-\u9fff.,!?;:]', '', text)
        return text

    def chunk_document(self, document, chunk_size=512, overlap=50):
        """Document chunking."""
        text = document['text']
        metadata = document['metadata']
        # Split into sentences
        sentences = self.split_into_sentences(text)
        chunks = []
        current_chunk = ""
        for sentence in sentences:
            if len(current_chunk) + len(sentence) > chunk_size and current_chunk:
                # Emit the current chunk
                chunks.append({
                    'text': current_chunk.strip(),
                    'metadata': metadata.copy(),
                    'chunk_id': len(chunks),
                    'source': document['source']
                })
                # Start a new chunk, carrying over the overlap
                overlap_text = self.get_overlap_text(current_chunk, overlap)
                current_chunk = overlap_text + " " + sentence
            else:
                current_chunk += " " + sentence
        # Append the final chunk
        if current_chunk.strip():
            chunks.append({
                'text': current_chunk.strip(),
                'metadata': metadata.copy(),
                'chunk_id': len(chunks),
                'source': document['source']
            })
        return chunks

    def split_into_sentences(self, text):
        """Sentence splitting."""
        import re
        # Simplified splitter that handles ASCII and CJK sentence terminators
        sentences = re.split(r'[.!?。!?]', text)
        return [s.strip() for s in sentences if s.strip()]

    def get_overlap_text(self, text, overlap_chars):
        """Take the trailing overlap from a chunk."""
        if len(text) <= overlap_chars:
            return text
        # Walk forward from the cut point to a whole-word boundary
        overlap_text = text[-overlap_chars:]
        space_index = overlap_text.find(' ')
        if space_index != -1:
            return overlap_text[space_index:].strip()
        return overlap_text

    def extract_metadata(self, document):
        """Extract metadata."""
        return {
            'title': document.get('title', ''),
            'author': document.get('author', ''),
            'created_date': document.get('created_date', ''),
            'file_type': document.get('type', ''),
            'file_size': len(document.get('content', ''))
        }

    def extract_pdf_text(self, content):
        """Extract text from a PDF."""
        # A real implementation would use PyPDF2 or pdfplumber
        return "PDF text extraction not implemented"

    def extract_docx_text(self, content):
        """Extract text from a DOCX file."""
        # A real implementation would use python-docx
        return "DOCX text extraction not implemented"

    def extract_txt_text(self, content):
        """Extract text from a plain-text file."""
        return content

    def extract_html_text(self, content):
        """Extract text from HTML."""
        # A real implementation would use BeautifulSoup; this strips tags naively
        import re
        return re.sub(r'<[^>]+>', '', content)
class AdvancedRetriever:
    def __init__(self, vector_store, embedding_model):
        self.vector_store = vector_store
        self.embedding_model = embedding_model
        self.reranker = CrossEncoderReranker()

    def retrieve(self, query, top_k=5, retrieval_strategy='hybrid'):
        """Advanced retrieval."""
        if retrieval_strategy == 'semantic':
            return self.semantic_search(query, top_k)
        elif retrieval_strategy == 'keyword':
            return self.keyword_search(query, top_k)
        elif retrieval_strategy == 'hybrid':
            return self.hybrid_search(query, top_k)
        else:
            raise ValueError(f"Unknown retrieval strategy: {retrieval_strategy}")

    def semantic_search(self, query, top_k):
        """Semantic (dense) retrieval."""
        query_embedding = self.embedding_model.encode(query)
        results = self.vector_store.similarity_search(
            query_embedding,
            top_k=top_k
        )
        return results

    def keyword_search(self, query, top_k):
        """Keyword (sparse) retrieval."""
        # A real system would use BM25 or a similar ranking algorithm
        keywords = self.extract_keywords(query)
        results = self.vector_store.keyword_search(
            keywords,
            top_k=top_k
        )
        return results

    def hybrid_search(self, query, top_k):
        """Hybrid retrieval."""
        # Dense results
        semantic_results = self.semantic_search(query, top_k)
        # Sparse results
        keyword_results = self.keyword_search(query, top_k)
        # Merge and re-score
        combined_results = self.combine_results(
            semantic_results,
            keyword_results,
            semantic_weight=0.7,
            keyword_weight=0.3
        )
        return combined_results[:top_k]

    def combine_results(self, semantic_results, keyword_results, semantic_weight, keyword_weight):
        """Merge retrieval results with weighted rank-based scores."""
        # Map document IDs to fused scores
        doc_scores = {}
        # Add semantic scores (rank-normalized to [0, 1])
        for i, doc in enumerate(semantic_results):
            doc_id = doc['id']
            semantic_score = 1.0 - (i / len(semantic_results))
            doc_scores[doc_id] = doc_scores.get(doc_id, 0) + semantic_score * semantic_weight
        # Add keyword scores (rank-normalized to [0, 1])
        for i, doc in enumerate(keyword_results):
            doc_id = doc['id']
            keyword_score = 1.0 - (i / len(keyword_results))
            doc_scores[doc_id] = doc_scores.get(doc_id, 0) + keyword_score * keyword_weight
        # Sort by fused score
        sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
        # Build the result list
        doc_map = {doc['id']: doc for doc in semantic_results + keyword_results}
        combined_results = []
        for doc_id, score in sorted_docs:
            if doc_id in doc_map:
                doc = doc_map[doc_id].copy()
                doc['combined_score'] = score
                combined_results.append(doc)
        return combined_results

    def rerank(self, query, documents, top_k):
        """Rerank retrieved documents."""
        return self.reranker.rerank(query, documents, top_k)

    def extract_keywords(self, query):
        """Extract keywords."""
        # Simplified; a real system would use a proper keyword-extraction algorithm
        import re
        # Remove stop words (Chinese examples here; extend for your language)
        stop_words = {'的', '是', '在', '有', '和', '与', '或', '但', '然而', '因为', '所以'}
        words = re.findall(r'\w+', query.lower())
        keywords = [word for word in words if word not in stop_words and len(word) > 1]
        return keywords
class CrossEncoderReranker:
    def __init__(self):
        # A real system would load a pretrained cross-encoder reranking model
        self.model = None

    def rerank(self, query, documents, top_k):
        """Rerank with a cross-encoder (simplified)."""
        if not documents:
            return documents
        # Score query-document relevance
        scored_docs = []
        for doc in documents:
            relevance_score = self.calculate_relevance(query, doc['text'])
            doc_with_score = doc.copy()
            doc_with_score['relevance_score'] = relevance_score
            scored_docs.append(doc_with_score)
        # Sort by relevance score
        scored_docs.sort(key=lambda x: x['relevance_score'], reverse=True)
        return scored_docs[:top_k]

    def calculate_relevance(self, query, document_text):
        """Compute a relevance score."""
        # Simplified stand-in: Jaccard similarity over word sets
        query_words = set(query.lower().split())
        doc_words = set(document_text.lower().split())
        intersection = query_words.intersection(doc_words)
        union = query_words.union(doc_words)
        if len(union) == 0:
            return 0.0
        return len(intersection) / len(union)
class ResponseGenerator:
    def __init__(self, llm_model):
        self.llm_model = llm_model
        self.prompt_templates = self.load_prompt_templates()

    def generate_response(self, question, context_docs, refined_query=None):
        """Generate an answer."""
        # Build the context
        context = self.build_context(context_docs)
        # Pick a suitable prompt template
        template = self.select_prompt_template(question, context_docs)
        # Build the prompt
        prompt = template.format(
            question=question,
            context=context,
            refined_query=refined_query or question
        )
        # Generate the answer
        response = self.llm_model.generate(
            prompt,
            max_tokens=500,
            temperature=0.1
        )
        # Assess answer quality
        confidence = self.evaluate_response_quality(question, response, context_docs)
        return {
            'answer': response,
            'confidence': confidence,
            'prompt_used': template
        }

    def build_context(self, documents):
        """Assemble the context."""
        context_parts = []
        for i, doc in enumerate(documents):
            source_info = f"Source: {doc.get('source', 'unknown')}"
            text = doc['text']
            context_parts.append(f"Document {i+1}:\n{source_info}\n{text}\n")
        return "\n".join(context_parts)

    def select_prompt_template(self, question, context_docs):
        """Pick a prompt template by question type and context."""
        if self.is_factual_question(question):
            return self.prompt_templates['factual']
        elif self.is_analytical_question(question):
            return self.prompt_templates['analytical']
        elif self.is_comparison_question(question):
            return self.prompt_templates['comparison']
        else:
            return self.prompt_templates['general']

    def is_factual_question(self, question):
        """Is this a factual question?"""
        factual_keywords = ['what is', 'who is', 'when', 'where', 'how many']
        return any(keyword in question.lower() for keyword in factual_keywords)

    def is_analytical_question(self, question):
        """Is this an analytical question?"""
        analytical_keywords = ['why', 'how', 'reason', 'analyze', 'explain']
        return any(keyword in question.lower() for keyword in analytical_keywords)

    def is_comparison_question(self, question):
        """Is this a comparison question?"""
        comparison_keywords = ['compare', 'difference', 'differ', 'similar', 'versus']
        return any(keyword in question.lower() for keyword in comparison_keywords)

    def load_prompt_templates(self):
        """Load prompt templates."""
        return {
            'general': """
Answer the user's question based on the context below. If the context contains no relevant information, say so explicitly.
Context:
{context}
Question: {question}
Provide an accurate, detailed answer:
""",
            'factual': """
Answer this factual question based on the context below. State the facts directly and accurately.
Context:
{context}
Question: {question}
Answer:
""",
            'analytical': """
Analyze and answer this question based on the context below. Provide in-depth analysis and explanation.
Context:
{context}
Question: {question}
Analysis and answer:
""",
            'comparison': """
Compare and answer this question based on the context below. Highlight the key similarities and differences.
Context:
{context}
Question: {question}
Comparative analysis:
"""
        }

    def evaluate_response_quality(self, question, response, context_docs):
        """Assess answer quality (heuristic)."""
        quality_score = 0.5  # base score
        # Check the answer length
        if 50 <= len(response) <= 1000:
            quality_score += 0.1
        # Check whether the answer draws on the context
        context_text = " ".join([doc['text'] for doc in context_docs])
        common_words = set(response.lower().split()) & set(context_text.lower().split())
        if len(common_words) > 5:
            quality_score += 0.2
        # Check whether the answer addresses the question directly
        question_words = set(question.lower().split())
        response_words = set(response.lower().split())
        if len(question_words & response_words) > 2:
            quality_score += 0.2
        return min(quality_score, 1.0)
```
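To make the wiring concrete, here is a minimal end-to-end sketch of the classes above. Everything in it is a hypothetical stand-in invented for this example: HashingEmbedder, SimpleVectorStore, and EchoLLM are toy components, not real library APIs. A production system would plug a real embedding model, vector database, and LLM client in behind the same interfaces (encode, add_documents/similarity_search/keyword_search, and generate).
```python
import hashlib
import math

class HashingEmbedder:
    """Toy embedder (hypothetical): hashes words into a fixed-size
    bag-of-words vector. A stand-in for a real sentence-embedding model."""
    def __init__(self, dim=64):
        self.dim = dim

    def encode(self, text):
        vec = [0.0] * self.dim
        for word in text.lower().split():
            idx = int(hashlib.md5(word.encode()).hexdigest(), 16) % self.dim
            vec[idx] += 1.0
        norm = math.sqrt(sum(v * v for v in vec)) or 1.0
        return [v / norm for v in vec]

class SimpleVectorStore:
    """Toy in-memory store (hypothetical) with cosine similarity search."""
    def __init__(self):
        self.docs = []

    def add_documents(self, docs):
        for i, d in enumerate(docs):
            d.setdefault('id', f'doc-{len(self.docs) + i}')
        self.docs.extend(docs)

    def similarity_search(self, query_embedding, top_k=5):
        # Embeddings are unit-normalized, so the dot product is cosine similarity
        scored = sorted(
            self.docs,
            key=lambda d: sum(a * b for a, b in zip(query_embedding, d['embedding'])),
            reverse=True,
        )
        return [dict(d) for d in scored[:top_k]]

    def keyword_search(self, keywords, top_k=5):
        # Rank by how many query keywords appear in the chunk text
        scored = sorted(
            self.docs,
            key=lambda d: sum(kw in d['text'].lower() for kw in keywords),
            reverse=True,
        )
        return [dict(d) for d in scored[:top_k]]

class EchoLLM:
    """Toy LLM client (hypothetical) that returns a canned completion."""
    def generate(self, prompt, max_tokens=100, temperature=0.0):
        return "Answer based on the retrieved context."

rag = RAGSystem(HashingEmbedder(), EchoLLM(), SimpleVectorStore())
rag.ingest_documents([
    {'content': 'RAG combines retrieval with generation to ground answers in documents.',
     'type': '.txt', 'source': 'notes.txt', 'title': 'RAG notes'}
])
print(rag.query("What is RAG?", top_k=2))
```
The sketch matters mostly for its interfaces: RAGSystem only assumes those few methods, which is what makes each component independently swappable.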
Agent System Architecture
Designing an Intelligent Agent
```python
import ast
import operator
import time

class IntelligentAgent:
    def __init__(self, llm_model, tool_registry, memory_system):
        self.llm_model = llm_model
        self.tool_registry = tool_registry
        self.memory_system = memory_system
        self.planner = TaskPlanner(llm_model)
        self.executor = TaskExecutor(tool_registry)
        self.monitor = ExecutionMonitor()

    def process_request(self, user_request, context=None):
        """Handle a user request."""
        # 1. Understand the user's intent
        intent = self.understand_intent(user_request, context)
        # 2. Draw up an execution plan
        plan = self.planner.create_plan(intent, self.tool_registry.get_available_tools())
        # 3. Execute the plan
        execution_result = self.execute_plan(plan, user_request)
        # 4. Generate the final answer
        final_response = self.generate_final_response(
            user_request,
            execution_result,
            plan
        )
        # 5. Update memory
        self.memory_system.store_interaction(
            user_request,
            final_response,
            plan,
            execution_result
        )
        return final_response

    def understand_intent(self, user_request, context):
        """Understand the user's intent."""
        intent_prompt = f"""
Analyze the following user request and identify the user's intent and the operations required:
User request: {user_request}
Context: {context or 'none'}
Please analyze:
1. What is the user's main intent?
2. Which concrete operations are needed?
3. Which information or tools are required?
4. What output format is expected?
Analysis:
"""
        intent_analysis = self.llm_model.generate(intent_prompt, max_tokens=300)
        return {
            'original_request': user_request,
            'intent_analysis': intent_analysis,
            'context': context,
            'timestamp': time.time()
        }
    def execute_plan(self, plan, original_request):
        """Execute a plan."""
        execution_results = []
        current_context = {}
        for step in plan['steps']:
            try:
                # Mark step start
                self.monitor.start_step(step)
                # Run the step
                step_result = self.executor.execute_step(step, current_context)
                # Update the running context
                current_context.update(step_result.get('context_updates', {}))
                # Record the result
                execution_results.append({
                    'step': step,
                    'result': step_result,
                    'status': 'success',
                    'timestamp': time.time()
                })
                # Mark step completion
                self.monitor.complete_step(step, step_result)
            except Exception as e:
                # Record the execution error
                execution_results.append({
                    'step': step,
                    'error': str(e),
                    'status': 'failed',
                    'timestamp': time.time()
                })
                # Attempt recovery
                recovery_action = self.handle_execution_error(step, e, current_context)
                if recovery_action:
                    execution_results.append(recovery_action)
                else:
                    break  # unrecoverable, stop executing
        return {
            'plan': plan,
            'execution_results': execution_results,
            'final_context': current_context,
            'overall_status': 'success' if all(r['status'] == 'success' for r in execution_results) else 'partial_failure'
        }

    def handle_execution_error(self, failed_step, error, context):
        """Handle an execution error."""
        recovery_prompt = f"""
An error occurred while executing a step:
Failed step: {failed_step}
Error message: {error}
Current context: {context}
Please analyze:
1. The likely cause of the error
2. Whether recovery is possible
3. If recovery is possible, the recommended recovery strategy
Analysis and recommendation:
"""
        recovery_analysis = self.llm_model.generate(recovery_prompt, max_tokens=200)
        # Simplified recovery logic
        if "retry" in recovery_analysis.lower():
            try:
                # Retry the step once
                retry_result = self.executor.execute_step(failed_step, context)
                return {
                    'step': failed_step,
                    'result': retry_result,
                    'status': 'recovered',
                    'recovery_method': 'retry',
                    'timestamp': time.time()
                }
            except Exception:
                pass
        return None

    def generate_final_response(self, original_request, execution_result, plan):
        """Generate the final answer."""
        response_prompt = f"""
Based on the execution results below, generate a final answer for the user:
Original request: {original_request}
Plan: {plan}
Execution results: {execution_result}
Generate a clear, useful answer that:
1. Directly answers the user's question
2. Summarizes the main steps that were executed
3. Presents the relevant results and data
4. Explains causes and suggestions if anything failed
Final answer:
"""
        final_response = self.llm_model.generate(response_prompt, max_tokens=500)
        return {
            'response': final_response,
            'execution_summary': execution_result,
            'plan_used': plan,
            'timestamp': time.time()
        }

class ExecutionMonitor:
    """Minimal execution monitor. (Referenced above but missing from the
    original listing; this stub records step start/completion events.)"""
    def __init__(self):
        self.events = []

    def start_step(self, step):
        self.events.append(('start', step, time.time()))

    def complete_step(self, step, result):
        self.events.append(('complete', step, time.time()))
class TaskPlanner:
    def __init__(self, llm_model):
        self.llm_model = llm_model
        self.planning_strategies = {
            'sequential': self.create_sequential_plan,
            'parallel': self.create_parallel_plan,
            'conditional': self.create_conditional_plan
        }

    def create_plan(self, intent, available_tools):
        """Create an execution plan."""
        # Gauge task complexity
        complexity = self.analyze_task_complexity(intent)
        # Pick a planning strategy
        strategy = self.select_planning_strategy(complexity, available_tools)
        # Create the plan
        plan = self.planning_strategies[strategy](intent, available_tools)
        # Validate the plan
        validated_plan = self.validate_plan(plan, available_tools)
        return validated_plan

    def analyze_task_complexity(self, intent):
        """Gauge task complexity."""
        complexity_prompt = f"""
Assess the complexity of the following task:
Task intent: {intent['intent_analysis']}
Please evaluate:
1. Does the task require multiple steps?
2. Are there dependencies between the steps?
3. Is conditional branching needed?
4. Can steps run in parallel?
Complexity assessment (simple/medium/complex):
"""
        complexity_analysis = self.llm_model.generate(complexity_prompt, max_tokens=200)
        analysis = complexity_analysis.lower()
        if "complex" in analysis:
            return "complex"
        elif "medium" in analysis:
            return "medium"
        else:
            return "simple"

    def select_planning_strategy(self, complexity, available_tools):
        """Pick a planning strategy."""
        if complexity == "simple":
            return "sequential"
        elif complexity == "medium":
            return "conditional"
        else:
            return "parallel"

    def create_sequential_plan(self, intent, available_tools):
        """Create a sequential plan."""
        planning_prompt = f"""
Create a sequential execution plan for the following task:
Task: {intent['intent_analysis']}
Available tools: {[tool['name'] for tool in available_tools]}
Produce a list of steps, each including:
1. A step description
2. The tool to use
3. Input parameters
4. Expected output
Execution plan:
"""
        plan_text = self.llm_model.generate(planning_prompt, max_tokens=400)
        # Parse the plan text into structured steps
        steps = self.parse_plan_text(plan_text, available_tools)
        return {
            'type': 'sequential',
            'steps': steps,
            'estimated_duration': len(steps) * 30,  # crude estimate (seconds)
            'complexity': 'simple'
        }

    def create_parallel_plan(self, intent, available_tools):
        """Create a parallel plan."""
        # Simplified implementation
        sequential_plan = self.create_sequential_plan(intent, available_tools)
        # Identify groups of steps that can run in parallel
        parallel_groups = self.identify_parallel_groups(sequential_plan['steps'])
        return {
            'type': 'parallel',
            'parallel_groups': parallel_groups,
            'estimated_duration': max(len(group) for group in parallel_groups) * 30,
            'complexity': 'complex'
        }

    def create_conditional_plan(self, intent, available_tools):
        """Create a conditional plan."""
        # Simplified implementation
        base_plan = self.create_sequential_plan(intent, available_tools)
        # Attach conditional branches
        conditional_steps = []
        for step in base_plan['steps']:
            conditional_step = step.copy()
            conditional_step['conditions'] = self.generate_step_conditions(step)
            conditional_steps.append(conditional_step)
        return {
            'type': 'conditional',
            'steps': conditional_steps,
            'estimated_duration': len(conditional_steps) * 40,
            'complexity': 'medium'
        }

    def parse_plan_text(self, plan_text, available_tools):
        """Parse plan text into structured steps."""
        # Simplified line-oriented parser
        steps = []
        current_step = {}
        for line in plan_text.split('\n'):
            line = line.strip()
            if line.startswith('Step'):
                if current_step:
                    steps.append(current_step)
                current_step = {'description': line}
            elif 'Tool' in line or 'tool' in line:
                current_step['tool'] = self.extract_tool_name(line, available_tools)
            elif 'Parameter' in line or 'parameter' in line:
                current_step['parameters'] = self.extract_parameters(line)
        if current_step:
            steps.append(current_step)
        return steps

    def extract_tool_name(self, line, available_tools):
        """Extract a tool name from a plan line."""
        for tool in available_tools:
            if tool['name'] in line:
                return tool['name']
        return 'unknown'

    def extract_parameters(self, line):
        """Extract parameters from a plan line."""
        # Simplified implementation
        return {}

    def identify_parallel_groups(self, steps):
        """Identify groups of steps that can run in parallel."""
        # Simplified: assume steps without dependencies can run in parallel
        return [steps]  # a single group

    def generate_step_conditions(self, step):
        """Generate conditions for a step."""
        # Simplified implementation
        return []

    def validate_plan(self, plan, available_tools):
        """Validate a plan."""
        # Check tool availability
        tool_names = {tool['name'] for tool in available_tools}
        for step in plan.get('steps', []):
            if step.get('tool') not in tool_names:
                step['tool'] = 'fallback_tool'
        return plan
class TaskExecutor:
    def __init__(self, tool_registry):
        self.tool_registry = tool_registry

    def execute_step(self, step, context):
        """Execute a single step."""
        tool_name = step.get('tool')
        parameters = step.get('parameters', {})
        # Look up the tool
        tool = self.tool_registry.get_tool(tool_name)
        if not tool:
            raise Exception(f"Tool {tool_name} not found")
        # Prepare the parameters
        prepared_params = self.prepare_parameters(parameters, context)
        # Run the tool
        result = tool.execute(prepared_params)
        return {
            'tool_used': tool_name,
            'parameters': prepared_params,
            'result': result,
            'context_updates': self.extract_context_updates(result)
        }

    def prepare_parameters(self, parameters, context):
        """Resolve parameter values, substituting ${key} references from context."""
        prepared = {}
        for key, value in parameters.items():
            if isinstance(value, str) and value.startswith('${') and value.endswith('}'):
                # Pull the value from the running context
                context_key = value[2:-1]  # strip the ${ and }
                prepared[key] = context.get(context_key, value)
            else:
                prepared[key] = value
        return prepared

    def extract_context_updates(self, result):
        """Extract context updates from a tool result."""
        # Simplified: any result key prefixed with context_ is promoted
        if isinstance(result, dict):
            return {k: v for k, v in result.items() if k.startswith('context_')}
        return {}
class ToolRegistry:
    def __init__(self):
        self.tools = {}
        self.register_default_tools()

    def register_tool(self, tool):
        """Register a tool."""
        self.tools[tool.name] = tool

    def get_tool(self, name):
        """Look up a tool by name."""
        return self.tools.get(name)

    def get_available_tools(self):
        """List the available tools."""
        return [
            {
                'name': tool.name,
                'description': tool.description,
                'parameters': tool.parameters
            }
            for tool in self.tools.values()
        ]

    def register_default_tools(self):
        """Register the default tools."""
        self.register_tool(WebSearchTool())
        self.register_tool(CalculatorTool())
        self.register_tool(FileOperationTool())
        self.register_tool(DatabaseQueryTool())
class BaseTool:
    def __init__(self, name, description, parameters):
        self.name = name
        self.description = description
        self.parameters = parameters

    def execute(self, parameters):
        """Run the tool."""
        raise NotImplementedError
class WebSearchTool(BaseTool):
    def __init__(self):
        super().__init__(
            name="web_search",
            description="Search the web for information",
            parameters={
                "query": {"type": "string", "description": "Search query"},
                "num_results": {"type": "integer", "description": "Number of results", "default": 5}
            }
        )

    def execute(self, parameters):
        """Run a web search (mocked)."""
        query = parameters.get('query', '')
        num_results = parameters.get('num_results', 5)
        # Mocked search results
        results = [
            {
                'title': f'Search result {i+1}',
                'url': f'https://example.com/result{i+1}',
                'snippet': f'Search result {i+1} for "{query}"'
            }
            for i in range(num_results)
        ]
        return {
            'query': query,
            'results': results,
            'context_search_results': results
        }
class CalculatorTool(BaseTool):
    # Whitelisted arithmetic operators for safe evaluation
    _OPS = {
        ast.Add: operator.add, ast.Sub: operator.sub,
        ast.Mult: operator.mul, ast.Div: operator.truediv,
        ast.Pow: operator.pow, ast.Mod: operator.mod,
        ast.USub: operator.neg, ast.UAdd: operator.pos,
    }

    def __init__(self):
        super().__init__(
            name="calculator",
            description="Perform mathematical calculations",
            parameters={
                "expression": {"type": "string", "description": "A mathematical expression"}
            }
        )

    def execute(self, parameters):
        """Evaluate an arithmetic expression."""
        expression = parameters.get('expression', '')
        try:
            # Safe evaluation via a restricted AST walk. (The original used
            # eval() with empty builtins, which is not actually safe:
            # attribute access can escape that sandbox.)
            result = self._eval_node(ast.parse(expression, mode='eval').body)
            return {
                'expression': expression,
                'result': result,
                'context_calculation_result': result
            }
        except Exception as e:
            return {
                'expression': expression,
                'error': str(e),
                'result': None
            }

    def _eval_node(self, node):
        """Recursively evaluate numeric literals and whitelisted operators only."""
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in self._OPS:
            return self._OPS[type(node.op)](self._eval_node(node.left),
                                            self._eval_node(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in self._OPS:
            return self._OPS[type(node.op)](self._eval_node(node.operand))
        raise ValueError("Unsupported expression element")
class FileOperationTool(BaseTool):
    def __init__(self):
        super().__init__(
            name="file_operation",
            description="File operations",
            parameters={
                "operation": {"type": "string", "description": "Operation type: read/write/list"},
                "path": {"type": "string", "description": "File path"},
                "content": {"type": "string", "description": "Content to write (write only)"}
            }
        )

    def execute(self, parameters):
        """Run a file operation (mocked)."""
        operation = parameters.get('operation', 'read')
        path = parameters.get('path', '')
        content = parameters.get('content', '')
        # Mocked file operations
        if operation == 'read':
            return {
                'operation': 'read',
                'path': path,
                'content': f'Mocked contents of file {path}',
                'context_file_content': f'File contents: {path}'
            }
        elif operation == 'write':
            return {
                'operation': 'write',
                'path': path,
                'content': content,
                'success': True,
                'context_file_written': path
            }
        elif operation == 'list':
            return {
                'operation': 'list',
                'path': path,
                'files': [f'file{i}.txt' for i in range(3)],
                'context_file_list': [f'file{i}.txt' for i in range(3)]
            }
        return {'error': f'Unknown operation: {operation}'}
class DatabaseQueryTool(BaseTool):
    def __init__(self):
        super().__init__(
            name="database_query",
            description="Query a database",
            parameters={
                "query": {"type": "string", "description": "SQL query"},
                "database": {"type": "string", "description": "Database name"}
            }
        )

    def execute(self, parameters):
        """Run a database query (mocked)."""
        query = parameters.get('query', '')
        database = parameters.get('database', 'default')
        # Mocked query results
        results = [
            {'id': 1, 'name': 'Zhang San', 'age': 25},
            {'id': 2, 'name': 'Li Si', 'age': 30},
            {'id': 3, 'name': 'Wang Wu', 'age': 28}
        ]
        return {
            'query': query,
            'database': database,
            'results': results,
            'context_query_results': results
        }
```
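And a minimal end-to-end run of the agent. InMemoryMemory and ScriptedLLM below are hypothetical stand-ins invented for this sketch (the listing above never defines a memory system or a concrete LLM client); the scripted completions deliberately steer the planner down the sequential path so the whole loop can run without a real model.
```python
class InMemoryMemory:
    """Hypothetical memory system: appends interactions to a list."""
    def __init__(self):
        self.history = []

    def store_interaction(self, request, response, plan, result):
        self.history.append({'request': request, 'response': response})

class ScriptedLLM:
    """Toy LLM (hypothetical): returns canned text keyed off phrases
    that appear in the planner prompts above."""
    def generate(self, prompt, max_tokens=100, temperature=0.0):
        if 'Complexity assessment' in prompt:
            return 'simple'
        if 'Create a sequential execution plan' in prompt:
            return ('Step 1: look up the population figures\n'
                    'Tool: web_search\n'
                    'Parameters: none')
        return 'The requested task was completed; see the tool results above.'

agent = IntelligentAgent(ScriptedLLM(), ToolRegistry(), InMemoryMemory())
result = agent.process_request("Find the population of Tokyo")
print(result['response'])
```
Swapping ScriptedLLM for a real model client is the only change needed to make the same intent-plan-execute loop run against live completions.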
Summary
RAG and Agent represent two major directions in LLM application development; together they turn the large model from a "knowledge base" into an "intelligent assistant":
✅ The value of RAG:
- Knowledge augmentation: gives the model access to up-to-date, domain-specific knowledge
- Explainability: cites information sources, which builds trust
- Cost efficiency: knowledge can be updated without retraining the model
- Domain adaptation: adapts quickly to specific domains and scenarios
✅ The value of Agents:
- Task automation: autonomously plans and executes complex tasks
- Tool integration: connects to external systems and services
- Intelligent decision-making: makes reasonable decisions based on context
- Continuous learning: learns and improves from interactions
✅ Key implementation points:
- Document processing: high-quality document parsing and chunking
- Retrieval optimization: semantic, keyword, and hybrid retrieval
- Generation quality: context construction, prompt engineering, and quality evaluation
- Agent design: intent understanding, task planning, and tool execution
Key takeaways:
- Scenario first: choose the technical approach that fits the concrete application scenario
- Quality is king: data quality sets the ceiling on application performance
- User experience: technology serves user needs, and the experience is decisive
- Continuous optimization: keep improving the system based on user feedback
- Ecosystem building: build out a complete ecosystem of tools and knowledge
RAG and Agent techniques are still evolving rapidly; new directions such as multimodal RAG, autonomous agents, and tool learning keep emerging. Mastering these core techniques is the foundation for building practical AI applications.
Want to learn more about application development? Follow our technical blog!
🎉 The LLM Technology Blog Series Is Complete!
With this article, our LLM technology blog series is complete. The series covers the full breadth of large-model technology:
📚 Full Series Recap
Stage 1: Evolution
- A Brief History of Large Models - the arc of technical evolution
- Transformer Architecture in Depth - the technical foundation
- The Evolution of Pre-training Paradigms - training strategies
- Open-Source vs. Closed-Source LLMs - technical routes
Stage 2: Technical Principles
5. LLM Training Techniques Explained - distributed training
6. Inference Optimization Techniques - performance optimization
7. LLM Safety and Alignment - RLHF
8. Multimodal LLM Technology - vision-language models
Stage 3: Production Practice
9. LLM Deployment Architecture - high-concurrency serving
10. LLM Cost Optimization in Practice - cutting costs, raising efficiency
11. LLM Monitoring and Operations - reliability
12. A Guide to LLM Application Development - RAG and Agent
🎯 What the Series Offers
- Technical depth: every article goes deep into the details, with complete code implementations
- A complete system: from historical development to real-world application, forming a coherent body of knowledge
- Practical value: oriented toward real engineering needs, with actionable methods and experience
- A forward-looking view: covering the latest technical developments and industry trends
This series aims to give AI practitioners, researchers, and technology enthusiasts a comprehensive guide to large-model technology. We hope it helps you understand and apply these techniques better!