Python 自动化运维脚本合集 – 提升 10 倍工作效率 原创

温馨提示:
本文最后更新于 2026-04-07。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我。

分类:Python 实战 | 标签:Python、自动化、运维、脚本


一、环境准备

1.1 Python 环境配置

# Install Python 3.10+
apt install python3 python3-pip python3-venv

# Create a virtual environment
python3 -m venv /opt/venv
source /opt/venv/bin/activate

# Install the commonly used libraries
pip install requests paramiko psutil schedule python-dotenv

1.2 项目结构

automation-scripts/
├── .env                 # 环境变量
├── requirements.txt     # 依赖
├── config.py           # 配置
├── utils/              # 工具函数
├── scripts/            # 脚本
│   ├── backup.py
│   ├── monitor.py
│   └── deploy.py
└── logs/               # 日志

二、服务器监控脚本

2.1 系统资源监控

#!/usr/bin/env python3
# scripts/monitor.py

import psutil
import json
from datetime import datetime

def get_system_stats():
    """Collect a timestamped snapshot of CPU, memory and root-disk usage."""
    snapshot = {'timestamp': datetime.now().isoformat()}
    # Sample CPU over a 1-second window, then the memory/disk percentages.
    snapshot['cpu_percent'] = psutil.cpu_percent(interval=1)
    snapshot['memory_percent'] = psutil.virtual_memory().percent
    snapshot['disk_usage'] = psutil.disk_usage('/').percent
    return snapshot

def check_thresholds(stats, thresholds=None):
    """Compare a stats snapshot against alert thresholds.

    Args:
        stats: dict with 'cpu_percent', 'memory_percent' and 'disk_usage'.
        thresholds: dict with 'cpu', 'memory' and 'disk' percentage limits.
            Defaults to {'cpu': 80, 'memory': 80, 'disk': 90}, which also
            makes the one-argument call in main.py valid.

    Returns:
        List of alert message strings; empty when everything is within limits.
    """
    # None-sentinel instead of a mutable default argument.
    if thresholds is None:
        thresholds = {'cpu': 80, 'memory': 80, 'disk': 90}
    alerts = []
    if stats['cpu_percent'] > thresholds['cpu']:
        alerts.append(f"CPU 使用率过高:{stats['cpu_percent']}%")
    if stats['memory_percent'] > thresholds['memory']:
        alerts.append(f"内存使用率过高:{stats['memory_percent']}%")
    if stats['disk_usage'] > thresholds['disk']:
        alerts.append(f"磁盘使用率过高:{stats['disk_usage']}%")
    return alerts

if __name__ == '__main__':
    # One-shot check: sample the system and print a JSON report.
    limits = {'cpu': 80, 'memory': 80, 'disk': 90}
    snapshot = get_system_stats()
    problems = check_thresholds(snapshot, limits)

    report = (
        {'status': 'warning', 'alerts': problems}
        if problems
        else {'status': 'ok', 'stats': snapshot}
    )
    print(json.dumps(report, indent=2))

2.2 定时监控任务

# scripts/scheduled_monitor.py
import schedule
import time

def monitor_job():
    """Run one monitoring cycle: sample stats and alert on threshold breaches.

    NOTE(review): this snippet relies on get_system_stats, check_thresholds,
    thresholds, send_alert and datetime being defined/imported elsewhere —
    none of them are imported here; confirm against scripts/monitor.py.
    """
    stats = get_system_stats()
    alerts = check_thresholds(stats, thresholds)
    
    if alerts:
        send_alert('\n'.join(alerts))
        print(f"[ALERT] {datetime.now()}: {alerts}")
    else:
        print(f"[OK] {datetime.now()}")

# Run monitor_job every 5 minutes. schedule only registers the job here;
# the loop below is what actually drives execution.
schedule.every(5).minutes.do(monitor_job)

while True:
    schedule.run_pending()
    time.sleep(1)  # poll roughly once per second for due jobs

三、自动备份脚本

3.1 数据库备份

#!/usr/bin/env python3
# scripts/backup.py

import os
import shlex
import subprocess
from datetime import datetime

from dotenv import load_dotenv

load_dotenv()

