


Super hardcore ! 11 exemples de scripts Python et Shell très pratiques !
Python 脚本部分实例:企业微信告警、FTP 客户端、SSH 客户端、Saltstack 客户端、vCenter 客户端、获取域名 ssl 证书过期时间、发送今天的天气预报以及未来的天气趋势图;
Shell 脚本部分实例:SVN 完整备份、Zabbix 监控用户密码过期、构建本地 YUM 以及上篇文章中有读者的需求(负载高时,查出占用比较高的进程脚本并存储或推送通知);
篇幅有些长,还请大家耐心翻到文末,毕竟有彩蛋。
Python 脚本部分
企业微信告警
此脚本通过企业微信应用,进行微信告警,可用于 Zabbix 监控。
# -*- coding: utf-8 -*- import requests import json class DLF: def __init__(self, corpid, corpsecret): self.url = "https://qyapi.weixin.qq.com/cgi-bin" self.corpid = corpid self.corpsecret = corpsecret self._token = self._get_token() def _get_token(self): ''' 获取企业微信API接口的access_token :return: ''' token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %(self.corpid, self.corpsecret) try: res = requests.get(token_url).json() token = res['access_token'] return token except Exception as e: return str(e) def _get_media_id(self, file_obj): get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token) data = {"media": file_obj} try: res = requests.post(url=get_media_url, files=data) media_id = res.json()['media_id'] return media_id except Exception as e: return str(e) def send_text(self, agentid, content, touser=None, toparty=None): send_msg_url = self.url + "/message/send?access_token=%s" % (self._token) send_data = { "touser": touser, "toparty": toparty, "msgtype": "text", "agentid": agentid, "text": { "content": content } } try: res = requests.post(send_msg_url, data=json.dumps(send_data)) except Exception as e: return str(e) def send_image(self, agentid, file_obj, touser=None, toparty=None): media_id = self._get_media_id(file_obj) send_msg_url = self.url + "/message/send?access_token=%s" % (self._token) send_data = { "touser": touser, "toparty": toparty, "msgtype": "image", "agentid": agentid, "image": { "media_id": media_id } } try: res = requests.post(send_msg_url, data=json.dumps(send_data)) except Exception as e: return str(e)
FTP 客户端
通过 ftplib 模块操作 ftp 服务器,进行上传下载等操作。
# -*- coding: utf-8 -*- from ftplib import FTP from os import path import copy class FTPClient: def __init__(self, host, user, passwd, port=21): self.host = host self.user = user self.passwd = passwd self.port = port self.res = {'status': True, 'msg': None} self._ftp = None self._login() def _login(self): ''' 登录FTP服务器 :return: 连接或登录出现异常时返回错误信息 ''' try: self._ftp = FTP() self._ftp.connect(self.host, self.port, timeout=30) self._ftp.login(self.user, self.passwd) except Exception as e: return e def upload(self, localpath, remotepath=None): ''' 上传ftp文件 :param localpath: local file path :param remotepath: remote file path :return: ''' if not localpath: return 'Please select a local file. ' # 读取本地文件 # fp = open(localpath, 'rb') # 如果未传递远程文件路径,则上传到当前目录,文件名称同本地文件 if not remotepath: remotepath = path.basename(localpath) # 上传文件 self._ftp.storbinary('STOR ' + remotepath, localpath) # fp.close() def download(self, remotepath, localpath=None): ''' localpath :param localpath: local file path :param remotepath: remote file path :return: ''' if not remotepath: return 'Please select a remote file. ' # 如果未传递本地文件路径,则下载到当前目录,文件名称同远程文件 if not localpath: localpath = path.basename(remotepath) # 如果localpath是目录的话就和remotepath的basename拼接 if path.isdir(localpath): localpath = path.join(localpath, path.basename(remotepath)) # 写入本地文件 fp = open(localpath, 'wb') # 下载文件 self._ftp.retrbinary('RETR ' + remotepath, fp.write) fp.close() def nlst(self, dir='/'): ''' 查看目录下的内容 :return: 以列表形式返回目录下的所有内容 ''' files_list = self._ftp.nlst(dir) return files_list def rmd(self, dir=None): ''' 删除目录 :param dir: 目录名称 :return: 执行结果 ''' if not dir: return 'Please input dirname' res = copy.deepcopy(self.res) try: del_d = self._ftp.rmd(dir) res['msg'] = del_d except Exception as e: res['status'] = False res['msg'] = str(e) return res def mkd(self, dir=None): ''' 创建目录 :param dir: 目录名称 :return: 执行结果 ''' if not dir: return 'Please input dirname' res = copy.deepcopy(self.res) try: mkd_d = self._ftp.mkd(dir) res['msg'] = mkd_d except Exception as e: res['status'] = False res['msg'] = str(e) return res def del_file(self, filename=None): ''' 删除文件 :param filename: 文件名称 :return: 执行结果 ''' if not filename: return 'Please input filename' res = copy.deepcopy(self.res) try: del_f = self._ftp.delete(filename) res['msg'] = del_f except Exception as e: res['status'] = False res['msg'] = str(e) return res def get_file_size(self, filenames=[]): ''' 获取文件大小,单位是字节 判断文件类型 :param filename: 文件名称 :return: 执行结果 ''' if not filenames: return {'msg': 'This is an empty directory'} res_l = [] for file in filenames: res_d = {} # 如果是目录或者文件不存在就会报错 try: size = self._ftp.size(file) type = 'f' except: # 如果是路径的话size显示 - , file末尾加/ (/dir/) size = '-' type = 'd' file = file + '/' res_d['filename'] = file res_d['size'] = size res_d['type'] = type res_l.append(res_d) return res_l def rename(self, old_name=None, new_name=None): ''' 重命名 :param old_name: 旧的文件或者目录名称 :param new_name: 新的文件或者目录名称 :return: 执行结果 ''' if not old_name or not new_name: return 'Please input old_name and new_name' res = copy.deepcopy(self.res) try: rename_f = self._ftp.rename(old_name, new_name) res['msg'] = rename_f except Exception as e: res['status'] = False res['msg'] = str(e) return res def close(self): ''' 退出ftp连接 :return: ''' try: # 向服务器发送quit命令 self._ftp.quit() except Exception: return 'No response from server' finally: # 客户端单方面关闭连接 self._ftp.close()
SSH 客户端
此脚本仅用于通过 key 连接,如需要密码连接,简单修改下即可。
# -*- coding: utf-8 -*- import paramiko class SSHClient: def __init__(self, host, port, user, pkey): self.ssh_host = host self.ssh_port = port self.ssh_user = user self.private_key = paramiko.RSAKey.from_private_key_file(pkey) self.ssh = None self._connect() def _connect(self): self.ssh = paramiko.SSHClient() self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port, username=self.ssh_user, pkey=self.private_key, timeout=10) except: return 'ssh connect fail' def execute_command(self, command): stdin, stdout, stderr = self.ssh.exec_command(command) out = stdout.read() err = stderr.read() return out, err def close(self): self.ssh.close()
Saltstack 客户端
通过 api 对 Saltstack 服务端进行操作,执行命令。
#!/usr/bin/env python # -*- coding:utf-8 -*- import requests import json import copy class SaltApi: """ 定义salt api接口的类 初始化获得token """ def __init__(self): self.url = "http://172.85.10.21:8000/" self.username = "saltapi" self.password = "saltapi" self.headers = {"Content-type": "application/json"} self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None} self.login_url = self.url + "login" self.login_params = {'username': self.username, 'password': self.password, 'eauth': 'pam'} self.token = self.get_data(self.login_url, self.login_params)['token'] self.headers['X-Auth-Token'] = self.token def get_data(self, url, params): ''' 请求url获取数据 :param url: 请求的url地址 :param params: 传递给url的参数 :return: 请求的结果 ''' send_data = json.dumps(params) request = requests.post(url, data=send_data, headers=self.headers) response = request.json() result = dict(response) return result['return'][0] def get_auth_keys(self): ''' 获取所有已经认证的key :return: ''' data = copy.deepcopy(self.params) data['client'] = 'wheel' data['fun'] = 'key.list_all' result = self.get_data(self.url, data) try: return result['data']['return']['minions'] except Exception as e: return str(e) def get_grains(self, tgt, arg='id'): """ 获取系统基础信息 :tgt: 目标主机 :return: """ data = copy.deepcopy(self.params) if tgt: data['tgt'] = tgt else: data['tgt'] = '*' data['fun'] = 'grains.item' data['arg'] = arg result = self.get_data(self.url, data) return result def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list', salt_async=False): """ 执行saltstack 模块命令,类似于salt '*' cmd.run 'command' :param tgt: 目标主机 :param fun: 模块方法 可为空 :param arg: 传递参数 可为空 :return: 执行结果 """ data = copy.deepcopy(self.params) if not tgt: return {'status': False, 'msg': 'target host not exist'} if not arg: data.pop('arg') else: data['arg'] = arg if tgt != '*': data['tgt_type'] = tgt_type if salt_async: data['client'] = 'local_async' data['fun'] = fun data['tgt'] = tgt result = self.get_data(self.url, data) return result def jobs(self, fun='detail', jid=None): """ 任务 :param fun: active, detail :param jod: Job ID :return: 任务执行结果 """ data = {'client': 'runner'} data['fun'] = fun if fun == 'detail': if not jid: return {'success': False, 'msg': 'job id is none'} data['fun'] = 'jobs.lookup_jid' data['jid'] = jid else: return {'success': False, 'msg': 'fun is active or detail'} result = self.get_data(self.url, data) return result
vCenter 客户端
通过官方 SDK 对 vCenter 进行日常操作,此脚本是我用于 cmdb 平台的,自动获取主机信息,存入数据库。
from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL from pyVmomi import vim from asset import models import atexit class Vmware: def __init__(self, ip, user, password, port, idc, vcenter_id): self.ip = ip self.user = user self.password = password self.port = port self.idc_id = idc self.vcenter_id = vcenter_id def get_obj(self, content, vimtype, name=None): ''' 列表返回,name 可以指定匹配的对象 ''' container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True) obj = [ view for view in container.view ] return obj def get_esxi_info(self): # 宿主机信息 esxi_host = {} res = {"connect_status": True, "msg": None} try: # connect this thing si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password, port=self.port, connectionPoolTimeout=60) except Exception as e: res['connect_status'] = False try: res['msg'] = ("%s Caught vmodl fault : " + e.msg) % (self.ip) except Exception as e: res['msg'] = '%s: connection error' % (self.ip) return res # disconnect this thing atexit.register(Disconnect, si) content = si.RetrieveContent() esxi_obj = self.get_obj(content, [vim.HostSystem]) for esxi in esxi_obj: esxi_host[esxi.name] = {} esxi_host[esxi.name]['idc_id'] = self.idc_id esxi_host[esxi.name]['vcenter_id'] = self.vcenter_id esxi_host[esxi.name]['server_ip'] = esxi.name esxi_host[esxi.name]['manufacturer'] = esxi.summary.hardware.vendor esxi_host[esxi.name]['server_model'] = esxi.summary.hardware.model for i in esxi.summary.hardware.otherIdentifyingInfo: if isinstance(i, vim.host.SystemIdentificationInfo): esxi_host[esxi.name]['server_sn'] = i.identifierValue # 系统名称 esxi_host[esxi.name]['system_name'] = esxi.summary.config.product.fullName # cpu总核数 esxi_cpu_total = esxi.summary.hardware.numCpuThreads # 内存总量 GB esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 / 1024 # 获取硬盘总量 GB esxi_disk_total = 0 for ds in esxi.datastore: esxi_disk_total += ds.summary.capacity / 1024 / 1024 / 1024 # 默认配置4核8G100G,根据这个配置计算剩余可分配虚拟机 default_configure = { 'cpu': 4, 'memory': 8, 'disk': 100 } esxi_host[esxi.name]['vm_host'] = [] vm_usage_total_cpu = 0 vm_usage_total_memory = 0 vm_usage_total_disk = 0 # 虚拟机信息 for vm in esxi.vm: host_info = {} host_info['vm_name'] = vm.name host_info['power_status'] = vm.runtime.powerState host_info['cpu_total_kernel'] = str(vm.config.hardware.numCPU) + '核' host_info['memory_total'] = str(vm.config.hardware.memoryMB) + 'MB' host_info['system_info'] = vm.config.guestFullName disk_info = '' disk_total = 0 for d in vm.config.hardware.device: if isinstance(d, vim.vm.device.VirtualDisk): disk_total += d.capacityInKB / 1024 / 1024 disk_info += d.deviceInfo.label + ": " +str((d.capacityInKB) / 1024 / 1024) + ' GB' + ',' host_info['disk_info'] = disk_info esxi_host[esxi.name]['vm_host'].append(host_info) # 计算当前宿主机可用容量:总量 - 已分配的 if host_info['power_status'] == 'poweredOn': vm_usage_total_cpu += vm.config.hardware.numCPU vm_usage_total_disk += disk_total vm_usage_total_memory += (vm.config.hardware.memoryMB / 1024) esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu esxi_memory_free = esxi_memory_total - vm_usage_total_memory esxi_disk_free = esxi_disk_total - vm_usage_total_disk esxi_host[esxi.name]['cpu_info'] = 'Total: %d核, Free: %d核' % (esxi_cpu_total, esxi_cpu_free) esxi_host[esxi.name]['memory_info'] = 'Total: %dGB, Free: %dGB' % (esxi_memory_total, esxi_memory_free) esxi_host[esxi.name]['disk_info'] = 'Total: %dGB, Free: %dGB' % (esxi_disk_total, esxi_disk_free) # 计算cpu 内存 磁盘按照默认资源分配的最小值,即为当前可分配资源 if esxi_cpu_free < 4 or esxi_memory_free < 8 or esxi_disk_free < 100: free_allocation_vm_host = 0 else: free_allocation_vm_host = int(min( [ esxi_cpu_free / default_configure['cpu'], esxi_memory_free / default_configure['memory'], esxi_disk_free / default_configure['disk'] ] )) esxi_host[esxi.name]['free_allocation_vm_host'] = free_allocation_vm_host esxi_host['connect_status'] = True return esxi_host def write_to_db(self): esxi_host = self.get_esxi_info() # 连接失败 if not esxi_host['connect_status']: return esxi_host del esxi_host['connect_status'] for machine_ip in esxi_host: # 物理机信息 esxi_host_dict = esxi_host[machine_ip] # 虚拟机信息 virtual_host = esxi_host[machine_ip]['vm_host'] del esxi_host[machine_ip]['vm_host'] obj = models.EsxiHost.objects.create(**esxi_host_dict) obj.save() for host_info in virtual_host: host_info['management_host_id'] = obj.id obj2 = models.virtualHost.objects.create(**host_info) obj2.save()
获取域名 ssl 证书过期时间
用于 zabbix 告警
import re import sys import time import subprocess from datetime import datetime from io import StringIO def main(domain): f = StringIO() comm = f"curl -Ivs https://{domain} --connect-timeout 10" result = subprocess.getstatusoutput(comm) f.write(result[1]) try: m = re.search('start date: (.*?)n.*?expire date: (.*?)n.*?common name: (.*?)n.*?issuer: CN=(.*?)n', f.getvalue(), re.S) start_date = m.group(1) expire_date = m.group(2) common_name = m.group(3) issuer = m.group(4) except Exception as e: return 999999999 # time 字符串转时间数组 start_date = time.strptime(start_date, "%b %d %H:%M:%S %Y GMT") start_date_st = time.strftime("%Y-%m-%d %H:%M:%S", start_date) # datetime 字符串转时间数组 expire_date = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT") expire_date_st = datetime.strftime(expire_date,"%Y-%m-%d %H:%M:%S") # 剩余天数 remaining = (expire_date-datetime.now()).days return remaining if __name__ == "__main__": domain = sys.argv[1] remaining_days = main(domain) print(remaining_days)
发送今天的天气预报以及未来的天气趋势图
此脚本用于给老婆大人发送今天的天气预报以及未来的天气趋势图,现在微信把网页端禁止了,没法发送到微信了,我是通过企业微信进行通知的,需要把你老婆大人拉到企业微信,无兴趣的小伙伴跳过即可。
# -*- coding: utf-8 -*- import requests import json import datetime def weather(city): url = "http://wthrcdn.etouch.cn/weather_mini?city=%s" % city try: data = requests.get(url).json()['data'] city = data['city'] ganmao = data['ganmao'] today_weather = data['forecast'][0] res = "老婆今天是{}n今天天气概况n城市: {:<10}n时间: {:<10}n高温: {:<10}n低温: {:<10}n风力: {:<10}n风向: {:<10}n天气: {:<10}nn稍后会发送近期温度趋势图,请注意查看。 ".format( ganmao, city, datetime.datetime.now().strftime('%Y-%m-%d'), today_weather['high'].split()[1], today_weather['low'].split()[1], today_weather['fengli'].split('[')[2].split(']')[0], today_weather['fengxiang'],today_weather['type'], ) return {"source_data": data, "res": res} except Exception as e: return str(e) ``` + 获取天气预报趋势图 ```python # -*- coding: utf-8 -*- import matplotlib.pyplot as plt import re import datetime def Future_weather_states(forecast, save_path, day_num=5): ''' 展示未来的天气预报趋势图 :param forecast: 天气预报预测的数据 :param day_num: 未来几天 :return: 趋势图 ''' future_forecast = forecast dict={} for i in range(day_num): data = [] date = future_forecast[i]["date"] date = int(re.findall("d+",date)[0]) data.append(int(re.findall("d+", future_forecast[i]["high"])[0])) data.append(int(re.findall("d+", future_forecast[i]["low"])[0])) data.append(future_forecast[i]["type"]) dict[date] = data data_list = sorted(dict.items()) date=[] high_temperature = [] low_temperature = [] for each in data_list: date.append(each[0]) high_temperature.append(each[1][0]) low_temperature.append(each[1][1]) fig = plt.plot(date,high_temperature,"r",date,low_temperature,"b") current_date = datetime.datetime.now().strftime('%Y-%m') plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False plt.xlabel(current_date) plt.ylabel("℃") plt.legend(["高温", "低温"]) plt.xticks(date) plt.title("最近几天温度变化趋势") plt.savefig(save_path) ``` + 发送到企业微信 ```python # -*- coding: utf-8 -*- import requests import json class DLF: def __init__(self, corpid, corpsecret): self.url = "https://qyapi.weixin.qq.com/cgi-bin" self.corpid = corpid self.corpsecret = corpsecret self._token = self._get_token() def _get_token(self): ''' 获取企业微信API接口的access_token :return: ''' token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %(self.corpid, self.corpsecret) try: res = requests.get(token_url).json() token = res['access_token'] return token except Exception as e: return str(e) def _get_media_id(self, file_obj): get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token) data = {"media": file_obj} try: res = requests.post(url=get_media_url, files=data) media_id = res.json()['media_id'] return media_id except Exception as e: return str(e) def send_text(self, agentid, content, touser=None, toparty=None): send_msg_url = self.url + "/message/send?access_token=%s" % (self._token) send_data = { "touser": touser, "toparty": toparty, "msgtype": "text", "agentid": agentid, "text": { "content": content } } try: res = requests.post(send_msg_url, data=json.dumps(send_data)) except Exception as e: return str(e) def send_image(self, agentid, file_obj, touser=None, toparty=None): media_id = self._get_media_id(file_obj) send_msg_url = self.url + "/message/send?access_token=%s" % (self._token) send_data = { "touser": touser, "toparty": toparty, "msgtype": "image", "agentid": agentid, "image": { "media_id": media_id } } try: res = requests.post(send_msg_url, data=json.dumps(send_data)) except Exception as e: return str(e) + main脚本 # -*- coding: utf-8 -*- from plugins.weather_forecast import weather from plugins.trend_chart import Future_weather_states from plugins.send_wechat import DLF import os # 企业微信相关信息 corpid = "xxx" corpsecret = "xxx" agentid = "xxx" # 天气预报趋势图保存路径 _path = os.path.dirname(os.path.abspath(__file__)) save_path = os.path.join(_path ,'./tmp/weather_forecast.jpg') # 获取天气预报信息 content = weather("大兴") # 发送文字消息 dlf = DLF(corpid, corpsecret) dlf.send_text(agentid=agentid, content=content['res'], toparty='1') # 生成天气预报趋势图 Future_weather_states(content['source_data']['forecast'], save_path) # 发送图片消息 file_obj = open(save_path, 'rb') dlf.send_image(agentid=agentid, toparty='1', file_obj=file_obj)
Shell 脚本部分
SVN 完整备份
通过 hotcopy 进行 SVN 完整备份,备份保留 7 天。
#!/bin/bash # Filename :svn_backup_repos.sh # Date :2020/12/14 # Author :JakeTian # Email:JakeTian@***.com # Crontab:59 23 * * * /bin/bash $BASE_PATH/svn_backup_repos.sh >/dev/null 2>&1 # Notes:将脚本加入crontab中,每天定时执行 # Description:SVN完全备份 set -e SRC_PATH="/opt/svndata" DST_PATH="/data/svnbackup" LOG_FILE="$DST_PATH/logs/svn_backup.log" SVN_BACKUP_C="/bin/svnadmin hotcopy" SVN_LOOK_C="/bin/svnlook youngest" TODAY=$(date +'%F') cd $SRC_PATH ALL_REPOS=$(find ./ -maxdepth 1 -type d ! -name 'httpd' -a ! -name 'bak' | tr -d './') # 创建备份目录,备份脚本日志目录 test -d $DST_PATH || mkdir -p $DST_PATH test -d $DST_PATH/logs || mkdir $DST_PATH/logs test -d $DST_PATH/$TODAY || mkdir $DST_PATH/$TODAY # 备份repos文件 for repo in $ALL_REPOS do $SVN_BACKUP_C $SRC_PATH/$repo $DST_PATH/$TODAY/$repo # 判断备份是否完成 if $SVN_LOOK_C $DST_PATH/$TODAY/$repo;then echo "$TODAY: $repo Backup Success" >> $LOG_FILE else echo "$TODAY: $repo Backup Fail" >> $LOG_FILE fi done # # 备份用户密码文件和权限文件 cp -p authz access.conf $DST_PATH/$TODAY # 日志文件转储 mv $LOG_FILE $LOG_FILE-$TODAY # 删除七天前的备份 seven_days_ago=$(date -d "7 days ago" +'%F') rm -rf $DST_PATH/$seven_days_ago
zabbix 监控用户密码过期
用于 Zabbix 监控 Linux 系统用户(shell 为 /bin/bash 和 /bin/sh)密码过期,密码有效期剩余 7 天触发加自动发现用户。
#!/bin/bash diskarray=(`awk -F':' '$NF ~ //bin/bash/||//bin/sh/{print $1}' /etc/passwd`) length=${#diskarray[@]} printf "{n" printf't'""data":[" for ((i=0;i<$length;i++)) do printf 'ntt{' printf ""{#USER_NAME}":"${diskarray[$i]}"}" if [ $i -lt $[$length-1] ];then printf ',' fi done printf"nt]n" printf "}n" 检查用户密码过期 #!/bin/bash export LANG=en_US.UTF-8 SEVEN_DAYS_AGO=$(date -d '-7 day' +'%s') user="$1" # 将Sep 09, 2018格式的时间转换成unix时间 expires_date=$(sudo chage -l $user | awk -F':' '/Password expires/{print $NF}' | sed -n 's/^ //p') if [[ "$expires_date" != "never" ]];then expires_date=$(date -d "$expires_date" +'%s') if [ "$expires_date" -le "$SEVEN_DAYS_AGO" ];then echo "1" else echo "0" fi else echo "0" fi
构建本地YUM
通过 rsync 的方式同步 yum,通过 nginx 只做 http yum 站点;
但是 centos6 的镜像最近都不能用了,国内貌似都禁用了,如果找到合适的自行更换地址。
#!/bin/bash # 更新yum镜像 RsyncCommand="rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000" DIR="/app/yumData" LogDir="$DIR/logs" Centos6Base="$DIR/Centos6/x86_64/Base" Centos7Base="$DIR/Centos7/x86_64/Base" Centos6Epel="$DIR/Centos6/x86_64/Epel" Centos7Epel="$DIR/Centos7/x86_64/Epel" Centos6Salt="Super hardcore ! 11 exemples de scripts Python et Shell très pratiques !" Centos7Salt="$DIR/Centos7/x86_64/Salt" Centos6Update="$DIR/Centos6/x86_64/Update" Centos7Update="$DIR/Centos7/x86_64/Update" Centos6Docker="$DIR/Centos6/x86_64/Docker" Centos7Docker="$DIR/Centos7/x86_64/Docker" Centos6Mysql5_7="$DIR/Centos6/x86_64/Mysql/Mysql5.7" Centos7Mysql5_7="$DIR/Centos7/x86_64/Mysql/Mysql5.7" Centos6Mysql8_0="$DIR/Centos6/x86_64/Mysql/Mysql8.0" Centos7Mysql8_0="$DIR/Centos7/x86_64/Mysql/Mysql8.0" MirrorDomain="rsync://rsync.mirrors.ustc.edu.cn" # 目录不存在就创建 check_dir(){ for dir in $* do test -d $dir || mkdir -p $dir done } # 检查rsync同步结果 check_rsync_status(){ if [ $? -eq 0 ];then echo "rsync success" >> $1 else echo "rsync fail" >> $1 fi } check_dir $DIR $LogDir $Centos6Base $Centos7Base $Centos6Epel $Centos7Epel $Centos6Salt $Centos7Salt $Centos6Update $Centos7Update $Centos6Docker $Centos7Docker $Centos6Mysql5_7 $Centos7Mysql5_7 $Centos6Mysql8_0 $Centos7Mysql8_0 # Base yumrepo #$RsyncCommand "$MirrorDomain"/repo/centos/6/os/x86_64/ $Centos6Base >> "$LogDir/centos6Base.log" 2>&1 # check_rsync_status "$LogDir/centos6Base.log" $RsyncCommand "$MirrorDomain"/repo/centos/7/os/x86_64/ $Centos7Base >> "$LogDir/centos7Base.log" 2>&1 check_rsync_status "$LogDir/centos7Base.log" # Epel yumrepo # $RsyncCommand "$MirrorDomain"/repo/epel/6/x86_64/ $Centos6Epel >> "$LogDir/centos6Epel.log" 2>&1 # check_rsync_status "$LogDir/centos6Epel.log" $RsyncCommand "$MirrorDomain"/repo/epel/7/x86_64/ $Centos7Epel >> "$LogDir/centos7Epel.log" 2>&1 check_rsync_status "$LogDir/centos7Epel.log" # SaltStack yumrepo # $RsyncCommand "$MirrorDomain"/repo/salt/yum/redhat/6/x86_64/ $Centos6Salt >> "$LogDir/centos6Salt.log" 2>&1 # ln -s $Centos6Salt/archive/$(ls $Centos6Salt/archive | tail -1) $Centos6Salt/latest # check_rsync_status "$LogDir/centos6Salt.log" $RsyncComman "$MirrorDomain"/repo/salt/yum/redhat/7/x86_64/ $Centos7Salt >> "$LogDir/centos7Salt.log" 2>&1 check_rsync_status "$LogDir/centos7Salt.log" # ln -s $Centos7Salt/archive/$(ls $Centos7Salt/archive | tail -1) $Centos7Salt/latest # Docker yumrepo $RsyncCommand "$MirrorDomain"/repo/docker-ce/linux/centos/7/x86_64/stable/ $Centos7Docker >> "$LogDir/centos7Docker.log" 2>&1 check_rsync_status "$LogDir/centos7Docker.log" # centos update yumrepo # $RsyncCommand "$MirrorDomain"/repo/centos/6/updates/x86_64/ $Centos6Update >> "$LogDir/centos6Update.log" 2>&1 # check_rsync_status "$LogDir/centos6Update.log" $RsyncCommand "$MirrorDomain"/repo/centos/7/updates/x86_64/ $Centos7Update >> "$LogDir/centos7Update.log" 2>&1 check_rsync_status "$LogDir/centos7Update.log" # mysql 5.7 yumrepo # $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/6/x86_64/ "$Centos6Mysql5_7" >> "$LogDir/centos6Mysql5.7.log" 2>&1 # check_rsync_status "$LogDir/centos6Mysql5.7.log" $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/7/x86_64/ "$Centos7Mysql5_7" >> "$LogDir/centos7Mysql5.7.log" 2>&1 check_rsync_status "$LogDir/centos7Mysql5.7.log" # mysql 8.0 yumrepo # $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/6/x86_64/ "$Centos6Mysql8_0" >> "$LogDir/centos6Mysql8.0.log" 2>&1 # check_rsync_status "$LogDir/centos6Mysql8.0.log" $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/7/x86_64/ "$Centos7Mysql8_0" >> "$LogDir/centos7Mysql8.0.log" 2>&1 check_rsync_status "$LogDir/centos7Mysql8.0.log"
读者需求解答
负载高时,查出占用比较高的进程脚本并存储或推送通知
这部分内容是上篇 Shell 脚本实例中底部读者留言的需求,如下:
#!/bin/bash # 物理cpu个数 physical_cpu_count=$(egrep 'physical id' /proc/cpuinfo | sort | uniq | wc -l) # 单个物理cpu核数 physical_cpu_cores=$(egrep 'cpu cores' /proc/cpuinfo | uniq | awk '{print $NF}') # 总核数 total_cpu_cores=$((physical_cpu_count*physical_cpu_cores)) # 分别是一分钟、五分钟、十五分钟负载的阈值,其中有一项超过阈值才会触发 one_min_load_threshold="$total_cpu_cores" five_min_load_threshold=$(awk 'BEGIN {print '"$total_cpu_cores"' * "0.8"}') fifteen_min_load_threshold=$(awk 'BEGIN {print '"$total_cpu_cores"' * "0.7"}') # 分别是分钟、五分钟、十五分钟负载平均值 one_min_load=$(uptime | awk '{print $(NF-2)}' | tr -d ',') five_min_load=$(uptime | awk '{print $(NF-1)}' | tr -d ',') fifteen_min_load=$(uptime | awk '{print $NF}' | tr -d ',') # 获取当前cpu 内存 磁盘io信息,并写入日志文件 # 如果需要发送消息或者调用其他,请自行编写函数即可 get_info(){ log_dir="cpu_high_script_log" test -d "$log_dir" || mkdir "$log_dir" ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > "$log_dir"/cpu_top10.log ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > "$log_dir"/mem_top10.log iostat -dx 1 10 > "$log_dir"/disk_io_10.log } export -f get_info echo "$one_min_load $one_min_load_threshold $five_min_load $five_min_load_threshold $fifteen_min_load $fifteen_min_load_threshold" | awk '{ if ($1>=$2 || $3>=$4 || $5>=$6) system("get_info") }'
以上,就是今天分享的全部内容了。
希望大家通过这些案例能够学以致用,结合自身的实际场景进行运用,从而提高自己的工作效率。
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Outils d'IA chauds

