LLM Monitoring and Operations - Safeguarding Production Stability
Published: 2024-10-20
Author: AI Technology Researcher
Tags: Monitoring & Operations, Stability Assurance, Performance Monitoring, Incident Handling, DevOps, SRE
Introduction
If deploying a large model is "going live", then monitoring and operations is "standing guard". As an SRE engineer deeply involved in running large models in production, I have watched our practice evolve from "it runs, good enough" to "stable and reliable".
I still remember the panic of my first middle-of-the-night outage of an LLM service: GPU memory exhausted, inference latency spiking, user complaints pouring in, and we had no visibility into the system at all. That night of emergency fixes taught me how much monitoring and operations matter.
After several years of practice, we built a complete monitoring and operations system for large models, cutting both fault detection time and fault recovery time from hours to minutes and raising service availability from 99% to over 99.9%.
In this post, let's dig into the core techniques of LLM monitoring and operations: from metrics system design to alerting strategy, and from fault diagnosis to automated recovery, covering how to build a stable and reliable LLM service.
Monitoring Metrics System
Core Performance Metrics
```python
import time


class LLMMonitoringMetrics:
    def __init__(self):
        self.metric_categories = {
            'performance_metrics': {
                'latency': {
                    'p50_latency_ms': '50th percentile latency',
                    'p95_latency_ms': '95th percentile latency',
                    'p99_latency_ms': '99th percentile latency',
                    'max_latency_ms': 'Maximum latency'
                },
                'throughput': {
                    'requests_per_second': 'Requests per second',
                    'tokens_per_second': 'Tokens processed per second',
                    'concurrent_requests': 'Concurrent requests',
                    'queue_length': 'Request queue length'
                },
                'accuracy': {
                    'success_rate': 'Success rate',
                    'error_rate': 'Error rate',
                    'timeout_rate': 'Timeout rate',
                    'quality_score': 'Output quality score'
                }
            },
            'resource_metrics': {
                'compute': {
                    'gpu_utilization': 'GPU utilization',
                    'gpu_memory_usage': 'GPU memory utilization',
                    'cpu_utilization': 'CPU utilization',
                    'memory_usage': 'Memory utilization'
                },
                'storage': {
                    'disk_usage': 'Disk utilization',
                    'io_wait': 'IO wait time',
                    'cache_hit_rate': 'Cache hit rate',
                    'cache_memory_usage': 'Cache memory usage'
                },
                'network': {
                    'network_in_bytes': 'Network inbound traffic',
                    'network_out_bytes': 'Network outbound traffic',
                    'connection_count': 'Connection count',
                    'packet_loss_rate': 'Packet loss rate'
                }
            },
            'business_metrics': {
                'user_experience': {
                    'user_satisfaction_score': 'User satisfaction score',
                    'session_duration': 'Session duration',
                    'bounce_rate': 'Bounce rate',
                    'conversion_rate': 'Conversion rate'
                },
                'cost': {
                    'cost_per_request': 'Cost per request',
                    'cost_per_token': 'Cost per token',
                    'resource_efficiency': 'Resource efficiency',
                    'cost_trend': 'Cost trend'
                }
            }
        }

    def define_sli_slo(self):
        """
        Define SLI/SLO targets.
        """
        sli_slo_definitions = {
            'availability': {
                'sli': 'successful_requests / total_requests',
                'slo': '99.9%',
                'measurement_window': '30 days',
                'error_budget': '0.1%'
            },
            'latency': {
                'sli': 'p95_latency_ms',
                'slo': '< 500ms',
                'measurement_window': '24 hours',
                'error_budget': '5% of requests may exceed the threshold'
            },
            'throughput': {
                'sli': 'requests_per_second',
                'slo': '> 1000 RPS',
                'measurement_window': '1 hour',
                'error_budget': '10% of the time may fall below the threshold'
            },
            'quality': {
                'sli': 'output_quality_score',
                'slo': '> 0.8',
                'measurement_window': '24 hours',
                'error_budget': '5% of outputs may fall below the threshold'
            }
        }
        return sli_slo_definitions

    def calculate_error_budget(self, slo_target, actual_performance, time_window):
        """
        Calculate the error budget for a given SLO.
        """
        if slo_target.endswith('%'):
            # Percentage SLOs (e.g. availability): actual_performance is a single ratio
            target_value = float(slo_target.rstrip('%')) / 100
            error_budget = 1 - target_value
            if actual_performance >= target_value:
                remaining_budget = error_budget
                budget_consumption = 0
            else:
                budget_consumption = target_value - actual_performance
                remaining_budget = max(0, error_budget - budget_consumption)
        else:
            # Non-percentage SLOs such as latency: actual_performance is a list of
            # measurements; tolerate a leading comparator like "< 500ms"
            target_value = float(slo_target.strip('<> ').rstrip('ms'))
            violations = sum(1 for perf in actual_performance if perf > target_value)
            total_measurements = len(actual_performance)
            violation_rate = violations / total_measurements if total_measurements > 0 else 0
            error_budget = 0.05  # 5% error budget
            budget_consumption = violation_rate
            remaining_budget = max(0, error_budget - budget_consumption)
        return {
            'remaining_budget': remaining_budget,
            'budget_consumption': budget_consumption,
            'budget_consumption_rate': budget_consumption / error_budget if error_budget > 0 else 0,
            'time_window': time_window,
            'status': 'healthy' if remaining_budget > 0.2 * error_budget else 'at_risk'
        }
class MetricsCollector:
    def __init__(self, collection_interval=60):
        self.collection_interval = collection_interval
        self.metrics_buffer = {}
        self.collectors = {
            'system': SystemMetricsCollector(),
            'application': ApplicationMetricsCollector(),
            'business': BusinessMetricsCollector()
        }

    def collect_all_metrics(self):
        """
        Collect metrics from every registered collector.
        """
        timestamp = time.time()
        all_metrics = {'timestamp': timestamp}
        for collector_name, collector in self.collectors.items():
            try:
                metrics = collector.collect()
                all_metrics[collector_name] = metrics
            except Exception as e:
                print(f"Error collecting {collector_name} metrics: {e}")
                all_metrics[collector_name] = {}
        # Store the snapshot in the buffer
        self.metrics_buffer[timestamp] = all_metrics
        # Drop old data (keep the most recent hour)
        cutoff_time = timestamp - 3600
        self.metrics_buffer = {
            ts: metrics for ts, metrics in self.metrics_buffer.items()
            if ts > cutoff_time
        }
        return all_metrics

    def get_metric_trend(self, metric_path, time_range_seconds=3600):
        """
        Get the trend of a metric over a time range.
        """
        current_time = time.time()
        start_time = current_time - time_range_seconds
        trend_data = []
        for timestamp, metrics in sorted(self.metrics_buffer.items()):
            if timestamp >= start_time:
                value = self.extract_metric_value(metrics, metric_path)
                if value is not None:
                    trend_data.append({
                        'timestamp': timestamp,
                        'value': value
                    })
        # Compute trend statistics
        if len(trend_data) >= 2:
            values = [data['value'] for data in trend_data]
            trend_analysis = {
                'min': min(values),
                'max': max(values),
                'avg': sum(values) / len(values),
                'current': values[-1],
                'previous': values[-2],
                'change_rate': (values[-1] - values[-2]) / values[-2] if values[-2] != 0 else 0,
                'trend_direction': 'up' if values[-1] > values[-2] else 'down' if values[-1] < values[-2] else 'stable'
            }
        else:
            trend_analysis = {'error': 'Insufficient data for trend analysis'}
        return {
            'data_points': trend_data,
            'analysis': trend_analysis
        }

    def extract_metric_value(self, metrics, metric_path):
        """
        Extract a metric value from a nested dict using a dotted path.
        """
        keys = metric_path.split('.')
        current = metrics
        try:
            for key in keys:
                current = current[key]
            return current
        except (KeyError, TypeError):
            return None
class SystemMetricsCollector:
    def collect(self):
        """
        Collect host-level system metrics.
        """
        import psutil
        import GPUtil

        # CPU metrics
        cpu_metrics = {
            'cpu_utilization': psutil.cpu_percent(interval=1),
            'cpu_count': psutil.cpu_count(),
            'load_average': psutil.getloadavg() if hasattr(psutil, 'getloadavg') else [0, 0, 0]
        }
        # Memory metrics
        memory = psutil.virtual_memory()
        memory_metrics = {
            'memory_total_gb': memory.total / (1024**3),
            'memory_used_gb': memory.used / (1024**3),
            'memory_utilization': memory.percent,
            'memory_available_gb': memory.available / (1024**3)
        }
        # GPU metrics
        gpu_metrics = {'gpus': []}
        try:
            gpus = GPUtil.getGPUs()
            for gpu in gpus:
                gpu_metrics['gpus'].append({
                    'id': gpu.id,
                    'name': gpu.name,
                    'utilization': gpu.load * 100,
                    'memory_total_mb': gpu.memoryTotal,
                    'memory_used_mb': gpu.memoryUsed,
                    'memory_utilization': (gpu.memoryUsed / gpu.memoryTotal) * 100,
                    'temperature': gpu.temperature
                })
        except Exception as e:
            gpu_metrics['error'] = str(e)
        # Disk metrics
        disk = psutil.disk_usage('/')
        disk_metrics = {
            'disk_total_gb': disk.total / (1024**3),
            'disk_used_gb': disk.used / (1024**3),
            'disk_utilization': (disk.used / disk.total) * 100,
            'disk_free_gb': disk.free / (1024**3)
        }
        # Network metrics
        network = psutil.net_io_counters()
        network_metrics = {
            'network_bytes_sent': network.bytes_sent,
            'network_bytes_recv': network.bytes_recv,
            'network_packets_sent': network.packets_sent,
            'network_packets_recv': network.packets_recv
        }
        return {
            'cpu': cpu_metrics,
            'memory': memory_metrics,
            'gpu': gpu_metrics,
            'disk': disk_metrics,
            'network': network_metrics
        }
class ApplicationMetricsCollector:
    def __init__(self):
        self.request_history = []
        self.error_history = []

    def collect(self):
        """
        Collect application-level metrics.
        """
        # These metrics are normally reported by the application itself;
        # the structure below is an example.
        current_time = time.time()
        # Request metrics over the last minute
        recent_requests = [
            req for req in self.request_history
            if current_time - req['timestamp'] <= 60
        ]
        if recent_requests:
            latencies = [req['latency'] for req in recent_requests]
            request_metrics = {
                'requests_per_minute': len(recent_requests),
                'avg_latency_ms': sum(latencies) / len(latencies),
                'p95_latency_ms': self.percentile(latencies, 95),
                'p99_latency_ms': self.percentile(latencies, 99),
                'max_latency_ms': max(latencies),
                'min_latency_ms': min(latencies)
            }
        else:
            request_metrics = {
                'requests_per_minute': 0,
                'avg_latency_ms': 0,
                'p95_latency_ms': 0,
                'p99_latency_ms': 0,
                'max_latency_ms': 0,
                'min_latency_ms': 0
            }
        # Error metrics over the last minute
        recent_errors = [
            err for err in self.error_history
            if current_time - err['timestamp'] <= 60
        ]
        error_metrics = {
            'errors_per_minute': len(recent_errors),
            'error_rate': len(recent_errors) / len(recent_requests) if recent_requests else 0,
            'error_types': self.count_error_types(recent_errors)
        }
        # Model metrics
        model_metrics = {
            'model_load_time_ms': 0,      # from the serving process
            'model_memory_usage_gb': 0,   # from the serving process
            'cache_hit_rate': 0.8,        # from the cache layer
            'queue_length': 0             # from the request queue
        }
        return {
            'requests': request_metrics,
            'errors': error_metrics,
            'model': model_metrics
        }

    def percentile(self, data, percentile):
        """
        Compute a percentile by linear interpolation.
        """
        if not data:
            return 0
        sorted_data = sorted(data)
        index = (percentile / 100) * (len(sorted_data) - 1)
        if index.is_integer():
            return sorted_data[int(index)]
        lower = sorted_data[int(index)]
        upper = sorted_data[int(index) + 1]
        return lower + (upper - lower) * (index - int(index))

    def count_error_types(self, errors):
        """
        Count errors by type.
        """
        error_counts = {}
        for error in errors:
            error_type = error.get('type', 'unknown')
            error_counts[error_type] = error_counts.get(error_type, 0) + 1
        return error_counts

    def record_request(self, latency_ms, success=True, error_type=None):
        """
        Record a single request (and its error, if any).
        """
        timestamp = time.time()
        self.request_history.append({
            'timestamp': timestamp,
            'latency': latency_ms,
            'success': success
        })
        if not success and error_type:
            self.error_history.append({
                'timestamp': timestamp,
                'type': error_type
            })
        # Drop old data (keep the most recent hour)
        cutoff_time = timestamp - 3600
        self.request_history = [
            req for req in self.request_history
            if req['timestamp'] > cutoff_time
        ]
        self.error_history = [
            err for err in self.error_history
            if err['timestamp'] > cutoff_time
        ]
class BusinessMetricsCollector:
    def collect(self):
        """
        Collect business metrics.
        """
        # These values usually come from business systems or databases;
        # the numbers below are placeholders.
        return {
            'user_metrics': {
                'active_users': 1000,          # from the user system
                'new_users': 50,               # from the user system
                'user_retention_rate': 0.85    # from the analytics system
            },
            'cost_metrics': {
                'hourly_cost_usd': 100,        # from the billing system
                'cost_per_request_usd': 0.01,  # derived
                'cost_efficiency_score': 0.8   # derived
            },
            'quality_metrics': {
                'user_satisfaction_score': 4.2,  # from the feedback system
                'output_quality_score': 0.85,    # from the quality evaluation system
                'complaint_rate': 0.02           # from the support system
            }
        }
```
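Before moving on to alerting, here is a minimal wiring sketch for the classes above. It is illustrative only: it assumes psutil and GPUtil are installed for SystemMetricsCollector and that all of the classes live in the same module.

```python
import time

collector = MetricsCollector(collection_interval=60)

# Collect two snapshots so get_metric_trend has something to compare
for _ in range(2):
    snapshot = collector.collect_all_metrics()
    time.sleep(1)

print(snapshot['system']['cpu']['cpu_utilization'])
trend = collector.get_metric_trend('system.cpu.cpu_utilization')
print(trend['analysis'])

# Error budget for the 99.9% availability SLO: a 0.1% budget over 30 days
# corresponds to roughly 43.2 minutes of allowed downtime.
slo = LLMMonitoringMetrics()
budget = slo.calculate_error_budget('99.9%', actual_performance=0.9995, time_window='30 days')
print(budget['status'], budget['remaining_budget'])
```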
Alerting Strategy Design
```python
import time


class AlertingSystem:
    def __init__(self):
        self.alert_rules = {}
        self.alert_channels = {}
        self.alert_history = []
        self.escalation_policies = {}

    def define_alert_rules(self):
        """
        Define the alerting rules.
        """
        self.alert_rules = {
            'critical_alerts': {
                'service_down': {
                    'condition': 'availability < 0.95',
                    'duration': '2m',
                    'severity': 'critical',
                    'description': 'Service availability below 95%',
                    'runbook': 'https://runbook.example.com/service-down'
                },
                'high_latency': {
                    'condition': 'p95_latency_ms > 2000',
                    'duration': '5m',
                    'severity': 'critical',
                    'description': 'P95 latency above 2 seconds',
                    'runbook': 'https://runbook.example.com/high-latency'
                },
                'gpu_memory_exhaustion': {
                    'condition': 'gpu_memory_utilization > 0.95',
                    'duration': '1m',
                    'severity': 'critical',
                    'description': 'GPU memory utilization above 95%',
                    'runbook': 'https://runbook.example.com/gpu-memory'
                }
            },
            'warning_alerts': {
                'moderate_latency': {
                    'condition': 'p95_latency_ms > 1000',
                    'duration': '10m',
                    'severity': 'warning',
                    'description': 'P95 latency above 1 second',
                    'runbook': 'https://runbook.example.com/moderate-latency'
                },
                'high_error_rate': {
                    'condition': 'error_rate > 0.05',
                    'duration': '5m',
                    'severity': 'warning',
                    'description': 'Error rate above 5%',
                    'runbook': 'https://runbook.example.com/high-error-rate'
                },
                'resource_usage_high': {
                    'condition': 'cpu_utilization > 0.8 OR memory_utilization > 0.8',
                    'duration': '15m',
                    'severity': 'warning',
                    'description': 'CPU or memory utilization above 80%',
                    'runbook': 'https://runbook.example.com/resource-usage'
                }
            },
            'info_alerts': {
                'deployment_started': {
                    'condition': 'deployment_event = true',
                    'duration': '0m',
                    'severity': 'info',
                    'description': 'Deployment started',
                    'runbook': 'https://runbook.example.com/deployment'
                },
                'scaling_event': {
                    'condition': 'instance_count_change > 0',
                    'duration': '0m',
                    'severity': 'info',
                    'description': 'Instance count changed',
                    'runbook': 'https://runbook.example.com/scaling'
                }
            }
        }
    def evaluate_alert_conditions(self, metrics):
        """
        Evaluate all alert conditions against the latest metrics.
        """
        triggered_alerts = []
        for category, rules in self.alert_rules.items():
            for rule_name, rule_config in rules.items():
                if self.check_condition(rule_config['condition'], metrics):
                    # Only fire once the condition has held for the required duration
                    if self.check_duration(rule_name, rule_config['duration']):
                        alert = {
                            'rule_name': rule_name,
                            'category': category,
                            'severity': rule_config['severity'],
                            'description': rule_config['description'],
                            'runbook': rule_config['runbook'],
                            'timestamp': time.time(),
                            'metrics_snapshot': metrics
                        }
                        triggered_alerts.append(alert)
        return triggered_alerts

    def check_condition(self, condition, metrics):
        """
        Check a single alert condition.
        """
        # Simplified condition evaluation; a real system should use a proper
        # expression parser instead of string substitution plus eval().
        try:
            # Substitute metric names in the condition with their current values
            condition_with_values = condition
            if 'availability' in condition:
                availability = self.calculate_availability(metrics)
                condition_with_values = condition_with_values.replace('availability', str(availability))
            if 'p95_latency_ms' in condition:
                p95_latency = metrics.get('application', {}).get('requests', {}).get('p95_latency_ms', 0)
                condition_with_values = condition_with_values.replace('p95_latency_ms', str(p95_latency))
            if 'gpu_memory_utilization' in condition:
                gpu_memory = self.get_max_gpu_memory_utilization(metrics)
                condition_with_values = condition_with_values.replace('gpu_memory_utilization', str(gpu_memory / 100))
            if 'error_rate' in condition:
                error_rate = metrics.get('application', {}).get('errors', {}).get('error_rate', 0)
                condition_with_values = condition_with_values.replace('error_rate', str(error_rate))
            if 'cpu_utilization' in condition:
                cpu_util = metrics.get('system', {}).get('cpu', {}).get('cpu_utilization', 0)
                condition_with_values = condition_with_values.replace('cpu_utilization', str(cpu_util / 100))
            if 'memory_utilization' in condition:
                mem_util = metrics.get('system', {}).get('memory', {}).get('memory_utilization', 0)
                condition_with_values = condition_with_values.replace('memory_utilization', str(mem_util / 100))
            # Map the rule syntax's AND/OR to Python operators before evaluating
            condition_with_values = condition_with_values.replace(' AND ', ' and ').replace(' OR ', ' or ')
            return eval(condition_with_values)
        except Exception as e:
            print(f"Error evaluating condition '{condition}': {e}")
            return False
    def calculate_availability(self, metrics):
        """
        Calculate availability from request and error counts.
        """
        requests = metrics.get('application', {}).get('requests', {})
        errors = metrics.get('application', {}).get('errors', {})
        total_requests = requests.get('requests_per_minute', 0)
        error_count = errors.get('errors_per_minute', 0)
        if total_requests == 0:
            return 1.0
        return (total_requests - error_count) / total_requests

    def get_max_gpu_memory_utilization(self, metrics):
        """
        Get the highest GPU memory utilization across all GPUs.
        """
        gpus = metrics.get('system', {}).get('gpu', {}).get('gpus', [])
        if not gpus:
            return 0
        return max(gpu.get('memory_utilization', 0) for gpu in gpus)

    def check_duration(self, rule_name, duration_str):
        """
        Check whether the alert condition has held long enough.
        """
        # Simplified: a real implementation should track when each rule's
        # condition first started firing and compare against the duration.
        if duration_str == '0m':
            return True  # fire immediately
        # Parse the duration
        duration_minutes = int(duration_str.rstrip('m'))
        # Inspect recent history (simplified)
        current_time = time.time()
        rule_history = [
            alert for alert in self.alert_history
            if alert['rule_name'] == rule_name and
            current_time - alert['timestamp'] <= duration_minutes * 60
        ]
        return len(rule_history) >= duration_minutes
    def send_alert(self, alert):
        """
        Route an alert to notification channels based on severity.
        """
        severity = alert['severity']
        if severity == 'critical':
            channels = ['pagerduty', 'slack', 'email', 'sms']
        elif severity == 'warning':
            channels = ['slack', 'email']
        else:
            channels = ['slack']
        for channel in channels:
            try:
                self.send_to_channel(channel, alert)
            except Exception as e:
                print(f"Failed to send alert to {channel}: {e}")
        # Record the alert in history
        self.alert_history.append(alert)

    def send_to_channel(self, channel, alert):
        """
        Dispatch an alert to a specific channel.
        """
        if channel == 'slack':
            self.send_slack_alert(alert)
        elif channel == 'email':
            self.send_email_alert(alert)
        elif channel == 'pagerduty':
            self.send_pagerduty_alert(alert)
        elif channel == 'sms':
            self.send_sms_alert(alert)

    def send_slack_alert(self, alert):
        """
        Send a Slack alert (stub; a real implementation would call the Slack API).
        """
        message = f"""
🚨 **{alert['severity'].upper()}**: {alert['description']}
📊 **Rule**: {alert['rule_name']}
🕐 **Time**: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(alert['timestamp']))}
📖 **Runbook**: {alert['runbook']}
"""
        print(f"Slack Alert: {message}")

    def send_email_alert(self, alert):
        """
        Send an email alert (stub; a real implementation would call an email service).
        """
        print(f"Email Alert: {alert['description']}")

    def send_pagerduty_alert(self, alert):
        """
        Send a PagerDuty alert (stub; a real implementation would call the PagerDuty API).
        """
        print(f"PagerDuty Alert: {alert['description']}")

    def send_sms_alert(self, alert):
        """
        Send an SMS alert (stub; a real implementation would call an SMS service).
        """
        print(f"SMS Alert: {alert['description']}")
```
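The alerting loop plugs straight into the collector from the previous section. A minimal evaluation sketch, assuming both classes are defined in the same module:

```python
alerting = AlertingSystem()
alerting.define_alert_rules()

collector = MetricsCollector()
metrics = collector.collect_all_metrics()

# Evaluate all rules against the latest snapshot and notify on any that fire
for alert in alerting.evaluate_alert_conditions(metrics):
    alerting.send_alert(alert)
```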
Fault Diagnosis and Recovery
Automated Fault Detection
```python
import time


class FaultDetectionSystem:
    def __init__(self):
        self.anomaly_detectors = {
            'statistical': StatisticalAnomalyDetector(),
            'ml_based': MLAnomalyDetector(),
            'rule_based': RuleBasedDetector()
        }
        self.fault_patterns = self.load_fault_patterns()

    def detect_anomalies(self, metrics_stream):
        """
        Run every detector and merge the anomalies they report.
        """
        anomalies = []
        for detector_name, detector in self.anomaly_detectors.items():
            try:
                detected_anomalies = detector.detect(metrics_stream)
                for anomaly in detected_anomalies:
                    anomaly['detector'] = detector_name
                    anomalies.append(anomaly)
            except Exception as e:
                print(f"Error in {detector_name} detector: {e}")
        # Merge and de-duplicate anomalies
        merged_anomalies = self.merge_anomalies(anomalies)
        return merged_anomalies

    def diagnose_fault(self, anomalies, current_metrics):
        """
        Diagnose faults by matching anomalies against known fault patterns.
        """
        diagnosis_results = []
        for anomaly in anomalies:
            # Match against fault patterns
            matched_patterns = self.match_fault_patterns(anomaly, current_metrics)
            for pattern in matched_patterns:
                diagnosis = {
                    'fault_type': pattern['type'],
                    'confidence': pattern['confidence'],
                    'root_cause': pattern['root_cause'],
                    'symptoms': pattern['symptoms'],
                    'recommended_actions': pattern['actions'],
                    'severity': pattern['severity'],
                    'anomaly_source': anomaly
                }
                diagnosis_results.append(diagnosis)
        # Sort by confidence, highest first
        diagnosis_results.sort(key=lambda x: x['confidence'], reverse=True)
        return diagnosis_results

    def load_fault_patterns(self):
        """
        Load the known fault patterns.
        """
        return {
            'gpu_memory_leak': {
                'type': 'memory_leak',
                'symptoms': ['increasing_gpu_memory', 'stable_request_rate'],
                'root_cause': 'GPU memory is not being released correctly',
                'actions': ['restart_service', 'check_memory_management'],
                'severity': 'high',
                'confidence_threshold': 0.8
            },
            'model_degradation': {
                'type': 'model_performance',
                'symptoms': ['increasing_latency', 'stable_accuracy'],
                'root_cause': 'Model serving performance has degraded',
                'actions': ['check_model_version', 'reload_model'],
                'severity': 'medium',
                'confidence_threshold': 0.7
            },
            'network_congestion': {
                'type': 'network_issue',
                'symptoms': ['high_network_latency', 'packet_loss'],
                'root_cause': 'Network congestion',
                'actions': ['check_network_status', 'scale_bandwidth'],
                'severity': 'medium',
                'confidence_threshold': 0.6
            },
            'resource_exhaustion': {
                'type': 'resource_issue',
                'symptoms': ['high_cpu_usage', 'high_memory_usage', 'increasing_latency'],
                'root_cause': 'Compute resources are exhausted',
                'actions': ['scale_up_resources', 'optimize_resource_usage'],
                'severity': 'high',
                'confidence_threshold': 0.8
            }
        }
    def match_fault_patterns(self, anomaly, current_metrics):
        """
        Find the fault patterns that match an anomaly.
        """
        matched_patterns = []
        for pattern_name, pattern in self.fault_patterns.items():
            confidence = self.calculate_pattern_confidence(pattern, anomaly, current_metrics)
            if confidence >= pattern['confidence_threshold']:
                matched_pattern = pattern.copy()
                matched_pattern['name'] = pattern_name
                matched_pattern['confidence'] = confidence
                matched_patterns.append(matched_pattern)
        return matched_patterns

    def calculate_pattern_confidence(self, pattern, anomaly, current_metrics):
        """
        Compute the confidence that a fault pattern matches.
        """
        symptoms = pattern['symptoms']
        matched_symptoms = 0
        for symptom in symptoms:
            if self.check_symptom(symptom, anomaly, current_metrics):
                matched_symptoms += 1
        confidence = matched_symptoms / len(symptoms) if symptoms else 0
        # Adjust confidence by the severity of the anomaly
        if anomaly.get('severity', 'medium') == 'high':
            confidence *= 1.2
        elif anomaly.get('severity', 'medium') == 'low':
            confidence *= 0.8
        return min(confidence, 1.0)

    def check_symptom(self, symptom, anomaly, current_metrics):
        """
        Check whether a symptom is present in the current metrics.
        """
        if symptom == 'increasing_gpu_memory':
            gpu_metrics = current_metrics.get('system', {}).get('gpu', {})
            return any(gpu.get('memory_utilization', 0) > 80 for gpu in gpu_metrics.get('gpus', []))
        elif symptom == 'increasing_latency':
            latency = current_metrics.get('application', {}).get('requests', {}).get('p95_latency_ms', 0)
            return latency > 1000
        elif symptom == 'high_cpu_usage':
            cpu_util = current_metrics.get('system', {}).get('cpu', {}).get('cpu_utilization', 0)
            return cpu_util > 80
        elif symptom == 'high_memory_usage':
            mem_util = current_metrics.get('system', {}).get('memory', {}).get('memory_utilization', 0)
            return mem_util > 80
        elif symptom == 'stable_request_rate':
            # Check whether the request rate is stable (varies by less than 10%)
            return True  # simplified
        elif symptom == 'packet_loss':
            # Check for network packet loss
            return False  # simplified
        return False

    def merge_anomalies(self, anomalies):
        """
        Merge similar anomalies.
        """
        # Simplified: merge by metric name within 5-minute windows
        merged = {}
        for anomaly in anomalies:
            key = f"{anomaly.get('metric', 'unknown')}_{int(anomaly.get('timestamp', 0)) // 300}"
            if key not in merged:
                merged[key] = anomaly
            else:
                # Merge information from duplicate anomalies
                merged[key]['confidence'] = max(merged[key].get('confidence', 0), anomaly.get('confidence', 0))
                merged[key]['detectors'] = merged[key].get('detectors', []) + [anomaly.get('detector', 'unknown')]
        return list(merged.values())
class StatisticalAnomalyDetector:
    def __init__(self, window_size=100, threshold=3):
        self.window_size = window_size
        self.threshold = threshold
        self.metric_history = {}

    def detect(self, metrics_stream):
        """
        Statistical (z-score based) anomaly detection.
        """
        anomalies = []
        for metric_name, value in self.flatten_metrics(metrics_stream).items():
            if metric_name not in self.metric_history:
                self.metric_history[metric_name] = []
            self.metric_history[metric_name].append(value)
            # Keep the sliding window bounded
            if len(self.metric_history[metric_name]) > self.window_size:
                self.metric_history[metric_name] = self.metric_history[metric_name][-self.window_size:]
            # Detect anomalies once there is enough data
            if len(self.metric_history[metric_name]) >= 10:  # need at least 10 data points
                anomaly = self.detect_statistical_anomaly(metric_name, value)
                if anomaly:
                    anomalies.append(anomaly)
        return anomalies

    def detect_statistical_anomaly(self, metric_name, current_value):
        """
        Flag a value whose z-score exceeds the threshold.
        """
        import numpy as np

        history = self.metric_history[metric_name][:-1]  # exclude the current value
        if len(history) < 10:
            return None
        mean = np.mean(history)
        std = np.std(history)
        if std == 0:
            return None
        z_score = abs(current_value - mean) / std
        if z_score > self.threshold:
            return {
                'metric': metric_name,
                'value': current_value,
                'expected_range': (mean - self.threshold * std, mean + self.threshold * std),
                'z_score': z_score,
                'confidence': min(z_score / self.threshold, 1.0),
                'timestamp': time.time(),
                'type': 'statistical_anomaly'
            }
        return None

    def flatten_metrics(self, metrics):
        """
        Flatten a nested metrics dict into dotted keys.
        """
        flattened = {}

        def flatten_dict(d, prefix=''):
            for key, value in d.items():
                if isinstance(value, dict):
                    flatten_dict(value, f"{prefix}{key}.")
                elif isinstance(value, (int, float)):
                    flattened[f"{prefix}{key}"] = value

        flatten_dict(metrics)
        return flattened
class MLAnomalyDetector:
    def __init__(self):
        self.models = {}
        self.training_data = {}

    def detect(self, metrics_stream):
        """
        ML-based anomaly detection.
        """
        # Simplified: a real system would use trained models such as
        # Isolation Forest, One-Class SVM, or an LSTM.
        anomalies = []
        # Example: simple threshold checks standing in for model inference
        flattened_metrics = self.flatten_metrics(metrics_stream)
        for metric_name, value in flattened_metrics.items():
            if self.is_anomalous_ml(metric_name, value):
                anomalies.append({
                    'metric': metric_name,
                    'value': value,
                    'confidence': 0.7,  # placeholder confidence
                    'timestamp': time.time(),
                    'type': 'ml_anomaly'
                })
        return anomalies

    def is_anomalous_ml(self, metric_name, value):
        """
        Decide whether a value is anomalous (stand-in for model inference).
        """
        # Simplified: a trained model should be used in practice;
        # simple rules serve as the example here.
        if 'latency' in metric_name and value > 2000:
            return True
        elif 'utilization' in metric_name and value > 95:
            return True
        elif 'error_rate' in metric_name and value > 0.1:
            return True
        return False

    def flatten_metrics(self, metrics):
        """
        Flatten a nested metrics dict into dotted keys.
        """
        flattened = {}

        def flatten_dict(d, prefix=''):
            for key, value in d.items():
                if isinstance(value, dict):
                    flatten_dict(value, f"{prefix}{key}.")
                elif isinstance(value, (int, float)):
                    flattened[f"{prefix}{key}"] = value

        flatten_dict(metrics)
        return flattened
class RuleBasedDetector:
    def __init__(self):
        self.rules = self.load_detection_rules()

    def detect(self, metrics_stream):
        """
        Rule-based anomaly detection.
        """
        anomalies = []
        for rule_name, rule in self.rules.items():
            if self.evaluate_rule(rule, metrics_stream):
                anomalies.append({
                    'rule': rule_name,
                    'description': rule['description'],
                    'confidence': rule['confidence'],
                    'timestamp': time.time(),
                    'type': 'rule_based_anomaly'
                })
        return anomalies

    def load_detection_rules(self):
        """
        Load the detection rules.
        """
        return {
            'high_latency_spike': {
                'condition': 'p95_latency_ms > 3000',
                'description': 'Sudden latency spike',
                'confidence': 0.9
            },
            'memory_leak_pattern': {
                'condition': 'memory_utilization > 90 AND memory_trend = "increasing"',
                'description': 'Suspected memory leak',
                'confidence': 0.8
            },
            'error_burst': {
                'condition': 'error_rate > 0.2',
                'description': 'Error rate burst',
                'confidence': 0.9
            }
        }

    def evaluate_rule(self, rule, metrics):
        """
        Evaluate a rule's condition against the metrics.
        """
        # Simplified rule evaluation
        condition = rule['condition']
        try:
            # Substitute metric values into the condition
            if 'p95_latency_ms' in condition:
                p95_latency = metrics.get('application', {}).get('requests', {}).get('p95_latency_ms', 0)
                condition = condition.replace('p95_latency_ms', str(p95_latency))
            if 'memory_utilization' in condition:
                memory_util = metrics.get('system', {}).get('memory', {}).get('memory_utilization', 0)
                condition = condition.replace('memory_utilization', str(memory_util))
            if 'error_rate' in condition:
                error_rate = metrics.get('application', {}).get('errors', {}).get('error_rate', 0)
                condition = condition.replace('error_rate', str(error_rate))
            # Simplified evaluation; conditions that reference values this sketch
            # does not substitute (e.g. memory_trend) will raise and count as False
            return eval(condition.replace('AND', ' and ').replace('OR', ' or '))
        except Exception as e:
            print(f"Error evaluating rule condition: {e}")
            return False
```
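Detection and diagnosis are only half of the story this section promises. As a rough illustration of the recovery side, here is a hedged sketch that maps the recommended_actions strings produced by diagnose_fault() above to handler callables. Every handler here is a hypothetical placeholder, not our production code, and the executor defaults to a dry run.

```python
class FaultRecoveryExecutor:
    def __init__(self, dry_run=True):
        self.dry_run = dry_run
        # Map action names from the fault patterns to (placeholder) handlers
        self.action_handlers = {
            'restart_service': self.restart_service,
            'reload_model': self.reload_model,
            'scale_up_resources': self.scale_up_resources,
        }

    def execute(self, diagnosis):
        """Run the recommended actions of a single diagnosis."""
        results = []
        for action in diagnosis.get('recommended_actions', []):
            handler = self.action_handlers.get(action)
            if handler is None:
                results.append((action, 'no_handler'))  # e.g. manual runbook steps
            elif self.dry_run:
                results.append((action, 'skipped_dry_run'))
            else:
                results.append((action, handler()))
        return results

    def restart_service(self):
        # Placeholder: ask the orchestrator for a rolling restart here
        return 'restarted'

    def reload_model(self):
        # Placeholder: trigger a model reload on the serving process
        return 'reloaded'

    def scale_up_resources(self):
        # Placeholder: request more replicas or larger instances
        return 'scale_requested'


# Wiring: detect -> diagnose -> recover (assumes MetricsCollector from earlier
# in this post is available in the same module)
collector = MetricsCollector()
current_metrics = collector.collect_all_metrics()
detector = FaultDetectionSystem()
anomalies = detector.detect_anomalies(current_metrics)
executor = FaultRecoveryExecutor(dry_run=True)
for diagnosis in detector.diagnose_fault(anomalies, current_metrics):
    print(diagnosis['fault_type'], executor.execute(diagnosis))
```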
Summary
LLM monitoring and operations is the key technical system for keeping production environments stable, and it requires building complete safeguards across several dimensions:
✅ Monitoring system construction:
- Metrics system: multi-dimensional monitoring of performance, resources, and business
- SLI/SLO: explicit service level indicators and objectives
- Alerting strategy: tiered alerts, intelligent noise reduction, timely notification
- Observability: logs, metrics, and distributed tracing working as one (see the exporter sketch below)
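As one example of the metrics leg of observability, here is a hedged sketch of exposing the collected snapshots for scraping. Prometheus is not prescribed anywhere in this post; it is simply one common choice, and the metric names and port below are illustrative assumptions.

```python
from prometheus_client import Gauge, start_http_server
import time

# Illustrative gauge names; adapt to your own naming convention
p95_latency = Gauge('llm_p95_latency_ms', 'P95 request latency in milliseconds')
error_rate = Gauge('llm_error_rate', 'Fraction of requests that failed')
gpu_memory_util = Gauge('llm_gpu_memory_utilization', 'Max GPU memory utilization in percent')

def export_loop(collector, interval_seconds=60):
    """Periodically push the latest snapshot into Prometheus gauges."""
    start_http_server(9100)  # scrape endpoint on :9100/metrics
    while True:
        snapshot = collector.collect_all_metrics()
        app = snapshot.get('application', {})
        p95_latency.set(app.get('requests', {}).get('p95_latency_ms', 0))
        error_rate.set(app.get('errors', {}).get('error_rate', 0))
        gpus = snapshot.get('system', {}).get('gpu', {}).get('gpus', [])
        gpu_memory_util.set(max((g.get('memory_utilization', 0) for g in gpus), default=0))
        time.sleep(interval_seconds)
```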
✅ Fault handling mechanisms:
- Automated detection: statistical, machine learning, and rule-based detection methods
- Rapid diagnosis: fault pattern matching and root cause analysis
- Automated recovery: self-healing, service degradation, traffic switching
- Post-incident review: incident summaries, process improvement, preventive measures
✅ Operations automation:
- Deployment automation: CI/CD pipelines, blue-green deployments, canary releases
- Scaling automation: load-based autoscaling (see the sketch after this list)
- Self-healing: automatic restarts, service migration, resource scheduling
- Configuration management: version control, environment consistency, change tracking
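As a sketch of what load-based autoscaling can look like at the decision level: the thresholds and the replica bookkeeping below are illustrative assumptions, and the returned target would be handed to whatever orchestrator actually scales the service.

```python
def desired_replicas(metrics, current_replicas,
                     min_replicas=1, max_replicas=8,
                     gpu_high=85, gpu_low=40, queue_high=50):
    """Decide a target replica count from GPU load and queue depth."""
    gpus = metrics.get('system', {}).get('gpu', {}).get('gpus', [])
    gpu_util = max((g.get('utilization', 0) for g in gpus), default=0)
    queue_len = metrics.get('application', {}).get('model', {}).get('queue_length', 0)

    if gpu_util > gpu_high or queue_len > queue_high:
        target = current_replicas + 1        # scale out under load
    elif gpu_util < gpu_low and queue_len == 0:
        target = current_replicas - 1        # scale in when idle
    else:
        target = current_replicas            # hold steady
    return max(min_replicas, min(max_replicas, target))
```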
Key takeaways:
- Prevention beats cure: catch problems early through monitoring and alerting
- Automation is the direction: reduce manual intervention and speed up response
- Data-driven decisions: base operational choices on monitoring data
- Continuous improvement: refine the operations system through post-incident reviews
- Team collaboration: close cooperation between development, operations, and business teams
LLM monitoring and operations is still evolving rapidly, with new approaches such as AIOps, intelligent operations, and cloud-native operations emerging all the time. Building an efficient, intelligent, and reliable operations system is the foundation for keeping LLM services running stably.
If you want to learn more about monitoring and operations, stay tuned for upcoming posts!