def backup_mysql(database, backup_dir):
    """Dump a MySQL database into a gzip-compressed SQL file.

    Args:
        database: name of the database to dump.
        backup_dir: directory the <db>_<timestamp>.sql.gz file is written to.

    Returns:
        True on success, False when mysqldump reports an error.
    """
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_file = f"{backup_dir}/{database}_{timestamp}.sql.gz"

    # Quote every interpolated value so unusual database names, paths or
    # credentials cannot break (or inject into) the shell pipeline.
    # NOTE(review): passing the password via -p exposes it in the process
    # list; prefer ~/.my.cnf or --defaults-extra-file in production.
    cmd = (
        f"mysqldump -u{shlex.quote(os.getenv('DB_USER') or '')} "
        f"-p{shlex.quote(os.getenv('DB_PASS') or '')} "
        f"--single-transaction {shlex.quote(database)} "
        f"| gzip > {shlex.quote(backup_file)}"
    )

    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

    if result.returncode == 0:
        print(f"✅ 备份成功:{backup_file}")
        cleanup_old_backups(backup_dir, days=7)
        return True
    print(f"❌ 备份失败:{result.stderr}")
    return False

def cleanup_old_backups(backup_dir, days=7):
    """Delete regular files in *backup_dir* older than *days* days.

    Args:
        backup_dir: directory containing backup files.
        days: retention window in days (default 7).
    """
    cutoff = datetime.now().timestamp() - (days * 86400)
    for name in os.listdir(backup_dir):
        filepath = os.path.join(backup_dir, name)
        # os.listdir also yields sub-directories; the original crashed on
        # them because os.remove only works on files — skip non-files.
        if os.path.isfile(filepath) and os.path.getmtime(filepath) < cutoff:
            os.remove(filepath)
            print(f"已删除旧备份:{name}")

3.2 文件备份(含 SSH 远程)

# scripts/remote_backup.py
import paramiko
from pathlib import Path

def backup_remote_files(hostname, username, password, remote_path, local_backup):
    """Tar a remote path over SSH and download the archive via SFTP.

    Args:
        hostname: SSH host to connect to.
        username / password: SSH credentials.
        remote_path: remote directory or file to archive.
        local_backup: local path the .tar.gz archive is downloaded to.

    NOTE(review): this snippet uses datetime without importing it — confirm
    `from datetime import datetime` exists in the real scripts/remote_backup.py.
    """
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy blindly trusts unknown host keys; load a
    # known_hosts file in production to avoid man-in-the-middle exposure.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname, username=username, password=password)

    try:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        remote_tar = f"/tmp/backup_{timestamp}.tar.gz"

        stdin, stdout, stderr = ssh.exec_command(
            f"tar -czf {remote_tar} -C {Path(remote_path).parent} {Path(remote_path).name}"
        )

        if stdout.channel.recv_exit_status() == 0:
            sftp = ssh.open_sftp()
            try:
                sftp.get(remote_tar, local_backup)
            finally:
                sftp.close()
            ssh.exec_command(f"rm {remote_tar}")
            print(f"✅ 远程备份成功:{local_backup}")
    finally:
        # The original leaked the connection when tar or SFTP raised —
        # always release it.
        ssh.close()

四、日志分析脚本

4.1 Nginx 日志分析

#!/usr/bin/env python3
# scripts/log_analyzer.py

import re
from collections import Counter

def parse_nginx_log(log_file):
    """Parse an Nginx/NCSA-style access log into summary counters.

    Args:
        log_file: path to the access log.

    Returns:
        dict with the top 10 client IPs, per-status-code counts, the top 10
        URLs and the total number of successfully parsed requests.
    """
    # The size field is '\S+' rather than '\d+': NCSA/common-log-style
    # configs write '-' for zero-byte responses, and the original '\d+'
    # silently dropped those lines. Compile once instead of per line.
    pattern = re.compile(r'(\S+) - - \[(.*?)\] "(\S+) (\S+) \S+" (\d+) (\S+)')

    ips = []
    status_codes = []
    urls = []

    # errors='replace' keeps the analysis going over the odd corrupt byte.
    with open(log_file, 'r', errors='replace') as f:
        for line in f:
            match = pattern.match(line)
            if match:
                ip, _, method, url, status, size = match.groups()
                ips.append(ip)
                status_codes.append(status)
                urls.append(url)

    return {
        'top_ips': Counter(ips).most_common(10),
        'status_codes': Counter(status_codes),
        'top_urls': Counter(urls).most_common(10),
        'total_requests': len(ips)
    }