Undresser.AI Undress
Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover
Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool
Images de déshabillage gratuites

Clothoff.io
Dissolvant de vêtements AI

AI Hentai Generator
Générez AI Hentai gratuitement.

Article chaud

Outils chauds

Bloc-notes++7.3.1
Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise
Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1
Puissant environnement de développement intégré PHP

Dreamweaver CS6
Outils de développement Web visuel

SublimeText3 version Mac
Logiciel d'édition de code au niveau de Dieu (SublimeText3)

MySQL a une version communautaire gratuite et une version d'entreprise payante. La version communautaire peut être utilisée et modifiée gratuitement, mais le support est limité et convient aux applications avec des exigences de stabilité faibles et des capacités techniques solides. L'Enterprise Edition fournit une prise en charge commerciale complète pour les applications qui nécessitent une base de données stable, fiable et haute performance et disposées à payer pour le soutien. Les facteurs pris en compte lors du choix d'une version comprennent la criticité des applications, la budgétisation et les compétences techniques. Il n'y a pas d'option parfaite, seulement l'option la plus appropriée, et vous devez choisir soigneusement en fonction de la situation spécifique.

L'article présente le fonctionnement de la base de données MySQL. Tout d'abord, vous devez installer un client MySQL, tel que MySQLWorkBench ou le client de ligne de commande. 1. Utilisez la commande MySQL-UROot-P pour vous connecter au serveur et connecter avec le mot de passe du compte racine; 2. Utilisez Createdatabase pour créer une base de données et utilisez Sélectionner une base de données; 3. Utilisez CreateTable pour créer une table, définissez des champs et des types de données; 4. Utilisez InsertInto pour insérer des données, remettre en question les données, mettre à jour les données par mise à jour et supprimer les données par Supprimer. Ce n'est qu'en maîtrisant ces étapes, en apprenant à faire face à des problèmes courants et à l'optimisation des performances de la base de données que vous pouvez utiliser efficacement MySQL.

