목차
Python 脚本部分
企业微信告警
FTP 客户端
SSH 客户端
Saltstack 客户端
vCenter 客户端
获取域名 ssl 证书过期时间
发送今天的天气预报以及未来的天气趋势图
Shell 脚本部分
SVN 完整备份
zabbix 监控用户密码过期
构建本地YUM
读者需求解答
백엔드 개발 파이썬 튜토리얼 슈퍼 하드코어! 11가지 매우 실용적인 Python 및 Shell 스크립트 예제!

슈퍼 하드코어! 11가지 매우 실용적인 Python 및 Shell 스크립트 예제!

Apr 12, 2023 pm 01:52 PM
python 각본 shell

슈퍼 하드코어! 11가지 매우 실용적인 Python 및 Shell 스크립트 예제!

Python 脚本部分实例:企业微信告警、FTP 客户端、SSH 客户端、Saltstack 客户端、vCenter 客户端、获取域名 ssl 证书过期时间、发送今天的天气预报以及未来的天气趋势图;

Shell 脚本部分实例:SVN 完整备份、Zabbix 监控用户密码过期、构建本地 YUM 以及上篇文章中有读者的需求(负载高时,查出占用比较高的进程脚本并存储或推送通知);

篇幅有些长,还请大家耐心翻到文末,毕竟有彩蛋。

슈퍼 하드코어! 11가지 매우 실용적인 Python 및 Shell 스크립트 예제!

Python 脚本部分

企业微信告警

此脚本通过企业微信应用,进行微信告警,可用于 Zabbix 监控。

# -*- coding: utf-8 -*-
import requests
import json
class DLF:
 def __init__(self, corpid, corpsecret):
 self.url = "https://qyapi.weixin.qq.com/cgi-bin"
 self.corpid = corpid
 self.corpsecret = corpsecret
 self._token = self._get_token()
 def _get_token(self):
 '''
 获取企业微信API接口的access_token
 :return:
 '''
 token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %(self.corpid, self.corpsecret)
 try:
 res = requests.get(token_url).json()
 token = res['access_token']
 return token
 except Exception as e:
 return str(e)
 def _get_media_id(self, file_obj):
 get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token)
 data = {"media": file_obj}
 try:
 res = requests.post(url=get_media_url, files=data)
 media_id = res.json()['media_id']
 return media_id
 except Exception as e:
 return str(e)
 def send_text(self, agentid, content, touser=None, toparty=None):
 send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
 send_data = {
 "touser": touser,
 "toparty": toparty,
 "msgtype": "text",
 "agentid": agentid,
 "text": {
 "content": content
 }
 }
 try:
 res = requests.post(send_msg_url, data=json.dumps(send_data))
 except Exception as e:
 return str(e)
 def send_image(self, agentid, file_obj, touser=None, toparty=None):
 media_id = self._get_media_id(file_obj)
 send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
 send_data = {
 "touser": touser,
 "toparty": toparty,
 "msgtype": "image",
 "agentid": agentid,
 "image": {
 "media_id": media_id
}
 }
 try:
 res = requests.post(send_msg_url, data=json.dumps(send_data))
 except Exception as e:
 return str(e)
로그인 후 복사

FTP 客户端

通过 ftplib 模块操作 ftp 服务器,进行上传下载等操作。

