logo

Python精准监控:系统性能参数获取与文件存储实战指南

作者:有好多问题2025.09.25 23:05浏览量:0

简介:本文详细介绍如何使用Python获取系统CPU、内存、磁盘等基础性能参数,并通过文件操作实现数据持久化存储,为系统监控和性能分析提供实用方案。

Python精准监控:系统性能参数获取与文件存储实战指南

在系统运维和开发过程中,实时获取并分析系统性能参数是保障服务稳定性的关键环节。Python凭借其丰富的标准库和第三方生态,能够高效实现系统性能监控数据的采集与存储。本文将系统阐述如何使用Python获取CPU使用率、内存占用、磁盘I/O等基础性能指标,并将数据结构化写入文件,为后续的性能分析和自动化运维提供数据支撑。

一、系统性能参数采集的核心方法

1. CPU使用率采集

CPU使用率是衡量系统负载的核心指标,Python可通过psutil库实现跨平台采集。psutil提供了cpu_percent()方法,支持按核心或全局统计:

  1. import psutil
  2. def get_cpu_usage():
  3. # 获取所有CPU核心的使用率(列表形式)
  4. cpu_percent_per_core = psutil.cpu_percent(interval=1, percpu=True)
  5. # 获取全局CPU使用率
  6. cpu_percent_total = psutil.cpu_percent(interval=1)
  7. return {
  8. "total_usage": cpu_percent_total,
  9. "per_core_usage": cpu_percent_per_core
  10. }

关键参数说明

  • interval=1:采样间隔(秒),确保数据准确性
  • percpu=True:返回各核心使用率,否则返回全局值

2. 内存信息采集

内存状态直接影响系统运行效率,psutil.virtual_memory()可获取详细内存信息:

  1. def get_memory_info():
  2. mem = psutil.virtual_memory()
  3. return {
  4. "total": mem.total, # 总内存(字节)
  5. "available": mem.available, # 可用内存(字节)
  6. "used": mem.used, # 已用内存(字节)
  7. "percent": mem.percent # 内存使用率(%)
  8. }

数据解析要点

  • available字段比free更准确,包含缓存和缓冲区可回收内存
  • 字节单位需转换为GB/MB便于阅读(1GB=1024^3字节)

3. 磁盘I/O与空间监控

磁盘性能是系统瓶颈的常见来源,需同时监控使用情况和I/O负载:

  1. def get_disk_info():
  2. # 磁盘分区使用情况
  3. disk_usage = psutil.disk_usage('/')
  4. # 磁盘I/O统计(需结合采样间隔计算速率)
  5. io_before = psutil.disk_io_counters()
  6. time.sleep(1) # 间隔1秒
  7. io_after = psutil.disk_io_counters()
  8. read_speed = (io_after.read_bytes - io_before.read_bytes) / 1024 # KB/s
  9. write_speed = (io_after.write_bytes - io_before.write_bytes) / 1024
  10. return {
  11. "usage": {
  12. "total": disk_usage.total,
  13. "used": disk_usage.used,
  14. "free": disk_usage.free,
  15. "percent": disk_usage.percent
  16. },
  17. "io_speed": {
  18. "read_kbps": read_speed,
  19. "write_kbps": write_speed
  20. }
  21. }

实现注意事项

  • 磁盘I/O速率需通过两次采样差值计算
  • 路径'/'可根据需要替换为其他分区

二、性能数据结构化存储方案

1. CSV文件存储实现

CSV格式适合结构化数据存储,可使用Python内置csv模块:

  1. import csv
  2. from datetime import datetime
  3. def write_to_csv(data, filename="system_metrics.csv"):
  4. timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  5. flat_data = {
  6. "timestamp": timestamp,
  7. "cpu_total": data["cpu"]["total_usage"],
  8. "cpu_core1": data["cpu"]["per_core_usage"][0] if len(data["cpu"]["per_core_usage"]) > 0 else 0,
  9. "mem_used_percent": data["memory"]["percent"],
  10. "disk_used_percent": data["disk"]["usage"]["percent"],
  11. "disk_read_kbps": data["disk"]["io_speed"]["read_kbps"],
  12. "disk_write_kbps": data["disk"]["io_speed"]["write_kbps"]
  13. }
  14. fieldnames = ["timestamp", "cpu_total", "cpu_core1", "mem_used_percent",
  15. "disk_used_percent", "disk_read_kbps", "disk_write_kbps"]
  16. # 写入模式(追加)
  17. with open(filename, mode='a', newline='') as file:
  18. writer = csv.DictWriter(file, fieldnames=fieldnames)
  19. # 仅在文件为空时写入表头
  20. if file.tell() == 0:
  21. writer.writeheader()
  22. writer.writerow(flat_data)