Le fichier de téléchargement mysql est corrompu, que dois-je faire? Hélas, si vous téléchargez MySQL, vous pouvez rencontrer la corruption des fichiers. Ce n'est vraiment pas facile ces jours-ci! Cet article expliquera comment résoudre ce problème afin que tout le monde puisse éviter les détours. Après l'avoir lu, vous pouvez non seulement réparer le package d'installation MySQL endommagé, mais aussi avoir une compréhension plus approfondie du processus de téléchargement et d'installation pour éviter de rester coincé à l'avenir. Parlons d'abord de la raison pour laquelle le téléchargement des fichiers est endommagé. Il y a de nombreuses raisons à cela. Les problèmes de réseau sont le coupable. L'interruption du processus de téléchargement et l'instabilité du réseau peut conduire à la corruption des fichiers. Il y a aussi le problème avec la source de téléchargement elle-même. Le fichier serveur lui-même est cassé, et bien sûr, il est également cassé si vous le téléchargez. De plus, la numérisation excessive "passionnée" de certains logiciels antivirus peut également entraîner une corruption des fichiers. Problème de diagnostic: déterminer si le fichier est vraiment corrompu

Les principales raisons de la défaillance de l'installation de MySQL sont les suivantes: 1. Problèmes d'autorisation, vous devez s'exécuter en tant qu'administrateur ou utiliser la commande sudo; 2. Des dépendances sont manquantes et vous devez installer des packages de développement pertinents; 3. Conflits du port, vous devez fermer le programme qui occupe le port 3306 ou modifier le fichier de configuration; 4. Le package d'installation est corrompu, vous devez télécharger et vérifier l'intégrité; 5. La variable d'environnement est mal configurée et les variables d'environnement doivent être correctement configurées en fonction du système d'exploitation. Résolvez ces problèmes et vérifiez soigneusement chaque étape pour installer avec succès MySQL.