# -*- coding: utf-8 -*-
from ftplib import FTP
from os import path
import copy
class FTPClient:
 def __init__(self, host, user, passwd, port=21):
 self.host = host
 self.user = user
 self.passwd = passwd
 self.port = port
 self.res = {'status': True, 'msg': None}
 self._ftp = None
 self._login()
 def _login(self):
 '''
 登录FTP服务器
 :return: 连接或登录出现异常时返回错误信息
 '''
 try:
 self._ftp = FTP()
 self._ftp.connect(self.host, self.port, timeout=30)
 self._ftp.login(self.user, self.passwd)
 except Exception as e:
 return e
 def upload(self, localpath, remotepath=None):
 '''
 上传ftp文件
 :param localpath: local file path
 :param remotepath: remote file path
 :return:
 '''
 if not localpath: return 'Please select a local file. '
 # 读取本地文件
 # fp = open(localpath, 'rb')
 # 如果未传递远程文件路径,则上传到当前目录,文件名称同本地文件
 if not remotepath:
 remotepath = path.basename(localpath)
 # 上传文件
 self._ftp.storbinary('STOR ' + remotepath, localpath)
 # fp.close()
 def download(self, remotepath, localpath=None):
 '''
 localpath
 :param localpath: local file path
 :param remotepath: remote file path
 :return:
 '''
 if not remotepath: return 'Please select a remote file. '
 # 如果未传递本地文件路径,则下载到当前目录,文件名称同远程文件
 if not localpath:
 localpath = path.basename(remotepath)
 # 如果localpath是目录的话就和remotepath的basename拼接
 if path.isdir(localpath):
 localpath = path.join(localpath, path.basename(remotepath))
 # 写入本地文件
 fp = open(localpath, 'wb')
 # 下载文件
 self._ftp.retrbinary('RETR ' + remotepath, fp.write)
 fp.close()
 def nlst(self, dir='/'):
 '''
 查看目录下的内容
 :return: 以列表形式返回目录下的所有内容
 '''
 files_list = self._ftp.nlst(dir)
 return files_list
 def rmd(self, dir=None):
 '''
 删除目录
 :param dir: 目录名称
 :return: 执行结果
 '''
 if not dir: return 'Please input dirname'
 res = copy.deepcopy(self.res)
 try:
 del_d = self._ftp.rmd(dir)
 res['msg'] = del_d
 except Exception as e:
 res['status'] = False
 res['msg'] = str(e)
 return res
 def mkd(self, dir=None):
 '''
 创建目录
 :param dir: 目录名称
 :return: 执行结果
 '''
 if not dir: return 'Please input dirname'
 res = copy.deepcopy(self.res)
 try:
 mkd_d = self._ftp.mkd(dir)
 res['msg'] = mkd_d
 except Exception as e:
 res['status'] = False
 res['msg'] = str(e)
 return res
 def del_file(self, filename=None):
 '''
 删除文件
 :param filename: 文件名称
 :return: 执行结果
 '''
 if not filename: return 'Please input filename'
 res = copy.deepcopy(self.res)
 try:
 del_f = self._ftp.delete(filename)
 res['msg'] = del_f
 except Exception as e:
 res['status'] = False
 res['msg'] = str(e)
 return res
 def get_file_size(self, filenames=[]):
 '''
 获取文件大小,单位是字节
 判断文件类型
 :param filename: 文件名称
 :return: 执行结果
 '''
 if not filenames: return {'msg': 'This is an empty directory'}
 res_l = []
 for file in filenames:
 res_d = {}
 # 如果是目录或者文件不存在就会报错
 try:
 size = self._ftp.size(file)
 type = 'f'
 except:
 # 如果是路径的话size显示 - , file末尾加/ (/dir/)
 size = '-'
 type = 'd'
 file = file + '/'
 res_d['filename'] = file
 res_d['size'] = size
 res_d['type'] = type
 res_l.append(res_d)
 return res_l
 def rename(self, old_name=None, new_name=None):
 '''
 重命名
 :param old_name: 旧的文件或者目录名称
 :param new_name: 新的文件或者目录名称
 :return: 执行结果
 '''
 if not old_name or not new_name: return 'Please input old_name and new_name'
 res = copy.deepcopy(self.res)
 try:
 rename_f = self._ftp.rename(old_name, new_name)
 res['msg'] = rename_f
 except Exception as e:
 res['status'] = False
 res['msg'] = str(e)
 return res
 def close(self):
 '''
 退出ftp连接
 :return:
 '''
 try:
 # 向服务器发送quit命令
 self._ftp.quit()
 except Exception:
 return 'No response from server'
 finally:
 # 客户端单方面关闭连接
 self._ftp.close()
로그인 후 복사

SSH 客户端

此脚本仅用于通过 key 连接,如需要密码连接,简单修改下即可。

# -*- coding: utf-8 -*-
import paramiko
class SSHClient:
 def __init__(self, host, port, user, pkey):
 self.ssh_host = host
 self.ssh_port = port
 self.ssh_user = user
 self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
 self.ssh = None
 self._connect()
 def _connect(self):
 self.ssh = paramiko.SSHClient()
 self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 try:
 self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port, username=self.ssh_user, pkey=self.private_key, timeout=10)
 except:
 return 'ssh connect fail'
 def execute_command(self, command):
 stdin, stdout, stderr = self.ssh.exec_command(command)
 out = stdout.read()
 err = stderr.read()
 return out, err
 def close(self):
 self.ssh.close()
로그인 후 복사

Saltstack 客户端

通过 api 对 Saltstack 服务端进行操作,执行命令。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import requests
import json
import copy
class SaltApi:
 """
 定义salt api接口的类
 初始化获得token
 """
 def __init__(self):
 self.url = "http://172.85.10.21:8000/"
 self.username = "saltapi"
 self.password = "saltapi"
 self.headers = {"Content-type": "application/json"}
 self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None}
 self.login_url = self.url + "login"
 self.login_params = {'username': self.username, 'password': self.password, 'eauth': 'pam'}
 self.token = self.get_data(self.login_url, self.login_params)['token']
 self.headers['X-Auth-Token'] = self.token
 def get_data(self, url, params):
 '''
 请求url获取数据
 :param url: 请求的url地址
 :param params: 传递给url的参数
 :return: 请求的结果
 '''
 send_data = json.dumps(params)
 request = requests.post(url, data=send_data, headers=self.headers)
 response = request.json()
 result = dict(response)
 return result['return'][0]
 def get_auth_keys(self):
 '''
 获取所有已经认证的key
 :return:
 '''
 data = copy.deepcopy(self.params)
 data['client'] = 'wheel'
 data['fun'] = 'key.list_all'
 result = self.get_data(self.url, data)
 try:
 return result['data']['return']['minions']
 except Exception as e:
 return str(e)
 def get_grains(self, tgt, arg='id'):
 """
 获取系统基础信息
 :tgt: 目标主机
 :return:
 """
 data = copy.deepcopy(self.params)
 if tgt:
 data['tgt'] = tgt
 else:
 data['tgt'] = '*'
 data['fun'] = 'grains.item'
 data['arg'] = arg
 result = self.get_data(self.url, data)
 return result
 def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list', salt_async=False):
 """
 执行saltstack 模块命令,类似于salt '*' cmd.run 'command'
 :param tgt: 目标主机
 :param fun: 模块方法 可为空
 :param arg: 传递参数 可为空
 :return: 执行结果
 """
 data = copy.deepcopy(self.params)
 if not tgt: return {'status': False, 'msg': 'target host not exist'}
 if not arg:
 data.pop('arg')
 else:
 data['arg'] = arg
 if tgt != '*':
 data['tgt_type'] = tgt_type
 if salt_async: data['client'] = 'local_async'
 data['fun'] = fun
 data['tgt'] = tgt
 result = self.get_data(self.url, data)
 return result
 def jobs(self, fun='detail', jid=None):
 """
 任务
 :param fun: active, detail
 :param jod: Job ID
 :return: 任务执行结果
 """
 data = {'client': 'runner'}
 data['fun'] = fun
 if fun == 'detail':
 if not jid: return {'success': False, 'msg': 'job id is none'}
 data['fun'] = 'jobs.lookup_jid'
 data['jid'] = jid
 else:
 return {'success': False, 'msg': 'fun is active or detail'}
 result = self.get_data(self.url, data)
 return result
