logo

Python精准监控:获取系统性能参数并写入文件全攻略

作者:沙与沫2025.09.25 23:05浏览量:1

简介:本文详细介绍如何使用Python获取系统CPU、内存、磁盘等基础性能参数,并将数据结构化写入文件,适用于系统监控、性能分析等场景。提供跨平台解决方案及代码示例,帮助开发者快速实现自动化数据采集。

Python精准监控:获取系统性能参数并写入文件全攻略

一、系统性能监控的核心价值

云计算、大数据和物联网快速发展的今天,系统性能监控已成为保障业务稳定运行的关键环节。通过实时采集CPU使用率、内存占用、磁盘I/O等基础性能参数,运维人员可以:

  1. 提前发现系统瓶颈,避免服务中断
  2. 优化资源配置,提升系统整体效率
  3. 建立性能基准,为容量规划提供数据支持
  4. 快速定位故障根源,缩短MTTR(平均修复时间)

Python凭借其丰富的标准库和第三方生态,成为实现系统监控的理想工具。本文将详细介绍如何使用Python获取系统基础性能参数,并将数据结构化写入文件。

二、核心性能参数采集实现

2.1 CPU使用率采集

CPU是系统的核心资源,其使用情况直接反映系统负载。在Python中,可以通过以下方式获取:

  1. import psutil
  2. def get_cpu_info():
  3. """获取CPU详细信息"""
  4. cpu_info = {
  5. 'physical_cores': psutil.cpu_count(logical=False),
  6. 'logical_cores': psutil.cpu_count(logical=True),
  7. 'freq_current': psutil.cpu_freq().current, # 当前频率(MHz)
  8. 'freq_max': psutil.cpu_freq().max, # 最大频率(MHz)
  9. 'usage_percent': psutil.cpu_percent(interval=1), # 1秒内的平均使用率
  10. 'per_cpu_usage': psutil.cpu_percent(interval=1, percpu=True) # 各核心使用率
  11. }
  12. return cpu_info

关键点说明

  • psutil.cpu_count()区分物理核心和逻辑核心
  • cpu_freq()获取动态频率信息(适用于支持调频的CPU)
  • cpu_percent()建议设置interval参数获取更准确的值
  • 对于多核系统,建议同时采集总体使用率和各核心使用率

2.2 内存使用情况采集

内存是影响系统性能的另一关键因素,采集内容包括:

  1. def get_memory_info():
  2. """获取内存使用情况"""
  3. mem = psutil.virtual_memory()
  4. swap = psutil.swap_memory()
  5. memory_info = {
  6. 'total': mem.total, # 总内存(字节)
  7. 'available': mem.available, # 可用内存(字节)
  8. 'used': mem.used, # 已用内存(字节)
  9. 'free': mem.free, # 空闲内存(字节)
  10. 'percent': mem.percent, # 使用百分比
  11. 'swap_total': swap.total, # 交换分区总大小
  12. 'swap_used': swap.used, # 交换分区已用大小
  13. 'swap_percent': swap.percent # 交换分区使用百分比
  14. }
  15. # 转换为GB单位便于阅读
  16. for key in ['total', 'available', 'used', 'free', 'swap_total', 'swap_used']:
  17. memory_info[key] = round(memory_info[key] / (1024**3), 2)
  18. return memory_info

数据解读技巧

  • 关注available而非free,前者包含缓存和缓冲区可回收内存
  • 交换分区使用率持续高于20%可能预示内存不足
  • 结合usedpercent判断内存压力

2.3 磁盘I/O性能采集

磁盘性能直接影响系统响应速度,需要采集:

  1. def get_disk_info():
  2. """获取磁盘使用情况和I/O统计"""
  3. disk_info = {}
  4. # 磁盘分区使用情况
  5. partitions = psutil.disk_partitions()
  6. for partition in partitions:
  7. if 'cdrom' in partition.opts or partition.fstype == '':
  8. continue
  9. usage = psutil.disk_usage(partition.mountpoint)
  10. disk_info[partition.device] = {
  11. 'mountpoint': partition.mountpoint,
  12. 'fstype': partition.fstype,
  13. 'total': round(usage.total / (1024**3), 2),
  14. 'used': round(usage.used / (1024**3), 2),
  15. 'free': round(usage.free / (1024**3), 2),
  16. 'percent': usage.percent
  17. }
  18. # 磁盘I/O统计(需要psutil 5.6.0+)
  19. try:
  20. io_counters = psutil.disk_io_counters(perdisk=True)
  21. for disk, io in io_counters.items():
  22. disk_info[disk]['io'] = {
  23. 'read_count': io.read_count,
  24. 'write_count': io.write_count,
  25. 'read_bytes': io.read_bytes,
  26. 'write_bytes': io.write_bytes,
  27. 'read_time': io.read_time, # 读取耗时(ms)
  28. 'write_time': io.write_time # 写入耗时(ms)
  29. }
  30. except AttributeError:
  31. print("当前psutil版本不支持perdisk I/O统计")
  32. return disk_info