4.2 异常检测

def detect_anomalies(stats):
    """Flag suspicious traffic patterns in parsed log statistics.

    Reports single IPs with more than 1000 requests and an overall
    4xx/5xx error rate above 10%.
    """
    findings = []

    # High-frequency clients.
    findings.extend(
        f"⚠️ 可疑 IP: {ip} ({count} 次)"
        for ip, count in stats['top_ips']
        if count > 1000
    )

    # Overall 4xx/5xx error rate.
    total = stats['total_requests']
    error_count = sum(
        count
        for code, count in stats['status_codes'].items()
        if code[:1] in ('4', '5')
    )

    if total > 0 and (error_count / total) > 0.1:
        findings.append(f"⚠️ 错误率过高:{error_count/total*100:.1f}%")

    return findings

五、自动部署脚本

5.1 Git 自动拉取

#!/usr/bin/env python3
# scripts/deploy.py

import subprocess
import os

def deploy_project(repo_path, branch='main'):
    """Pull the latest code, reinstall dependencies and restart the service.

    Args:
        repo_path: path to the local git checkout (becomes the process cwd).
        branch: branch to pull from origin (default 'main').

    Returns:
        True when every step succeeded, False otherwise.
    """
    os.chdir(repo_path)

    # Pull the latest code.
    result = subprocess.run(
        ['git', 'pull', 'origin', branch],
        capture_output=True, text=True
    )

    if result.returncode != 0:
        print(f"❌ Git pull 失败:{result.stderr}")
        return False

    print(f"✅ 代码已更新:{result.stdout}")

    # Install dependencies. The original ignored pip failures, which let
    # the service restart against stale or broken dependencies.
    if os.path.exists('requirements.txt'):
        pip_result = subprocess.run(['pip', 'install', '-r', 'requirements.txt'])
        if pip_result.returncode != 0:
            print("❌ 依赖安装失败")
            return False

    # Restart the service and verify systemctl actually succeeded.
    restart = subprocess.run(['systemctl', 'restart', 'myapp'])
    if restart.returncode != 0:
        print("❌ 服务重启失败")
        return False
    print("✅ 服务已重启")

    return True

5.2 WordPress 自动更新

# scripts/wp_update.py
import subprocess

def update_wordpress(wp_path="/var/www/html/wordpress"):
    """Update WordPress core, plugins and themes via WP-CLI, then flush cache.

    Args:
        wp_path: WordPress installation directory. Parameterized (with the
            original hard-coded path as default) so multiple sites can reuse
            this function; existing zero-argument callers are unaffected.

    Returns:
        True when every WP-CLI step succeeded, False on the first failure.
    """
    wp_cli = "wp --allow-root"

    commands = [
        f"{wp_cli} core update --path={wp_path}",
        f"{wp_cli} plugin update --all --path={wp_path}",
        f"{wp_cli} theme update --all --path={wp_path}",
        f"{wp_cli} cache flush --path={wp_path}"
    ]

    # Stop at the first failing step so later steps don't run on a
    # half-updated installation.
    for cmd in commands:
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        if result.returncode != 0:
            print(f"❌ 更新失败:{cmd}")
            return False
        print(f"✅ 更新成功:{cmd}")

    return True

六、邮件通知模块

6.1 邮件发送工具

# utils/email_sender.py
import smtplib
from email.mime.text import MIMEText
from dotenv import load_dotenv
import os

load_dotenv()