로그인 후 복사

vCenter 客户端

通过官方 SDK 对 vCenter 进行日常操作,此脚本是我用于 cmdb 平台的,自动获取主机信息,存入数据库。

from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL
from pyVmomi import vim
from asset import models
import atexit
class Vmware:
 def __init__(self, ip, user, password, port, idc, vcenter_id):
 self.ip = ip
 self.user = user
 self.password = password
 self.port = port
 self.idc_id = idc
 self.vcenter_id = vcenter_id
 def get_obj(self, content, vimtype, name=None):
 '''
 列表返回,name 可以指定匹配的对象
 '''
 container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True)
 obj = [ view for view in container.view ]
 return obj
 def get_esxi_info(self):
 # 宿主机信息
 esxi_host = {}
 res = {"connect_status": True, "msg": None}
 try:
 # connect this thing
 si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password, port=self.port, connectionPoolTimeout=60)
 except Exception as e:
 res['connect_status'] = False
 try:
 res['msg'] = ("%s Caught vmodl fault : " + e.msg) % (self.ip)
 except Exception as e:
 res['msg'] = '%s: connection error' % (self.ip)
 return res
 # disconnect this thing
 atexit.register(Disconnect, si)
 content = si.RetrieveContent()
 esxi_obj = self.get_obj(content, [vim.HostSystem])
 for esxi in esxi_obj:
 esxi_host[esxi.name] = {}
 esxi_host[esxi.name]['idc_id'] = self.idc_id
 esxi_host[esxi.name]['vcenter_id'] = self.vcenter_id
 esxi_host[esxi.name]['server_ip'] = esxi.name
 esxi_host[esxi.name]['manufacturer'] = esxi.summary.hardware.vendor
 esxi_host[esxi.name]['server_model'] = esxi.summary.hardware.model
 for i in esxi.summary.hardware.otherIdentifyingInfo:
 if isinstance(i, vim.host.SystemIdentificationInfo):
 esxi_host[esxi.name]['server_sn'] = i.identifierValue
 # 系统名称
 esxi_host[esxi.name]['system_name'] = esxi.summary.config.product.fullName
 # cpu总核数
 esxi_cpu_total = esxi.summary.hardware.numCpuThreads
 # 内存总量 GB
 esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 / 1024
 # 获取硬盘总量 GB
 esxi_disk_total = 0
 for ds in esxi.datastore:
 esxi_disk_total += ds.summary.capacity / 1024 / 1024 / 1024
 # 默认配置4核8G100G,根据这个配置计算剩余可分配虚拟机
 default_configure = {
 'cpu': 4,
 'memory': 8,
 'disk': 100
 }
 esxi_host[esxi.name]['vm_host'] = []
 vm_usage_total_cpu = 0
 vm_usage_total_memory = 0
 vm_usage_total_disk = 0
 # 虚拟机信息
 for vm in esxi.vm:
 host_info = {}
 host_info['vm_name'] = vm.name
 host_info['power_status'] = vm.runtime.powerState
 host_info['cpu_total_kernel'] = str(vm.config.hardware.numCPU) + '核'
 host_info['memory_total'] = str(vm.config.hardware.memoryMB) + 'MB'
 host_info['system_info'] = vm.config.guestFullName
 disk_info = ''
 disk_total = 0
 for d in vm.config.hardware.device:
 if isinstance(d, vim.vm.device.VirtualDisk):
 disk_total += d.capacityInKB / 1024 / 1024
 disk_info += d.deviceInfo.label + ": " +str((d.capacityInKB) / 1024 / 1024) + ' GB' + ','
 host_info['disk_info'] = disk_info
 esxi_host[esxi.name]['vm_host'].append(host_info)
 # 计算当前宿主机可用容量:总量 - 已分配的
 if host_info['power_status'] == 'poweredOn':
 vm_usage_total_cpu += vm.config.hardware.numCPU
 vm_usage_total_disk += disk_total
 vm_usage_total_memory += (vm.config.hardware.memoryMB / 1024)
 esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu
 esxi_memory_free = esxi_memory_total - vm_usage_total_memory
 esxi_disk_free = esxi_disk_total - vm_usage_total_disk
 esxi_host[esxi.name]['cpu_info'] = 'Total: %d核, Free: %d核' % (esxi_cpu_total, esxi_cpu_free)
 esxi_host[esxi.name]['memory_info'] = 'Total: %dGB, Free: %dGB' % (esxi_memory_total, esxi_memory_free)
 esxi_host[esxi.name]['disk_info'] = 'Total: %dGB, Free: %dGB' % (esxi_disk_total, esxi_disk_free)
 # 计算cpu 内存 磁盘按照默认资源分配的最小值,即为当前可分配资源
 if esxi_cpu_free < 4 or esxi_memory_free < 8 or esxi_disk_free < 100:
 free_allocation_vm_host = 0
 else:
 free_allocation_vm_host = int(min(
 [
 esxi_cpu_free / default_configure['cpu'],
 esxi_memory_free / default_configure['memory'],
 esxi_disk_free / default_configure['disk']
 ]
 ))
 esxi_host[esxi.name]['free_allocation_vm_host'] = free_allocation_vm_host
 esxi_host['connect_status'] = True
 return esxi_host
 def write_to_db(self):
 esxi_host = self.get_esxi_info()
 # 连接失败
 if not esxi_host['connect_status']:
 return esxi_host
 del esxi_host['connect_status']
 for machine_ip in esxi_host:
 # 物理机信息
 esxi_host_dict = esxi_host[machine_ip]
 # 虚拟机信息
 virtual_host = esxi_host[machine_ip]['vm_host']
 del esxi_host[machine_ip]['vm_host']
 obj = models.EsxiHost.objects.create(**esxi_host_dict)
 obj.save()
 for host_info in virtual_host:
 host_info['management_host_id'] = obj.id
 obj2 = models.virtualHost.objects.create(**host_info)
 obj2.save()