优化建议

  • 排除光盘驱动器和未识别文件系统
  • 重点关注根分区和数据库所在分区
  • I/O统计建议采集间隔不少于5秒以获得有意义的数据

三、数据结构化与文件写入

3.1 JSON格式写入

JSON因其可读性和跨语言支持,成为首选数据格式:

  1. import json
  2. from datetime import datetime
  3. def write_to_json(data, filename='system_metrics.json'):
  4. """将系统指标写入JSON文件"""
  5. timestamp = datetime.now().isoformat()
  6. output = {
  7. 'timestamp': timestamp,
  8. 'metrics': data
  9. }
  10. with open(filename, 'a', encoding='utf-8') as f:
  11. json.dump(output, f, ensure_ascii=False, indent=4)
  12. f.write('\n') # 每次写入后添加换行符

实现要点

  • 使用append模式实现持续记录
  • 添加时间戳便于后续分析
  • ensure_ascii=False支持中文
  • indent参数提高可读性

3.2 CSV格式写入

对于需要表格化处理的数据,CSV是更好的选择:

  1. import csv
  2. def write_to_csv(data, filename='system_metrics.csv'):
  3. """将系统指标写入CSV文件"""
  4. timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  5. # 准备CSV行数据
  6. cpu_data = data.get('cpu', {})
  7. mem_data = data.get('memory', {})
  8. disk_data = list(data.get('disk', {}).values())[0] if data.get('disk') else {}
  9. row = [
  10. timestamp,
  11. cpu_data.get('usage_percent', 0),
  12. mem_data.get('percent', 0),
  13. disk_data.get('percent', 0) if disk_data else 0
  14. ]
  15. # 写入文件(首次写入需要写入表头)
  16. write_header = not os.path.exists(filename)
  17. with open(filename, 'a', newline='', encoding='utf-8') as f:
  18. writer = csv.writer(f)
  19. if write_header:
  20. writer.writerow(['timestamp', 'cpu_usage(%)', 'mem_usage(%)', 'disk_usage(%)'])
  21. writer.writerow(row)

优化实践

  • 检查文件是否存在决定是否写入表头
  • 使用newline=''避免Windows下的空行问题
  • 选择性写入关键指标简化分析

3.3 数据库存储方案

对于长期监控需求,建议考虑数据库存储:

  1. import sqlite3
  2. from contextlib import closing
  3. def init_db(db_file='system_metrics.db'):
  4. """初始化SQLite数据库"""
  5. with closing(sqlite3.connect(db_file)) as conn:
  6. cursor = conn.cursor()
  7. cursor.execute('''
  8. CREATE TABLE IF NOT EXISTS metrics (
  9. id INTEGER PRIMARY KEY AUTOINCREMENT,
  10. timestamp DATETIME NOT NULL,
  11. cpu_usage REAL,
  12. mem_usage REAL,
  13. disk_usage REAL,
  14. cpu_cores INTEGER,
  15. mem_total REAL,
  16. disk_total REAL
  17. )
  18. ''')
  19. conn.commit()
  20. def save_to_db(data, db_file='system_metrics.db'):
  21. """将指标数据保存到SQLite数据库"""
  22. cpu_data = data.get('cpu', {})
  23. mem_data = data.get('memory', {})
  24. disk_data = list(data.get('disk', {}).values())[0] if data.get('disk') else {}
  25. with closing(sqlite3.connect(db_file)) as conn:
  26. cursor = conn.cursor()
  27. cursor.execute('''
  28. INSERT INTO metrics
  29. (timestamp, cpu_usage, mem_usage, disk_usage,
  30. cpu_cores, mem_total, disk_total)
  31. VALUES (?, ?, ?, ?, ?, ?, ?)
  32. ''', (
  33. datetime.now().isoformat(),
  34. cpu_data.get('usage_percent', 0),
  35. mem_data.get('percent', 0),
  36. disk_data.get('percent', 0) if disk_data else 0,
  37. cpu_data.get('logical_cores', 0),
  38. mem_data.get('total', 0),
  39. disk_data.get('total', 0) if disk_data else 0
  40. ))
  41. conn.commit()

