1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
| import logging import re import subprocess
# 日志级别默认是WARNING #logging.basicConfig(level=logging.INFO,filename='run.log', # format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s') # 开始使用log功能 logger = logging.getLogger(__name__) logger.setLevel(level=logging.INFO) handler = logging.FileHandler("chartslog.txt") handler.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter)
console = logging.StreamHandler() console.setLevel(logging.INFO)
# 日志是否保存在日志文件里 logger.addHandler(handler) #日志是否在前台显示 logger.addHandler(console)
#根据监听端口获取进程号,执行命令:netstat -lptn |grep port,获取成功返回进程号,获取失败返回0 def getPid(port): result = subprocess.check_output("netstat -lptn |grep " + port, shell=True).decode('utf-8') if len(result) != 0: processid = re.match('.*?((\d+)(\D+)?)$', result).group(2) logger.info("pid为:" + processid) return processid else: logger.error("获取进程失败,请检查该端口:" + port + "是否被使用!netstat -lptn |grep命令结果为:" + result) return 0
#根据进程号获取进程所在路径,执行命令:pwdx pid,获取成功返回路径,获取失败返回0 def getPath(pid): result = subprocess.check_output("pwdx " + pid, shell=True).decode('utf-8') if len(result) != 0: processpath = re.match('\d+: (.*)', result).group(1) logger.info("进程所在路径为:" + processpath) return processpath else: logger.error("获取路径失败,请检查进程号:" + pid + "是否存在!pwdx命令结果为:" + result) return 0
#根据进程所在路径进入到进程所在位置,执行命令:cd path、pwd,获取成功无返回,获取失败返回0 def goPath(path): result = subprocess.check_output("cd " + path, shell=True).decode('utf-8') if len(result) == 0: # processpath = subprocess.check_output("pwd", shell=True).decode('utf-8') # logger.info("当前所在路径为:" + processpath) pass else: logger.error("进入路径失败,请检查路径:" + path + "是否存在!cd命令结果为:" + result) return 0
#查询kafka积压 def kafkaCheck(port): kfkpid = getPid(str(port)) kfkpath = getPath(kfkpid)
kfkoverstock = "/bin/kafka-consumer-offset-checker.sh --topic antispider --zookeeper localhost:2181 --group groupid"
if kfkpid != 0 and kfkpath != 0: result = subprocess.Popen(kfkpath + kfkoverstock, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) logger.info("kafka结果为:" + '\n' + str(result.communicate()))
#压缩生成的日志文件 def setZip(): result = subprocess.check_output("pwd", shell=True).decode('utf-8') if len(result) != 0:
logger.info("当前路径为:" + result) try: zipcmd = subprocess.check_output("zip -r checkResult.zip " + result, shell=True).decode('utf-8') except Exception as e: logger.error(e) finally: pass else: logger.error("获取路径失败!pwd命令结果为:" + result)
#该函数可以执行一些状态检测的命令,并输出结果 def systemstatus(): # 参考文章:https://blog.csdn.net/baidu_36943075/article/details/105681683 # subprocess.Popen虽然既可以判断执行是否成功,还可以获取执行结果,但是该方式返回的结果不够直观,不能记录在终端下执行命令的效果 # subprocess.call方式仅返回执行命令成功或失败的结果,执行失败不需要特殊处理,命令执行失败会直接报错;结果为0则表示执行成功,为其他值则表示执行不成功 # subprocess.check_output方式执行失败不需要特殊处理,命令执行失败会直接报错,该方式返回执行结果,但是结果返回的是一个str字符串(不论有多少行),并且返回的结果需要转换编码
# p = subprocess.Popen('df -h',shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) # result = subprocess.call("df -h",shell=True,stdout=subprocess.PIPE)
command = {"磁盘空间":"df -h","CPU与内存使用情况":"vmstat","端口开放情况":"netstat -lptn"}
command2 = {"防火墙是否开放":"firewall-cmd --state","防火墙开放列表":"firewall-cmd --list-all"}
for key,value in command.items(): result = subprocess.check_output(value, shell=True).decode('utf-8') logger.info(str(key) + '\n' + result)
for key,value in command2.items(): result = subprocess.Popen(value, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) logger.info(str(key) + '\n' + str(result.communicate()))
#该函数会通过ip a命令输出ip地址;且可以通过ping的方式去检查跟某些域名的连通性 def networkstatus(): #配置ping检测的域名 cloudurl = ["www.xx.com","www.xx.com"] command = {"ip地址": "ip a"}
for key,value in command.items(): result = subprocess.check_output(value, shell=True).decode('utf-8') logger.info(str(key) + '\n' + result)
try: for url in cloudurl: result = subprocess.check_output("ping " + url + " -c 5", shell=True).decode('utf-8') logger.info("ping " + url + '\n' + result)
except Exception as e: logger.error("连接域名异常!") logger.error(e) finally: pass
if __name__ == '__main__': systemstatus() networkstatus() kafkaCheck("port")
#生成zip文件 setZip()
|