로그인 후 복사

获取域名 ssl 证书过期时间

用于 zabbix 告警

import re
import sys
import time
import subprocess
from datetime import datetime
from io import StringIO
def main(domain):
 f = StringIO()
 comm = f"curl -Ivs https://{domain} --connect-timeout 10"
 result = subprocess.getstatusoutput(comm)
 f.write(result[1])
 try:
 m = re.search('start date: (.*?)n.*?expire date: (.*?)n.*?common name: (.*?)n.*?issuer: CN=(.*?)n', f.getvalue(), re.S)
 start_date = m.group(1)
 expire_date = m.group(2)
 common_name = m.group(3)
 issuer = m.group(4)
 except Exception as e:
 return 999999999
 # time 字符串转时间数组
 start_date = time.strptime(start_date, "%b %d %H:%M:%S %Y GMT")
 start_date_st = time.strftime("%Y-%m-%d %H:%M:%S", start_date)
 # datetime 字符串转时间数组
 expire_date = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT")
 expire_date_st = datetime.strftime(expire_date,"%Y-%m-%d %H:%M:%S")
 # 剩余天数
 remaining = (expire_date-datetime.now()).days
return remaining
if __name__ == "__main__":
domain = sys.argv[1]
 remaining_days = main(domain)
 print(remaining_days)
로그인 후 복사

发送今天的天气预报以及未来的天气趋势图

슈퍼 하드코어! 11가지 매우 실용적인 Python 및 Shell 스크립트 예제!

此脚本用于给老婆大人发送今天的天气预报以及未来的天气趋势图,现在微信把网页端禁止了,没法发送到微信了,我是通过企业微信进行通知的,需要把你老婆大人拉到企业微信,无兴趣的小伙伴跳过即可。