数据库设计原则

  • 使用时间序列数据模型
  • 包含关键指标的原始值和计算值
  • 考虑添加索引优化查询性能
  • 定期归档历史数据防止数据库膨胀

四、完整实现示例

  1. import os
  2. import time
  3. import psutil
  4. import json
  5. from datetime import datetime
  6. def collect_system_metrics():
  7. """收集所有系统指标"""
  8. metrics = {
  9. 'cpu': get_cpu_info(),
  10. 'memory': get_memory_info(),
  11. 'disk': get_disk_info(),
  12. 'network': get_network_info() # 可选网络指标
  13. }
  14. return metrics
  15. def get_network_info():
  16. """获取网络I/O统计(可选)"""
  17. net_io = psutil.net_io_counters()
  18. return {
  19. 'bytes_sent': net_io.bytes_sent,
  20. 'bytes_recv': net_io.bytes_recv,
  21. 'packets_sent': net_io.packets_sent,
  22. 'packets_recv': net_io.packets_recv
  23. }
  24. def main():
  25. # 初始化数据库(首次运行需要)
  26. # init_db()
  27. while True:
  28. try:
  29. # 收集指标
  30. metrics = collect_system_metrics()
  31. # 写入JSON文件
  32. timestamp = datetime.now().strftime('%Y%m%d')
  33. json_filename = f'metrics_{timestamp}.json'
  34. write_to_json(metrics, json_filename)
  35. # 写入CSV文件(简化版)
  36. cpu = metrics['cpu']
  37. mem = metrics['memory']
  38. disk = next(iter(metrics['disk'].values())) if metrics['disk'] else {}
  39. csv_data = [
  40. datetime.now().isoformat(),
  41. cpu['usage_percent'],
  42. mem['percent'],
  43. disk.get('percent', 0)
  44. ]
  45. csv_filename = 'metrics_summary.csv'
  46. write_header = not os.path.exists(csv_filename)
  47. with open(csv_filename, 'a', newline='') as f:
  48. writer = csv.writer(f)
  49. if write_header:
  50. writer.writerow(['timestamp', 'cpu', 'memory', 'disk'])
  51. writer.writerow(csv_data)
  52. # 休眠指定间隔
  53. time.sleep(60) # 每分钟采集一次
  54. except KeyboardInterrupt:
  55. print("监控已停止")
  56. break
  57. except Exception as e:
  58. print(f"采集出错: {str(e)}")
  59. time.sleep(10) # 出错后等待10秒再重试
  60. if __name__ == '__main__':
  61. main()

五、最佳实践与优化建议

  1. 采集频率优化

    • CPU/内存:建议1-5秒采集一次
    • 磁盘I/O:建议5-30秒采集一次
    • 网络指标:根据业务需求调整
  2. 资源消耗控制

    • 避免在性能关键路径上运行监控
    • 考虑使用异步采集减少对系统的影响
    • 对大规模服务器集群,采用分布式采集架构
  3. 数据存储策略

    • 原始数据保留30天
    • 聚合数据(如每小时平均值)长期保留
    • 定期执行数据库维护(VACUUM, REINDEX)
  4. 告警机制集成

    1. def check_thresholds(metrics):
    2. """检查指标是否超过阈值"""
    3. alerts = []
    4. if metrics['cpu']['usage_percent'] > 90:
    5. alerts.append("CPU使用率过高")
    6. if metrics['memory']['percent'] > 85:
    7. alerts.append("内存使用率过高")
    8. # 添加更多检查...
    9. return alerts
  5. 跨平台兼容性

    • 使用psutil已处理大部分平台差异
    • 对特定平台(如AIX、Solaris)可能需要额外处理
    • 考虑使用Docker容器化监控程序

六、总结与展望

本文详细介绍了使用Python获取系统基础性能参数并写入文件的完整实现方案。通过psutil库,开发者可以轻松获取CPU、内存、磁盘等关键指标,并通过JSON、CSV或数据库等多种方式持久化存储这些数据。

未来发展方向包括:

  1. 集成Prometheus等时序数据库
  2. 添加可视化展示层(如Grafana)
  3. 实现基于机器学习的异常检测
  4. 开发跨云平台的统一监控解决方案

系统性能监控是保障IT系统稳定运行的基础工作。通过本文介绍的Python实现方案,开发者可以快速构建起灵活、可扩展的性能监控系统,为业务连续性提供有力保障。

相关文章推荐

发表评论

活动