优化建议

  • 使用mode='a'实现数据追加
  • 通过file.tell()判断是否为空文件,避免重复写入表头
  • 添加时间戳字段实现时序数据分析

2. JSON文件存储方案

JSON格式适合嵌套数据存储,可使用json模块:

  1. import json
  2. def write_to_json(data, filename="system_metrics.json"):
  3. timestamp = datetime.now().isoformat()
  4. full_data = {
  5. "timestamp": timestamp,
  6. "metrics": data
  7. }
  8. # 读取现有数据(如果存在)
  9. try:
  10. with open(filename, 'r') as file:
  11. existing_data = json.load(file)
  12. except FileNotFoundError:
  13. existing_data = []
  14. # 追加新数据
  15. existing_data.append(full_data)
  16. # 写入文件
  17. with open(filename, 'w') as file:
  18. json.dump(existing_data, file, indent=4)

适用场景

  • 需要保留完整数据结构的场景
  • 后续需用Python或其他语言解析的场景
  • 数据量较小(JSON不适合超大文件)

三、完整监控系统实现

1. 主监控程序框架

  1. import time
  2. import psutil
  3. from datetime import datetime
  4. def collect_metrics():
  5. return {
  6. "cpu": get_cpu_usage(),
  7. "memory": get_memory_info(),
  8. "disk": get_disk_info()
  9. }
  10. def main(interval=5, output_format="csv"):
  11. while True:
  12. metrics = collect_metrics()
  13. if output_format == "csv":
  14. write_to_csv(metrics)
  15. elif output_format == "json":
  16. write_to_json(metrics)
  17. print(f"[{datetime.now()}] Metrics collected and saved")
  18. time.sleep(interval)
  19. if __name__ == "__main__":
  20. main(interval=5, output_format="csv")

2. 高级功能扩展

(1)异常处理机制

  1. def safe_collect_metrics():
  2. try:
  3. return collect_metrics()
  4. except Exception as e:
  5. error_data = {
  6. "timestamp": datetime.now().isoformat(),
  7. "error": str(e),
  8. "metrics": None
  9. }
  10. write_to_json(error_data, "error_log.json")
  11. return None

(2)数据压缩存储

对于长期监控,可使用gzip压缩JSON文件:

  1. import gzip
  2. import json
  3. def write_compressed_json(data, filename="system_metrics.json.gz"):
  4. with gzip.open(filename, 'wt', encoding='utf-8') as file:
  5. json.dump(data, file, indent=4)

四、实际应用建议

  1. 采样间隔选择

    • 短间隔(1-5秒):适合实时监控
    • 长间隔(60秒+):适合长期趋势分析
  2. 数据轮转策略

    1. import os
    2. def rotate_log(filename, max_size_mb=10):
    3. if os.path.exists(filename) and os.path.getsize(filename) > max_size_mb * 1024 * 1024:
    4. backup_name = f"{filename}.{datetime.now().strftime('%Y%m%d%H%M%S')}"
    5. os.rename(filename, backup_name)
  3. 可视化集成

    • 使用pandas读取CSV文件进行数据分析
    • 结合matplotlibPlotly生成可视化图表

五、性能优化技巧

  1. 减少I/O操作

    • 批量写入代替频繁单次写入
    • 使用内存缓冲区暂存数据
  2. 多线程实现

    1. import threading
    2. def async_write(data, writer_func):
    3. thread = threading.Thread(target=writer_func, args=(data,))
    4. thread.start()
  3. 选择高效存储格式

    • 小数据量:JSON/CSV
    • 大数据量:Parquet/HDF5(需pandas支持)

