
Real-Time System Performance Monitoring in Python: From Metric Collection to File Storage

Author: 沙与沫 · 2025.09.25 23:05

Abstract: This article explains in detail how to collect core system performance metrics in Python (CPU, memory, disk, network, and more) and write the data to files in structured form. Cross-platform monitoring is implemented with the standard library and third-party tools, with complete code examples and exception-handling strategies.

Collecting Core System Performance Metrics in Python and Writing Them to Files

In systems operations and performance analysis, collecting and recording core performance metrics in real time is essential for troubleshooting, capacity planning, and performance tuning. With its rich standard library and third-party ecosystem, Python makes cross-platform system monitoring and data persistence straightforward. This article walks through how to collect CPU, memory, disk, and network metrics in Python and write them to files in structured form, with a complete implementation and optimization advice.

I. Core Performance Metrics and How to Obtain Them

1. CPU Usage

As the computational core, the CPU's utilization directly reflects system load. Python can obtain it in several ways:

  • psutil: a cross-platform monitoring library exposing logical core counts, usage percentages, idle time, and more
  • /proc/stat (Linux): parse the kernel's CPU time statistics
  • The wmi module (Windows): query performance counters through the WMI interface
```python
import psutil

def get_cpu_info():
    # Number of logical CPU cores
    cpu_count = psutil.cpu_count(logical=True)
    # Per-core usage, sampled over a 1-second interval
    cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
    # System-wide usage, derived from the same sample
    # (a second cpu_percent(interval=1) call would block for another second)
    global_percent = sum(cpu_percent) / len(cpu_percent)
    return {
        "cpu_cores": cpu_count,
        "percpu_usage": cpu_percent,
        "global_usage": global_percent
    }
```
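For the /proc/stat route mentioned above, the kernel reports cumulative CPU time in jiffies, so usage is computed from the delta between two snapshots of the aggregate "cpu" line. A minimal, Linux-independent sketch of that arithmetic (the two sample lines below are illustrative values, not real readings):

```python
def parse_cpu_line(line):
    """Parse the aggregate 'cpu' line of /proc/stat into (idle, total) jiffies."""
    fields = [int(v) for v in line.split()[1:]]
    idle = fields[3] + fields[4]  # idle + iowait
    return idle, sum(fields)

def cpu_usage_percent(line_before, line_after):
    """CPU usage between two /proc/stat snapshots, as a percentage."""
    idle1, total1 = parse_cpu_line(line_before)
    idle2, total2 = parse_cpu_line(line_after)
    delta_total = total2 - total1
    delta_idle = idle2 - idle1
    return 100.0 * (delta_total - delta_idle) / delta_total if delta_total else 0.0

# Two illustrative snapshots, taken about one second apart:
before = "cpu  100 0 50 800 10 0 5 0 0 0"
after = "cpu  130 0 60 850 15 0 5 0 0 0"
print(round(cpu_usage_percent(before, after), 1))  # → 42.1
```

In a live monitor the two lines would come from reading the first line of /proc/stat twice with a `time.sleep()` between the reads.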

2. Memory Usage

Memory monitoring covers totals, used, free, and cached figures:

  • psutil.virtual_memory(): physical memory statistics
  • psutil.swap_memory(): swap partition statistics
  • memory_profiler: finer-grained memory profiling (requires installation)
```python
import psutil

def get_memory_info():
    mem = psutil.virtual_memory()
    swap = psutil.swap_memory()
    return {
        "total_memory": mem.total,
        "available": mem.available,
        "used_percent": mem.percent,
        "swap_total": swap.total,
        "swap_used_percent": swap.percent
    }
```

3. Disk I/O

Disk performance directly affects system responsiveness. Monitor:

  • Partition usage: psutil.disk_usage()
  • Disk I/O counters: psutil.disk_io_counters()
  • Filesystem information: os.statvfs() (Linux)
```python
import psutil

def get_disk_info():
    # Enumerate physical partitions only
    partitions = psutil.disk_partitions(all=False)
    disk_info = []
    for partition in partitions:
        usage = psutil.disk_usage(partition.mountpoint)
        # Map e.g. /dev/sda1 -> sda1 to look up its I/O counters (Linux device naming)
        io_stats = psutil.disk_io_counters(perdisk=True).get(
            partition.device.split('/')[-1])
        disk_info.append({
            "device": partition.device,
            "mountpoint": partition.mountpoint,
            "fstype": partition.fstype,
            "total": usage.total,
            "used": usage.used,
            "free": usage.free,
            "percent": usage.percent,
            # Convert cumulative byte counters to MB
            "read_bytes": io_stats.read_bytes / (1024**2) if io_stats else 0,
            "write_bytes": io_stats.write_bytes / (1024**2) if io_stats else 0
        })
    return disk_info
```

4. Network Traffic

Network monitoring should capture:

  • Interface traffic: psutil.net_io_counters()
  • Connection state: psutil.net_connections()
  • Bandwidth usage: must be derived from counter deltas over time
```python
import psutil

def get_network_info():
    net_io = psutil.net_io_counters()
    connections = psutil.net_connections(kind='inet')
    return {
        "bytes_sent": net_io.bytes_sent / (1024**2),  # cumulative since boot, in MB
        "bytes_recv": net_io.bytes_recv / (1024**2),
        "packets_sent": net_io.packets_sent,
        "packets_recv": net_io.packets_recv,
        "active_connections": len(connections)
    }
```
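The bandwidth-usage item above is not a direct counter: psutil only exposes cumulative byte counts, so throughput has to be derived from two samples taken a known interval apart. A minimal sketch of that calculation (the sampling itself is shown only as a comment, since it needs a live system):

```python
def throughput_mbps(bytes_before, bytes_after, interval_seconds):
    """Average throughput in megabits per second between two cumulative byte counters."""
    delta_bytes = bytes_after - bytes_before
    return (delta_bytes * 8) / (interval_seconds * 1_000_000)

# In a live monitor the counters come from two psutil calls separated by a sleep:
#   s1 = psutil.net_io_counters(); time.sleep(1); s2 = psutil.net_io_counters()
#   rate = throughput_mbps(s1.bytes_sent, s2.bytes_sent, 1)
print(throughput_mbps(1_000_000, 2_250_000, 1))  # → 10.0
```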

II. Data Persistence Options

1. CSV Files

Well suited to flat, structured data and later analysis:

```python
import csv
import os
from datetime import datetime

def write_to_csv(data, filename="system_metrics.csv"):
    # On first write, create the file and emit a header row
    first_write = not os.path.exists(filename)
    with open(filename, mode='a', newline='') as file:
        writer = csv.writer(file)
        if first_write:
            writer.writerow(["timestamp", "cpu_global", "mem_used%",
                             "disk_used%", "net_sent(MB)", "net_recv(MB)"])
        # Flatten the nested metrics into one row of key indicators
        writer.writerow([
            datetime.now().isoformat(),
            data["cpu"]["global_usage"],
            data["memory"]["used_percent"],
            sum(d["percent"] for d in data["disk"]) / len(data["disk"]) if data["disk"] else 0,
            data["network"]["bytes_sent"],
            data["network"]["bytes_recv"]
        ])
```

2. JSON Files

Suits nested structures and preserves the full detail of each sample:

```python
import json
import os
from datetime import datetime

def write_to_json(data, filename="system_metrics.json"):
    # Load any existing records
    existing_data = []
    if os.path.exists(filename):
        with open(filename, 'r') as f:
            try:
                existing_data = json.load(f)
            except json.JSONDecodeError:
                existing_data = []
    # Append the new record
    new_entry = {
        "timestamp": datetime.now().isoformat(),
        "cpu": data["cpu"],
        "memory": data["memory"],
        "disk": data["disk"],
        "network": data["network"]
    }
    existing_data.append(new_entry)
    # Rewrite the whole file
    with open(filename, 'w') as f:
        json.dump(existing_data, f, indent=2)
```
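One caveat with this read-modify-write pattern: the entire file is reparsed and rewritten on every sample, so the cost grows with the length of the history. An append-only JSON Lines variant (one JSON object per line; the .jsonl filename is just a convention) sidesteps that:

```python
import json

def append_jsonl(record, filename="system_metrics.jsonl"):
    # Append one record as a single JSON line; constant cost per write
    with open(filename, 'a') as f:
        f.write(json.dumps(record) + "\n")

def read_jsonl(filename="system_metrics.jsonl"):
    # Read records back, one per line
    with open(filename) as f:
        return [json.loads(line) for line in f]
```

The trade-off is that the file as a whole is no longer a single valid JSON document, but most analysis tools (pandas, jq) read JSON Lines directly.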

3. Database Storage (SQLite Example)

Suited to long-term storage and complex queries:

```python
import sqlite3
from datetime import datetime

def init_db(db_path="system_metrics.db"):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS metrics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            cpu_global REAL,
            mem_used_percent REAL,
            disk_avg_used_percent REAL,
            net_sent_mb REAL,
            net_recv_mb REAL
        )
    ''')
    conn.commit()
    conn.close()

def write_to_db(data, db_path="system_metrics.db"):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute('''
        INSERT INTO metrics
        (timestamp, cpu_global, mem_used_percent, disk_avg_used_percent, net_sent_mb, net_recv_mb)
        VALUES (?, ?, ?, ?, ?, ?)
    ''', (
        datetime.now().isoformat(),
        data["cpu"]["global_usage"],
        data["memory"]["used_percent"],
        sum(d["percent"] for d in data["disk"]) / len(data["disk"]) if data["disk"] else 0,
        data["network"]["bytes_sent"],
        data["network"]["bytes_recv"]
    ))
    conn.commit()
    conn.close()
```

III. Complete Example

```python
import os
import time
import json
import csv
import sqlite3
from datetime import datetime

import psutil

def collect_system_metrics():
    # CPU
    percpu = psutil.cpu_percent(interval=1, percpu=True)
    cpu_info = {
        "cpu_cores": psutil.cpu_count(logical=True),
        "percpu_usage": percpu,
        "global_usage": sum(percpu) / len(percpu)
    }
    # Memory
    mem = psutil.virtual_memory()
    swap = psutil.swap_memory()
    memory_info = {
        "total_memory": mem.total,
        "available": mem.available,
        "used_percent": mem.percent,
        "swap_total": swap.total,
        "swap_used_percent": swap.percent
    }
    # Disk
    partitions = psutil.disk_partitions(all=False)
    disk_info = []
    for partition in partitions:
        try:
            usage = psutil.disk_usage(partition.mountpoint)
            io_stats = psutil.disk_io_counters(perdisk=True).get(
                partition.device.split('/')[-1])
            disk_info.append({
                "device": partition.device,
                "mountpoint": partition.mountpoint,
                "fstype": partition.fstype,
                "total": usage.total,
                "used": usage.used,
                "free": usage.free,
                "percent": usage.percent,
                "read_bytes": io_stats.read_bytes / (1024**2) if io_stats else 0,
                "write_bytes": io_stats.write_bytes / (1024**2) if io_stats else 0
            })
        except PermissionError:
            # Some mount points are unreadable without elevated privileges
            continue
    # Network
    net_io = psutil.net_io_counters()
    connections = psutil.net_connections(kind='inet')
    network_info = {
        "bytes_sent": net_io.bytes_sent / (1024**2),
        "bytes_recv": net_io.bytes_recv / (1024**2),
        "packets_sent": net_io.packets_sent,
        "packets_recv": net_io.packets_recv,
        "active_connections": len(connections)
    }
    return {
        "timestamp": datetime.now().isoformat(),
        "cpu": cpu_info,
        "memory": memory_info,
        "disk": disk_info,
        "network": network_info
    }

def save_metrics(data, csv_file="metrics.csv", json_file="metrics.json", db_file="metrics.db"):
    # Initialize the database on first use
    if not os.path.exists(db_file):
        conn = sqlite3.connect(db_file)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                cpu_global REAL,
                mem_used_percent REAL,
                disk_avg_used_percent REAL,
                net_sent_mb REAL,
                net_recv_mb REAL
            )
        ''')
        conn.commit()
        conn.close()
    disk_avg = (sum(d["percent"] for d in data["disk"]) / len(data["disk"])
                if data["disk"] else 0)
    # Write to CSV
    first_write = not os.path.exists(csv_file)
    with open(csv_file, mode='a', newline='') as file:
        writer = csv.writer(file)
        if first_write:
            writer.writerow(["timestamp", "cpu_global", "mem_used%",
                             "disk_used%", "net_sent(MB)", "net_recv(MB)"])
        writer.writerow([
            data["timestamp"],
            data["cpu"]["global_usage"],
            data["memory"]["used_percent"],
            disk_avg,
            data["network"]["bytes_sent"],
            data["network"]["bytes_recv"]
        ])
    # Write to JSON
    existing_data = []
    if os.path.exists(json_file):
        with open(json_file, 'r') as f:
            try:
                existing_data = json.load(f)
            except json.JSONDecodeError:
                existing_data = []
    existing_data.append(data)
    with open(json_file, 'w') as f:
        json.dump(existing_data, f, indent=2)
    # Write to the database
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    cursor.execute('''
        INSERT INTO metrics
        (timestamp, cpu_global, mem_used_percent, disk_avg_used_percent, net_sent_mb, net_recv_mb)
        VALUES (?, ?, ?, ?, ?, ?)
    ''', (
        data["timestamp"],
        data["cpu"]["global_usage"],
        data["memory"]["used_percent"],
        disk_avg,
        data["network"]["bytes_sent"],
        data["network"]["bytes_recv"]
    ))
    conn.commit()
    conn.close()

if __name__ == "__main__":
    # save_metrics creates the database table on first run
    try:
        while True:
            metrics = collect_system_metrics()
            save_metrics(metrics)
            print(f"Metrics collected at {metrics['timestamp']}")
            time.sleep(60)  # sample once per minute
    except KeyboardInterrupt:
        print("Monitoring stopped by user")
```

IV. Optimization Tips and Caveats

  1. Performance impact: high-frequency sampling (e.g. intervals under 1 second) can itself load the system; tune the sampling rate to your needs
  2. Permissions: some disk information on Linux may require root privileges
  3. Cross-platform compatibility: psutil maximizes portability, but some advanced features remain platform-specific
  4. Log rotation: long-running monitors need rotation so no single file grows unbounded
  5. Exception handling: add thorough exception handling, especially around file operations and network interfaces
  6. Visualization: feed the data into Grafana, or into Matplotlib/Plotly in Python, for visual analysis
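For the rotation point above, a size-based scheme is enough for flat metric files: once the current file exceeds a threshold, rename it with a timestamp suffix and let the writer recreate it on the next sample. A minimal stdlib-only sketch (the 10 MB default and the naming scheme are illustrative choices, not from this article):

```python
import os
from datetime import datetime

def rotate_if_needed(filename, max_bytes=10 * 1024 * 1024):
    """Rename `filename` with a timestamp suffix once it exceeds max_bytes."""
    if os.path.exists(filename) and os.path.getsize(filename) > max_bytes:
        stamp = datetime.now().strftime("%Y%m%d%H%M%S")
        os.rename(filename, f"{filename}.{stamp}")
        return True
    return False
```

Call it at the top of each write cycle; for proper log files, `logging.handlers.RotatingFileHandler` in the standard library implements the same idea with backup counts.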

V. Further Applications

  1. Automated operations: integrate with automation tools such as Ansible or SaltStack
  2. Container monitoring: track container resource usage in Docker/Kubernetes environments
  3. Performance benchmarking: record how the system behaves under different loads
  4. Anomaly detection: build a baseline from historical data and flag abnormal resource usage
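The baseline idea in the last item can start very simply: treat the historical mean and standard deviation of a metric as the baseline, and flag samples that deviate by more than a few standard deviations. A dependency-free sketch (the 3-sigma threshold is a common default, not something prescribed here):

```python
from statistics import mean, stdev

def is_anomalous(history, value, n_sigma=3.0):
    """Flag `value` if it deviates from the historical baseline by > n_sigma std devs."""
    if len(history) < 2:
        return False  # not enough data to form a baseline
    mu = mean(history)
    sigma = stdev(history)
    if sigma == 0:
        return value != mu
    return abs(value - mu) > n_sigma * sigma

cpu_history = [22.0, 25.0, 23.5, 24.0, 26.0, 23.0]
print(is_anomalous(cpu_history, 95.0))  # → True
print(is_anomalous(cpu_history, 24.5))  # → False
```

Real deployments usually refine this with rolling windows or seasonal baselines, but the structure stays the same: compare the current sample to a statistic computed from the stored history.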

With the approach described here, developers can quickly build a lightweight system monitor that serves immediate performance-analysis needs and can also form the foundation of a more sophisticated monitoring system.