Guide d'optimisation des performances de la base de données MySQL dans les applications à forte intensité de ressources, la base de données MySQL joue un rôle crucial et est responsable de la gestion des transactions massives. Cependant, à mesure que l'échelle de l'application se développe, les goulots d'étranglement des performances de la base de données deviennent souvent une contrainte. Cet article explorera une série de stratégies efficaces d'optimisation des performances MySQL pour garantir que votre application reste efficace et réactive dans des charges élevées. Nous combinerons des cas réels pour expliquer les technologies clés approfondies telles que l'indexation, l'optimisation des requêtes, la conception de la base de données et la mise en cache. 1. La conception de l'architecture de la base de données et l'architecture optimisée de la base de données sont la pierre angulaire de l'optimisation des performances MySQL. Voici quelques principes de base: sélectionner le bon type de données et sélectionner le plus petit type de données qui répond aux besoins peut non seulement économiser un espace de stockage, mais également améliorer la vitesse de traitement des données.

L'optimisation des performances MySQL doit commencer à partir de trois aspects: configuration d'installation, indexation et optimisation des requêtes, surveillance et réglage. 1. Après l'installation, vous devez ajuster le fichier my.cnf en fonction de la configuration du serveur, tel que le paramètre innodb_buffer_pool_size, et fermer query_cache_size; 2. Créez un index approprié pour éviter les index excessifs et optimiser les instructions de requête, telles que l'utilisation de la commande Explication pour analyser le plan d'exécution; 3. Utilisez le propre outil de surveillance de MySQL (ShowProcessList, Showstatus) pour surveiller la santé de la base de données, et sauvegarde régulièrement et organisez la base de données. Ce n'est qu'en optimisant en continu ces étapes que les performances de la base de données MySQL peuvent être améliorées.