六、完整代码示例

  1. import psutil
  2. import csv
  3. import json
  4. from datetime import datetime
  5. import time
  6. import gzip
  7. import os
  8. def get_cpu_usage():
  9. return {
  10. "total_usage": psutil.cpu_percent(interval=1),
  11. "per_core_usage": psutil.cpu_percent(interval=1, percpu=True)
  12. }
  13. def get_memory_info():
  14. mem = psutil.virtual_memory()
  15. return {
  16. "total": mem.total,
  17. "available": mem.available,
  18. "used": mem.used,
  19. "percent": mem.percent
  20. }
  21. def get_disk_info():
  22. disk_usage = psutil.disk_usage('/')
  23. io_before = psutil.disk_io_counters()
  24. time.sleep(1)
  25. io_after = psutil.disk_io_counters()
  26. read_speed = (io_after.read_bytes - io_before.read_bytes) / 1024
  27. write_speed = (io_after.write_bytes - io_before.write_bytes) / 1024
  28. return {
  29. "usage": {
  30. "total": disk_usage.total,
  31. "used": disk_usage.used,
  32. "free": disk_usage.free,
  33. "percent": disk_usage.percent
  34. },
  35. "io_speed": {
  36. "read_kbps": read_speed,
  37. "write_kbps": write_speed
  38. }
  39. }
  40. def write_to_csv(data, filename="system_metrics.csv"):
  41. timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  42. flat_data = {
  43. "timestamp": timestamp,
  44. "cpu_total": data["cpu"]["total_usage"],
  45. "cpu_core1": data["cpu"]["per_core_usage"][0] if len(data["cpu"]["per_core_usage"]) > 0 else 0,
  46. "mem_used_percent": data["memory"]["percent"],
  47. "disk_used_percent": data["disk"]["usage"]["percent"],
  48. "disk_read_kbps": data["disk"]["io_speed"]["read_kbps"],
  49. "disk_write_kbps": data["disk"]["io_speed"]["write_kbps"]
  50. }
  51. fieldnames = ["timestamp", "cpu_total", "cpu_core1", "mem_used_percent",
  52. "disk_used_percent", "disk_read_kbps", "disk_write_kbps"]
  53. with open(filename, mode='a', newline='') as file:
  54. writer = csv.DictWriter(file, fieldnames=fieldnames)
  55. if file.tell() == 0:
  56. writer.writeheader()
  57. writer.writerow(flat_data)
  58. def write_compressed_json(data, filename="system_metrics.json.gz"):
  59. timestamp = datetime.now().isoformat()
  60. full_data = {
  61. "timestamp": timestamp,
  62. "metrics": data
  63. }
  64. try:
  65. with gzip.open(filename, 'rt') as file:
  66. existing_data = json.loads(file.read())
  67. except (FileNotFoundError, json.JSONDecodeError):
  68. existing_data = []
  69. existing_data.append(full_data)
  70. with gzip.open(filename, 'wt', encoding='utf-8') as file:
  71. json.dump(existing_data, file, indent=4)
  72. def main(interval=5, output_format="csv"):
  73. while True:
  74. metrics = {
  75. "cpu": get_cpu_usage(),
  76. "memory": get_memory_info(),
  77. "disk": get_disk_info()
  78. }
  79. if output_format == "csv":
  80. write_to_csv(metrics)
  81. elif output_format == "json_gz":
  82. write_compressed_json(metrics)
  83. print(f"[{datetime.now()}] Metrics collected ({output_format})")
  84. time.sleep(interval)
  85. if __name__ == "__main__":
  86. # 使用示例:python script.py 5 csv
  87. import sys
  88. interval = int(sys.argv[1]) if len(sys.argv) > 1 else 5
  89. output_format = sys.argv[2] if len(sys.argv) > 2 else "csv"
  90. main(interval, output_format)

七、总结与展望

本文系统阐述了Python实现系统性能监控的核心方法,涵盖CPU、内存、磁盘等关键指标的采集技术,以及CSV/JSON两种主流存储方案的实现细节。通过完整的代码示例和优化建议,开发者可以快速构建定制化的监控系统。

未来发展方向包括:

  1. 集成Prometheus/Grafana构建可视化监控平台
  2. 添加网络性能监控(带宽、延迟)
  3. 实现异常检测与自动告警机制
  4. 支持容器化部署(Docker/K8s)

建议开发者根据实际需求选择合适的存储格式和采样间隔,并考虑添加数据压缩和轮转策略以应对长期监控场景。

相关文章推荐

发表评论