# -*- coding: utf-8 -*-
 import requests
 import json
 import datetime
 def weather(city):
 url = "http://wthrcdn.etouch.cn/weather_mini?city=%s" % city
 try:
 data = requests.get(url).json()['data']
 city = data['city']
 ganmao = data['ganmao']
 today_weather = data['forecast'][0]
 res = "老婆今天是{}n今天天气概况n城市: {:<10}n时间: {:<10}n高温: {:<10}n低温: {:<10}n风力: {:<10}n风向: {:<10}n天气: {:<10}nn稍后会发送近期温度趋势图,请注意查看。
 ".format(
 ganmao,
 city,
 datetime.datetime.now().strftime('%Y-%m-%d'),
 today_weather['high'].split()[1],
 today_weather['low'].split()[1],
 today_weather['fengli'].split('[')[2].split(']')[0],
 today_weather['fengxiang'],today_weather['type'],
 )
 return {"source_data": data, "res": res}
 except Exception as e:
 return str(e)
 ```
 + 获取天气预报趋势图
 ```python
 # -*- coding: utf-8 -*-
 import matplotlib.pyplot as plt
 import re
 import datetime
 def Future_weather_states(forecast, save_path, day_num=5):
 '''
 展示未来的天气预报趋势图
 :param forecast: 天气预报预测的数据
 :param day_num: 未来几天
 :return: 趋势图
 '''
 future_forecast = forecast
 dict={}
 for i in range(day_num):
 data = []
 date = future_forecast[i]["date"]
 date = int(re.findall("d+",date)[0])
 data.append(int(re.findall("d+", future_forecast[i]["high"])[0]))
 data.append(int(re.findall("d+", future_forecast[i]["low"])[0]))
 data.append(future_forecast[i]["type"])
 dict[date] = data
 data_list = sorted(dict.items())
 date=[]
 high_temperature = []
 low_temperature = []
 for each in data_list:
 date.append(each[0])
 high_temperature.append(each[1][0])
 low_temperature.append(each[1][1])
 fig = plt.plot(date,high_temperature,"r",date,low_temperature,"b")
 current_date = datetime.datetime.now().strftime('%Y-%m')
 plt.rcParams['font.sans-serif'] = ['SimHei']
 plt.rcParams['axes.unicode_minus'] = False
 plt.xlabel(current_date)
 plt.ylabel("℃")
 plt.legend(["高温", "低温"])
 plt.xticks(date)
 plt.title("最近几天温度变化趋势")
 plt.savefig(save_path)
 ```
 + 发送到企业微信
 ```python
 # -*- coding: utf-8 -*-
 import requests
 import json
 class DLF:
 def __init__(self, corpid, corpsecret):
 self.url = "https://qyapi.weixin.qq.com/cgi-bin"
 self.corpid = corpid
 self.corpsecret = corpsecret
 self._token = self._get_token()
 def _get_token(self):
 '''
 获取企业微信API接口的access_token
 :return:
 '''
 token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %(self.corpid, self.corpsecret)
 try:
 res = requests.get(token_url).json()
 token = res['access_token']
 return token
 except Exception as e:
 return str(e)
 def _get_media_id(self, file_obj):
 get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token)
 data = {"media": file_obj}
 try:
 res = requests.post(url=get_media_url, files=data)
 media_id = res.json()['media_id']
 return media_id
 except Exception as e:
 return str(e)
 def send_text(self, agentid, content, touser=None, toparty=None):
 send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
 send_data = {
 "touser": touser,
 "toparty": toparty,
 "msgtype": "text",
 "agentid": agentid,
 "text": {
 "content": content
 }
 }
 try:
 res = requests.post(send_msg_url, data=json.dumps(send_data))
 except Exception as e:
 return str(e)
 def send_image(self, agentid, file_obj, touser=None, toparty=None):
 media_id = self._get_media_id(file_obj)
 send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
 send_data = {
 "touser": touser,
 "toparty": toparty,
 "msgtype": "image",
 "agentid": agentid,
 "image": {
 "media_id": media_id
}
 }
 try:
 res = requests.post(send_msg_url, data=json.dumps(send_data))
 except Exception as e:
 return str(e)
+ main脚本
# -*- coding: utf-8 -*-
from plugins.weather_forecast import weather
from plugins.trend_chart import Future_weather_states
from plugins.send_wechat import DLF
import os
# 企业微信相关信息
corpid = "xxx"
corpsecret = "xxx"
agentid = "xxx"
# 天气预报趋势图保存路径
_path = os.path.dirname(os.path.abspath(__file__))
save_path = os.path.join(_path ,'./tmp/weather_forecast.jpg')
# 获取天气预报信息
content = weather("大兴")
# 发送文字消息
dlf = DLF(corpid, corpsecret)
dlf.send_text(agentid=agentid, content=content['res'], toparty='1')
# 生成天气预报趋势图
Future_weather_states(content['source_data']['forecast'], save_path)
# 发送图片消息
file_obj = open(save_path, 'rb')
dlf.send_image(agentid=agentid, toparty='1', file_obj=file_obj)
로그인 후 복사

Shell 脚本部分

SVN 完整备份

通过 hotcopy 进行 SVN 完整备份,备份保留 7 天。

#!/bin/bash
# Filename :svn_backup_repos.sh
# Date :2020/12/14
# Author :JakeTian
# Email:JakeTian@***.com
# Crontab:59 23 * * * /bin/bash $BASE_PATH/svn_backup_repos.sh >/dev/null 2>&1
# Notes:将脚本加入crontab中,每天定时执行
# Description:SVN完全备份
set -e
SRC_PATH="/opt/svndata"
DST_PATH="/data/svnbackup"
LOG_FILE="$DST_PATH/logs/svn_backup.log"
SVN_BACKUP_C="/bin/svnadmin hotcopy"
SVN_LOOK_C="/bin/svnlook youngest"
TODAY=$(date +'%F')
cd $SRC_PATH
ALL_REPOS=$(find ./ -maxdepth 1 -type d ! -name 'httpd' -a ! -name 'bak' | tr -d './')
# 创建备份目录,备份脚本日志目录
test -d $DST_PATH || mkdir -p $DST_PATH
test -d $DST_PATH/logs || mkdir $DST_PATH/logs
test -d $DST_PATH/$TODAY || mkdir $DST_PATH/$TODAY
# 备份repos文件
for repo in $ALL_REPOS
do
 $SVN_BACKUP_C $SRC_PATH/$repo $DST_PATH/$TODAY/$repo
 # 判断备份是否完成
 if $SVN_LOOK_C $DST_PATH/$TODAY/$repo;then
echo "$TODAY: $repo Backup Success" >> $LOG_FILE
 else
 echo "$TODAY: $repo Backup Fail" >> $LOG_FILE
 fi
done
# # 备份用户密码文件和权限文件
cp -p authz access.conf $DST_PATH/$TODAY
# 日志文件转储
mv $LOG_FILE $LOG_FILE-$TODAY
# 删除七天前的备份
seven_days_ago=$(date -d "7 days ago" +'%F')
rm -rf $DST_PATH/$seven_days_ago
로그인 후 복사