MySQL peut s'exécuter sans connexions réseau pour le stockage et la gestion des données de base. Cependant, la connexion réseau est requise pour l'interaction avec d'autres systèmes, l'accès à distance ou l'utilisation de fonctionnalités avancées telles que la réplication et le clustering. De plus, les mesures de sécurité (telles que les pare-feu), l'optimisation des performances (choisissez la bonne connexion réseau) et la sauvegarde des données sont essentielles pour se connecter à Internet.

MySQL a refusé de commencer? Ne paniquez pas, vérifions-le! De nombreux amis ont découvert que le service ne pouvait pas être démarré après avoir installé MySQL, et ils étaient si anxieux! Ne vous inquiétez pas, cet article vous emmènera pour le faire face calmement et découvrez le cerveau derrière! Après l'avoir lu, vous pouvez non seulement résoudre ce problème, mais aussi améliorer votre compréhension des services MySQL et vos idées de problèmes de dépannage, et devenir un administrateur de base de données plus puissant! Le service MySQL n'a pas réussi et il y a de nombreuses raisons, allant des erreurs de configuration simples aux problèmes système complexes. Commençons par les aspects les plus courants. Connaissances de base: une brève description du processus de démarrage du service MySQL Service Startup. Autrement dit, le système d'exploitation charge les fichiers liés à MySQL, puis démarre le démon mysql. Cela implique la configuration