class EmailSender:
    """Send plain-text notification emails over SMTP with STARTTLS.

    Connection settings come from environment variables:
    SMTP_SERVER, SMTP_PORT (default 587), SMTP_USER, SMTP_PASS.
    """

    def __init__(self):
        # SMTP settings are read from the environment (.env is loaded
        # at module import).
        self.smtp_server = os.getenv('SMTP_SERVER')
        self.smtp_port = int(os.getenv('SMTP_PORT', 587))
        self.username = os.getenv('SMTP_USER')
        self.password = os.getenv('SMTP_PASS')

    def send(self, to, subject, body):
        """Deliver a plain-text message to a single recipient."""
        message = MIMEText(body)
        message['Subject'] = subject
        message['From'] = self.username
        message['To'] = to

        with smtplib.SMTP(self.smtp_server, self.smtp_port) as connection:
            connection.starttls()
            connection.login(self.username, self.password)
            connection.send_message(message)

        print(f"✅ 邮件已发送:{to}")

七、定时任务管理

7.1 使用 schedule 库

# scripts/scheduler.py
import schedule
import time

def job():
    """Example scheduled task: logs the current time.

    NOTE(review): datetime is not imported in this snippet — confirm
    `from datetime import datetime` exists in the real scripts/scheduler.py.
    """
    print(f"[{datetime.now()}] 执行定时任务")

# Register the recurring jobs.
# NOTE(review): backup_job, monitor_job, cleanup_job and health_check are
# not defined in this snippet; they must be imported in the real module.
schedule.every().day.at("02:00").do(backup_job)
schedule.every().hour.do(monitor_job)
schedule.every().monday.at("03:00").do(cleanup_job)
schedule.every(5).minutes.do(health_check)

# Drive the scheduler: fire any due jobs roughly once per second.
while True:
    schedule.run_pending()
    time.sleep(1)

7.2 使用 systemd timer

# /etc/systemd/system/backup.timer
[Unit]
Description=Daily backup timer

[Timer]
# Fire every day at 02:00 local time.
OnCalendar=*-*-* 02:00:00
# If a scheduled run was missed (machine off), run it on next boot.
Persistent=true

[Install]
WantedBy=timers.target

八、实战:完整运维系统

8.1 系统架构

┌─────────────┐
│  监控模块   │──→ 异常检测 ──→ 邮件告警
├─────────────┤
│  备份模块   │──→ 本地备份 + 远程备份
├─────────────┤
│  部署模块   │──→ Git 拉取 ──→ 服务重启
├─────────────┤
│  日志模块   │──→ 日志分析 ──→ 报告生成
└─────────────┘

8.2 配置文件

# .env
DB_USER=root
DB_PASS=your_password
SMTP_SERVER=smtp.example.com
SMTP_PORT=587
SMTP_USER=alert@example.com
SMTP_PASS=your_password
BACKUP_DIR=/backup

8.3 主程序入口

# main.py
from scripts import monitor, backup
from utils.email_sender import EmailSender

def main():
    """Run one monitoring pass (with email alerting) and a database backup."""
    # Monitoring: collect stats and compare against explicit thresholds.
    # The original called check_thresholds(stats) without the required
    # `thresholds` argument, which raises TypeError against monitor.py's
    # two-argument signature.
    thresholds = {'cpu': 80, 'memory': 80, 'disk': 90}
    stats = monitor.get_system_stats()
    alerts = monitor.check_thresholds(stats, thresholds)

    if alerts:
        sender = EmailSender()
        sender.send('admin@example.com', '服务器告警', '\n'.join(alerts))

    # Backup
    backup.backup_mysql('wordpress', '/backup/mysql')

if __name__ == '__main__':
    main()

九、总结与扩展

核心脚本清单

✅ monitor.py - 系统监控
✅ backup.py - 数据备份
✅ deploy.py - 自动部署
✅ log_analyzer.py - 日志分析
✅ email_sender.py - 邮件通知
✅ scheduler.py - 定时任务

扩展建议

  • 集成钉钉/企业微信告警
  • 添加 Web 管理界面
  • 使用 Prometheus + Grafana 监控
  • 容器化部署(Docker)

安全注意事项

  • 敏感信息使用 .env 管理
  • 脚本文件设置合适权限(chmod 600)
  • 定期更新依赖库
  • 日志脱敏处理

参考资源

  • Python 官方文档
  • psutil 库文档
  • schedule 库文档
  • Paramiko SSH 库