zabbix 监控用户密码过期

用于 Zabbix 监控 Linux 系统用户(shell 为 /bin/bash 和 /bin/sh)密码过期,密码有效期剩余 7 天触发加自动发现用户。

#!/bin/bash
diskarray=(`awk -F':' '$NF ~ //bin/bash/||//bin/sh/{print $1}' /etc/passwd`)
length=${#diskarray[@]}
printf "{n"
printf't'""data":["
for ((i=0;i<$length;i++))
do
 printf 'ntt{'
 printf ""{#USER_NAME}":"${diskarray[$i]}"}"
 if [ $i -lt $[$length-1] ];then
 printf ','
 fi
done
printf"nt]n"
printf "}n"
检查用户密码过期
#!/bin/bash
export LANG=en_US.UTF-8
SEVEN_DAYS_AGO=$(date -d '-7 day' +'%s')
user="$1"
# 将Sep 09, 2018格式的时间转换成unix时间
expires_date=$(sudo chage -l $user | awk -F':' '/Password expires/{print $NF}' | sed -n 's/^ //p')
if [[ "$expires_date" != "never" ]];then
 expires_date=$(date -d "$expires_date" +'%s')
 if [ "$expires_date" -le "$SEVEN_DAYS_AGO" ];then
 echo "1"
 else
 echo "0"
 fi
else
 echo "0"
fi
로그인 후 복사

构建本地YUM

通过 rsync 的方式同步 yum,通过 nginx 只做 http yum 站点;

但是 centos6 的镜像最近都不能用了,国内貌似都禁用了,如果找到合适的自行更换地址。

#!/bin/bash
# 更新yum镜像
RsyncCommand="rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000"
DIR="/app/yumData"
LogDir="$DIR/logs"
Centos6Base="$DIR/Centos6/x86_64/Base"
Centos7Base="$DIR/Centos7/x86_64/Base"
Centos6Epel="$DIR/Centos6/x86_64/Epel"
Centos7Epel="$DIR/Centos7/x86_64/Epel"
Centos6Salt="슈퍼 하드코어! 11가지 매우 실용적인 Python 및 Shell 스크립트 예제!"
Centos7Salt="$DIR/Centos7/x86_64/Salt"
Centos6Update="$DIR/Centos6/x86_64/Update"
Centos7Update="$DIR/Centos7/x86_64/Update"
Centos6Docker="$DIR/Centos6/x86_64/Docker"
Centos7Docker="$DIR/Centos7/x86_64/Docker"
Centos6Mysql5_7="$DIR/Centos6/x86_64/Mysql/Mysql5.7"
Centos7Mysql5_7="$DIR/Centos7/x86_64/Mysql/Mysql5.7"
Centos6Mysql8_0="$DIR/Centos6/x86_64/Mysql/Mysql8.0"
Centos7Mysql8_0="$DIR/Centos7/x86_64/Mysql/Mysql8.0"
MirrorDomain="rsync://rsync.mirrors.ustc.edu.cn"
# 目录不存在就创建
check_dir(){
 for dir in $*
 do
 test -d $dir || mkdir -p $dir
 done
}
# 检查rsync同步结果
check_rsync_status(){
 if [ $? -eq 0 ];then
 echo "rsync success" >> $1
 else
 echo "rsync fail" >> $1
 fi
}
check_dir $DIR $LogDir $Centos6Base $Centos7Base $Centos6Epel $Centos7Epel $Centos6Salt $Centos7Salt $Centos6Update $Centos7Update $Centos6Docker $Centos7Docker $Centos6Mysql5_7 $Centos7Mysql5_7 $Centos6Mysql8_0 $Centos7Mysql8_0
# Base yumrepo
#$RsyncCommand "$MirrorDomain"/repo/centos/6/os/x86_64/ $Centos6Base >> "$LogDir/centos6Base.log" 2>&1
# check_rsync_status "$LogDir/centos6Base.log"
$RsyncCommand "$MirrorDomain"/repo/centos/7/os/x86_64/ $Centos7Base >> "$LogDir/centos7Base.log" 2>&1
check_rsync_status "$LogDir/centos7Base.log"
# Epel yumrepo
# $RsyncCommand "$MirrorDomain"/repo/epel/6/x86_64/ $Centos6Epel >> "$LogDir/centos6Epel.log" 2>&1
# check_rsync_status "$LogDir/centos6Epel.log"
$RsyncCommand "$MirrorDomain"/repo/epel/7/x86_64/ $Centos7Epel >> "$LogDir/centos7Epel.log" 2>&1
check_rsync_status "$LogDir/centos7Epel.log"
# SaltStack yumrepo
# $RsyncCommand "$MirrorDomain"/repo/salt/yum/redhat/6/x86_64/ $Centos6Salt >> "$LogDir/centos6Salt.log" 2>&1
# ln -s $Centos6Salt/archive/$(ls $Centos6Salt/archive | tail -1) $Centos6Salt/latest
# check_rsync_status "$LogDir/centos6Salt.log"
$RsyncComman "$MirrorDomain"/repo/salt/yum/redhat/7/x86_64/ $Centos7Salt >> "$LogDir/centos7Salt.log" 2>&1
check_rsync_status "$LogDir/centos7Salt.log"
# ln -s $Centos7Salt/archive/$(ls $Centos7Salt/archive | tail -1) $Centos7Salt/latest
# Docker yumrepo
$RsyncCommand "$MirrorDomain"/repo/docker-ce/linux/centos/7/x86_64/stable/ $Centos7Docker >> "$LogDir/centos7Docker.log" 2>&1
check_rsync_status "$LogDir/centos7Docker.log"
# centos update yumrepo
# $RsyncCommand "$MirrorDomain"/repo/centos/6/updates/x86_64/ $Centos6Update >> "$LogDir/centos6Update.log" 2>&1
# check_rsync_status "$LogDir/centos6Update.log"
$RsyncCommand "$MirrorDomain"/repo/centos/7/updates/x86_64/ $Centos7Update >> "$LogDir/centos7Update.log" 2>&1
check_rsync_status "$LogDir/centos7Update.log"
# mysql 5.7 yumrepo
# $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/6/x86_64/ "$Centos6Mysql5_7" >> "$LogDir/centos6Mysql5.7.log" 2>&1
# check_rsync_status "$LogDir/centos6Mysql5.7.log"
$RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/7/x86_64/ "$Centos7Mysql5_7" >> "$LogDir/centos7Mysql5.7.log" 2>&1
check_rsync_status "$LogDir/centos7Mysql5.7.log"
# mysql 8.0 yumrepo
# $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/6/x86_64/ "$Centos6Mysql8_0" >> "$LogDir/centos6Mysql8.0.log" 2>&1
# check_rsync_status "$LogDir/centos6Mysql8.0.log"
$RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/7/x86_64/ "$Centos7Mysql8_0" >> "$LogDir/centos7Mysql8.0.log" 2>&1
check_rsync_status "$LogDir/centos7Mysql8.0.log"
로그인 후 복사

读者需求解答

负载高时,查出占用比较高的进程脚本并存储或推送通知

这部分内容是上篇 Shell 脚本实例中底部读者留言的需求,如下:

#!/bin/bash
# 物理cpu个数
physical_cpu_count=$(egrep 'physical id' /proc/cpuinfo | sort | uniq | wc -l)
# 单个物理cpu核数
physical_cpu_cores=$(egrep 'cpu cores' /proc/cpuinfo | uniq | awk '{print $NF}')
# 总核数
total_cpu_cores=$((physical_cpu_count*physical_cpu_cores))
# 分别是一分钟、五分钟、十五分钟负载的阈值,其中有一项超过阈值才会触发
one_min_load_threshold="$total_cpu_cores"
five_min_load_threshold=$(awk 'BEGIN {print '"$total_cpu_cores"' * "0.8"}')
fifteen_min_load_threshold=$(awk 'BEGIN {print '"$total_cpu_cores"' * "0.7"}')
# 分别是分钟、五分钟、十五分钟负载平均值
one_min_load=$(uptime | awk '{print $(NF-2)}' | tr -d ',')
five_min_load=$(uptime | awk '{print $(NF-1)}' | tr -d ',')
fifteen_min_load=$(uptime | awk '{print $NF}' | tr -d ',')
# 获取当前cpu 内存 磁盘io信息,并写入日志文件
# 如果需要发送消息或者调用其他,请自行编写函数即可
get_info(){
 log_dir="cpu_high_script_log"
 test -d "$log_dir" || mkdir "$log_dir"
 ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > "$log_dir"/cpu_top10.log
 ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > "$log_dir"/mem_top10.log
 iostat -dx 1 10 > "$log_dir"/disk_io_10.log
}
export -f get_info
echo "$one_min_load $one_min_load_threshold $five_min_load $five_min_load_threshold $fifteen_min_load $fifteen_min_load_threshold" | 
awk '{ if ($1>=$2 || $3>=$4 || $5>=$6) system("get_info") }'
로그인 후 복사

以上,就是今天分享的全部内容了。

希望大家通过这些案例能够学以致用,结合自身的实际场景进行运用,从而提高自己的工作效率。

위 내용은 슈퍼 하드코어! 11가지 매우 실용적인 Python 및 Shell 스크립트 예제!의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

AI Hentai Generator

AI Hentai Generator

AI Hentai를 무료로 생성하십시오.

뜨거운 도구

메모장++7.3.1

메모장++7.3.1

사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전

SublimeText3 중국어 버전

중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기

스튜디오 13.0.1 보내기

강력한 PHP 통합 개발 환경

드림위버 CS6

드림위버 CS6

시각적 웹 개발 도구

SublimeText3 Mac 버전

SublimeText3 Mac 버전

신 수준의 코드 편집 소프트웨어(SublimeText3)

Python 3.6에 피클 파일을로드 할 때 '__builtin__'모듈을 찾을 수없는 경우 어떻게해야합니까? Python 3.6에 피클 파일을로드 할 때 '__builtin__'모듈을 찾을 수없는 경우 어떻게해야합니까? Apr 02, 2025 am 07:12 AM

Python 3.6에 피클 파일로드 3.6 환경 보고서 오류 : modulenotfounderror : nomodulename ...

데비안 문자열은 여러 브라우저와 호환됩니다 데비안 문자열은 여러 브라우저와 호환됩니다 Apr 02, 2025 am 08:30 AM

"Debiantrings"는 표준 용어가 아니며 구체적인 의미는 여전히 불분명합니다. 이 기사는 브라우저 호환성에 직접 언급 할 수 없습니다. 그러나 "Debiantrings"가 Debian 시스템에서 실행되는 웹 응용 프로그램을 지칭하는 경우 브라우저 호환성은 응용 프로그램 자체의 기술 아키텍처에 따라 다릅니다. 대부분의 최신 웹 응용 프로그램은 크로스 브라우저 호환성에 전념합니다. 이는 웹 표준에 따라 웹 표준과 잘 호환 가능한 프론트 엔드 기술 (예 : HTML, CSS, JavaScript) 및 백엔드 기술 (PHP, Python, Node.js 등)을 사용하는 데 의존합니다. 응용 프로그램이 여러 브라우저와 호환되도록 개발자는 종종 브라우저 크로스 테스트를 수행하고 응답 성을 사용해야합니다.

XML 수정에 프로그래밍이 필요합니까? XML 수정에 프로그래밍이 필요합니까? Apr 02, 2025 pm 06:51 PM

XML 컨텐츠를 수정하려면 프로그래밍이 필요합니다. 대상 노드를 추가, 삭제, 수정 및 확인하려면 정확한 찾기가 필요하기 때문입니다. 프로그래밍 언어에는 XML을 처리하기위한 해당 라이브러리가 있으며 운영 데이터베이스와 같이 안전하고 효율적이며 제어 가능한 작업을 수행 할 수있는 API를 제공합니다.

휴대폰에서 XML을 PDF로 변환 할 때 변환 속도가 빠르나요? 휴대폰에서 XML을 PDF로 변환 할 때 변환 속도가 빠르나요? Apr 02, 2025 pm 10:09 PM

모바일 XML에서 PDF의 속도는 다음 요인에 따라 다릅니다. XML 구조의 복잡성. 모바일 하드웨어 구성 변환 방법 (라이브러리, 알고리즘) 코드 품질 최적화 방법 (효율적인 라이브러리 선택, 알고리즘 최적화, 캐시 데이터 및 다중 스레딩 사용). 전반적으로 절대적인 답변은 없으며 특정 상황에 따라 최적화해야합니다.

XML에서 댓글 내용을 수정하는 방법 XML에서 댓글 내용을 수정하는 방법 Apr 02, 2025 pm 06:15 PM

작은 XML 파일의 경우 주석 내용을 텍스트 편집기로 직접 교체 할 수 있습니다. 큰 파일의 경우 XML 파서를 사용하여 효율성과 정확성을 보장하기 위해 수정하는 것이 좋습니다. XML 주석을 삭제할 때주의를 기울이면 주석을 유지하면 일반적으로 코드 이해 및 유지 관리에 도움이됩니다. 고급 팁은 XML 파서를 사용하여 댓글을 수정하기위한 파이썬 샘플 코드를 제공하지만 사용 된 XML 라이브러리에 따라 특정 구현을 조정해야합니다. XML 파일을 수정할 때 인코딩 문제에주의하십시오. UTF-8 인코딩을 사용하고 인코딩 형식을 지정하는 것이 좋습니다.

XML을 PDF로 변환 할 수있는 모바일 앱이 있습니까? XML을 PDF로 변환 할 수있는 모바일 앱이 있습니까? Apr 02, 2025 pm 08:54 PM

XML을 PDF로 직접 변환하는 응용 프로그램은 근본적으로 다른 두 형식이므로 찾을 수 없습니다. XML은 데이터를 저장하는 데 사용되는 반면 PDF는 문서를 표시하는 데 사용됩니다. 변환을 완료하려면 Python 및 ReportLab과 같은 프로그래밍 언어 및 라이브러리를 사용하여 XML 데이터를 구문 분석하고 PDF 문서를 생성 할 수 있습니다.

protobuf 및 연결 문자열 상수에서 열거 유형을 정의하는 방법은 무엇입니까? protobuf 및 연결 문자열 상수에서 열거 유형을 정의하는 방법은 무엇입니까? Apr 02, 2025 pm 03:36 PM

protobuf에서 문자열 상수 열거를 정의하는 문제 protobuf를 사용할 때 종종 열거 유형을 문자열 상수와 연관시켜야하는 상황이 발생합니다 ...

이미지로 변환 된 XML의 크기를 제어하는 ​​방법은 무엇입니까? 이미지로 변환 된 XML의 크기를 제어하는 ​​방법은 무엇입니까? Apr 02, 2025 pm 07:24 PM

XML을 통해 이미지를 생성하려면 XML에서 메타 데이터 (크기, 색상)를 기반으로 이미지를 생성하기 위해 브리지로 그래프 라이브러리 (예 : Pillow 및 JFreeChart)를 사용해야합니다. 이미지의 크기를 제어하는 ​​열쇠는 & lt; width & gt의 값을 조정하는 것입니다. 및 & lt; 높이 & gt; XML의 태그. 그러나 실제 애플리케이션에서 XML 구조의 복잡성, 그래프 드로잉의 편향, 이미지 생성 속도 및 메모리 소비 및 이미지 형식 선택은 모두 생성 된 이미지 크기에 영향을 미칩니다. 따라서 그래픽 라이브러리에 능숙한 XML 구조에 대한 깊은 이해가 필요하고 최적화 알고리즘 및 이미지 형식 선택과 같은 요소를 고려해야합니다.

See all articles