之前零零碎碎写了一些zabbix线路监控的脚本,工作中agent较多,每条线路监控需求不一致,比较杂乱,现在整理成一个py模块,集合之前的所有功能
环境
python3.6以上版本,pip3(pip 9.0.1以上版本),mysql,pymysql库
使用zabbix自定义脚本获取线路时延丢包率不做介绍,参考上一篇zabbix文章
如果系统当前python版本是python3.5,升级3.6时有三个注意事项
1 先升级python至3.6再升级pip3否则会导致pip3无法正常使用
2 python3.5升级到3.6后需要把lsb_release.py文件复制到python3.6的lib里,否则pip3无法正常使用
3 上两步完成后再进行pip3升级
darkping包文件如下
-----bin----程序入口,接收参数调用views
-----views-----逻辑函数,计算并返回,清除数据库历史数据
-----mtr.sh----shell脚本,供views调用
-----start-sql----根据ipinfo表变化动态创建线程,数据写入zabbixvalue表
------tcping----使用socket计算tcp时延丢包工具
-----log----日志文件
-----models-----数据库相关
-----settings----配置文件,sql语句,文件路径,重要参数等
逻辑
zabbix前端添加item
bin接收zabbix item传过来的参数,格式化后调用views.dark_zabbix()。该函数先把ipinfo信息写入数据库,再检查start-sql脚本是否在运行,如未运行就启动它;然后从zabbixvalue表中获取item所需要的数据,经过计算后返回。同时进行判断:如果时延相对于上次探测结果增大超过一定阈值,或丢包率超过设定阈值,就调用mtr脚本并把结果保存至日志
start-sql轮询ipinfo表中数据动态创建线程,调用测试命令把数据写入zabbixvalue表中
zabbix前端删除item
views函数会检查ipinfo中30分钟未更新数据,并进行删除
start-sql根据ipinfo表变化重新创建线程
---------------------------------------------- 2021.6.9号更新---------------------------------------------------------------------------------------
数据量大了后发现使用多线程,线程间来回抢占cpu导致cpu消耗增大,后来改为协程,cpu消耗对比原来降低一半以上
-----------------------------------------------------------------------------------------------------------------------------------------------------------
bin代码
#!/usr/bin/env python3
#-*-coding:utf-8-*-
#----------------------------------------------------------zabbixping脚本----------------------------------------------------
import argparse
from views import dark_zabbix
if __name__ == "__main__":
    # Command-line entry point invoked by the Zabbix agent (UserParameter).
    # It parses the item parameters, delegates to views.dark_zabbix() and
    # prints the single value Zabbix expects on stdout.
    parser = argparse.ArgumentParser(description='icmp for monitor')
    parser.add_argument('-t', action='store', dest='tip', required=True,
                        help='target IP address or host name')
    parser.add_argument('-i', action='store', dest='interval', type=float,
                        default=1.0, help='probe interval in seconds')
    parser.add_argument('-I', action='store', dest='item', required=True,
                        choices=('restime', 'pkloss'),
                        help='value to report: restime or pkloss')
    parser.add_argument('-p', action='store', dest='port', type=int,
                        default=0, help='target port (tcp/udp probes)')
    parser.add_argument('-T', action='store', dest='type', default='icmp',
                        choices=('icmp', 'tcp', 'udp'),
                        help='probe type')
    args = parser.parse_args()

    # dark_zabbix() divides by the interval, so reject non-positive values
    # here with a clear usage error instead of a traceback.
    if args.interval <= 0:
        parser.error('-i must be a positive number of seconds')

    ip = args.tip
    i = args.interval
    item = args.item
    port = args.port
    t_ype = args.type
    print(dark_zabbix(ip, item, i, port, t_ype))
views代码
#!/usr/bin/env python3
#-*-coding:utf-8-*-
import log,models,time,subprocess,re
from models import db
from settings import dbinfo
def insertdb(ip, i, port, t_ype):
    """Register a probe target in ``ipinfo`` or refresh its timestamp.

    If a row for (ip, type, port) already exists its ``time`` column is set
    to the current epoch second; otherwise a new row is inserted.

    Args:
        ip: target address.
        i: probe interval in seconds (stored in column ``i``).
        port: target port.
        t_ype: probe type string.
    """
    dbvalues = dbinfo()
    my_sql = db(dbvalues.dbinfo())
    try:
        ctime = int(time.time())
        existing = my_sql.db_readone(
            dbvalues.sql_sqlstatues(ip, t_ype, port, 'ipinfo'))
        # fetchone() yields None when no row matches; test that explicitly
        # rather than relying on a bare ``except`` that would also hide
        # genuine database errors raised by the update.
        if existing:
            my_sql.db_write(dbvalues.sql_updatetoipinfo(ip, port, t_ype, ctime))
        else:
            my_sql.db_write(
                dbvalues.sql_inserttoipinfo(ip, i, port, t_ype, ctime))
    finally:
        # Always release the connection, including when a query fails.
        my_sql.db_close()
# Delete monitor entries in ipinfo that no item has touched for 30 minutes.
def clear():
    """Remove ``ipinfo`` rows whose ``time`` is older than 1800 seconds."""
    ctime = int(time.time()) - 1800
    dbvalues = dbinfo()
    my_sql = db(dbvalues.dbinfo())
    try:
        my_sql.db_write(dbvalues.sql_clearipinfo(ctime))
    finally:
        # Close the connection even if the delete raises.
        my_sql.db_close()
def dark_zabbix(ip, item, i, port, t_ype):
    """Return the latency or packet-loss value for one Zabbix item.

    The target is registered/refreshed in ``ipinfo``, stale targets are
    purged, the start-sql collector is launched if it is not running, and
    the most recent samples from ``zabbixvalue`` are aggregated.

    Args:
        ip: target address.
        item: ``'restime'`` (average latency) or ``'pkloss'`` (loss in %).
        i: probe interval in seconds; must be positive.
        port: target port.
        t_ype: probe type string.

    Returns:
        0 while fewer samples than required are available; otherwise the
        rounded latency or loss percentage for ``item``. ``None`` when
        ``item`` is neither ``'restime'`` nor ``'pkloss'``.

    Raises:
        ValueError: if ``i`` is not positive.
    """
    if i <= 0:
        raise ValueError('probe interval must be positive, got %r' % (i,))

    insertdb(ip, i, port, t_ype)
    clear()

    # Number of newest samples to aggregate: those covering about 20 seconds.
    # Clamp to 1 so intervals above 20 s do not yield a zero divisor below.
    packet_count = max(1, int(20 / i))
    dbvalues = dbinfo()
    logger = log.logger()
    sql_getvalue = dbvalues.sql_getvalue(ip, packet_count, t_ype, port)
    sql_mtr = dbvalues.sql_mtr(ip, packet_count, t_ype, port)

    # Start the collector if no python3 process matching start-sql.py is
    # listed (first run, or the collector died).
    sta = subprocess.Popen('ps aux | grep start-sql.py', shell=True,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE).communicate()
    if not re.findall('python3', sta[0].decode('utf8')):
        cmd = dbvalues.cmd()
        subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
        logger.debug(cmd)

    my_sql = db(dbvalues.dbinfo())
    try:
        ret = my_sql.db_readall(sql_getvalue)
        if len(ret) < packet_count:
            # Not enough samples collected yet.
            return 0

        res_ret = sum(x['res'] for x in ret)
        pkloss_ret = sum(x['pkloss'] for x in ret)

        # Average latency over answered probes only; 0 when all were lost.
        answered = packet_count - pkloss_ret
        restime = round(float(res_ret / answered), 2) if answered else 0
        pkloss = round(float(pkloss_ret / packet_count * 100), 2)

        # Compare with the previous window; trigger an mtr trace when the
        # latency rose by more than 20 or loss is between 20% and 100%.
        try:
            history_restime = round(
                float(my_sql.db_readone(sql_mtr)['avg(a.res)']), 2)
            if restime - history_restime > 20 or 100 > pkloss > 20:
                mtr = dbvalues.mtr(ip)
                subprocess.Popen(mtr, shell=True, stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        except Exception as a:
            # Best effort: e.g. no previous window yet (avg() is NULL).
            logger.info(a)
    finally:
        # Close on every path, including the early return above.
        my_sql.db_close()

    if item == 'restime':
        return restime
    if item == 'pkloss':
        return pkloss
    return None
start-sql代码
#!/usr/bin/env python3
#-*-coding:utf-8-*-
import subprocess,re,time,pymysql,argparse,threading
from settings import dbinfo
from models import db
import log
import gevent
from gevent import monkey
logger = log.logger()
def ping(cmd):
    """Run one probe command and extract the round-trip time from its output.

    Args:
        cmd: shell command line whose stdout contains ``time=<number>`` on
            success.

    Returns:
        ``(rtt, pkloss)``: ``rtt`` is the matched number as a string and
        ``pkloss`` is 0 when a ``time=`` field is found; ``(0, 1)`` when it
        is not, which is counted as one lost probe.
    """
    output = subprocess.Popen(
        cmd, shell=True, stdout=subprocess.PIPE,
        stderr=subprocess.PIPE).communicate()[0].decode('utf8')
    # Raw string avoids invalid-escape warnings for \d and \. in the pattern.
    match = re.search(r'time=(\d+\.?\d*)', output)
    if match is None:
        return (0, 1)
    return (match.group(1), 0)
def value(i, ipaddress, port, t_ype):
    """Probe one target repeatedly and store each sample in ``zabbixvalue``.

    The loop ends once the row count of ``ipinfo`` differs from the count
    seen at start-up, so the caller can rebuild its workers for the new
    target list.

    Args:
        i: probe interval in seconds (converted with ``float``).
        ipaddress: target address.
        port: target port (used by the tcp/udp probes).
        t_ype: ``'icmp'``, ``'tcp'`` or ``'udp'``.
    """
    dbvalues = dbinfo()
    path = dbvalues.base_dir()
    if t_ype == 'icmp':
        cmd = 'ping -c 1 -W 1 %s' % ipaddress
    elif t_ype == 'tcp':
        cmd = path + '/tcping.py %s %s 1' % (ipaddress, port)
    elif t_ype == 'udp':
        cmd = path + '/udpping.py %s %s 1' % (ipaddress, port)
    else:
        # Without this branch ``cmd`` would be unbound and the worker would
        # crash with UnboundLocalError on the first probe.
        logger.error('unsupported probe type %r for %s', t_ype, ipaddress)
        return

    i = float(i)
    logger.debug('初始化%s' % ipaddress)
    count_sql = 'select count(nid) from ipinfo;'
    mysql = db(dbvalues.dbinfo())
    try:
        # Row count of ipinfo at start-up; a change means targets were added
        # or removed.
        count = mysql.db_readone(count_sql)['count(nid)']
        flag = True
        while flag:
            new_count = mysql.db_readone(count_sql)['count(nid)']
            # On a change, finish this iteration and then leave the loop.
            if new_count != count:
                flag = False
            start = time.time()
            res, pkloss = ping(cmd)
            t_time = int(time.time())
            # Purge samples older than 1800 seconds (30 minutes).
            ctime = t_time - 1800
            mysql.db_delete(dbvalues.sql_clearhistory(ctime))
            # Store the new sample.
            mysql.db_write(
                dbvalues.sql_insert(res, pkloss, ipaddress, t_time, t_ype, port))
            usetime = time.time() - start
            # Never pass a negative value to sleep() when the probe took
            # longer than the interval.
            time.sleep(max(0.0, i - usetime))
    finally:
        mysql.db_close()
    return
if __name__ == "__main__":
    # Patch blocking stdlib calls once, before the first connection is
    # opened, instead of on every pass of the loop.
    monkey.patch_all()
    while True:
        dbvalues = dbinfo()
        mysql = db(dbvalues.dbinfo())
        try:
            ipinfo = mysql.db_readall('select * from ipinfo;')
        finally:
            mysql.db_close()

        if not ipinfo:
            # No targets registered: wait instead of hammering the database
            # in a tight loop.
            time.sleep(1)
            continue

        # One greenlet per target; they all return when ipinfo changes, and
        # the outer loop then rebuilds them from the new table contents.
        li = [
            gevent.spawn(value, x['i'], x['ipaddress'], x['port'], x['type'])
            for x in ipinfo
        ]
        gevent.joinall(li)
tcping代码
#!/usr/bin/env python3
"""
TCP Ping Test (defaults to port 80, 10000 packets)
Usage: ./tcpping.py host [port] [maxCount]
"""
import sys
import socket
import time
import signal
from timeit import default_timer as timer

# Defaults, overridden by the positional command-line arguments below.
host = None
port = 80
maxCount = 10000
count = 0

# Required argument: host.
try:
    host = sys.argv[1]
except IndexError:
    print("Usage: tcpping.py host [port] [maxCount]")
    sys.exit(1)

# Optional argument: port.
try:
    port = int(sys.argv[2])
except ValueError:
    # Report the offending port argument. Indexing argv[3] here raised
    # IndexError when only host and port were given.
    print("Error: Port Must be Integer:", sys.argv[2])
    sys.exit(1)
except IndexError:
    pass

# Optional argument: maximum number of connection attempts.
try:
    maxCount = int(sys.argv[3])
except ValueError:
    print("Error: Max Count Value Must be Integer", sys.argv[3])
    sys.exit(1)
except IndexError:
    pass

# Counters updated by the probe loop.
passed = 0
failed = 0
def getResults():
    """ Summarize Results

    Prints total/passed/failed connection counts and the failure rate,
    reading the module-level ``count``, ``passed`` and ``failed`` counters.
    """
    lRate = 0
    if failed != 0:
        lRate = failed / (count) * 100
    lRate = "%.2f" % lRate
    print("\nTCP Ping Results: Connections (Total/Pass/Fail): [{:}/{:}/{:}] (Failed: {:}%)".format((count), passed, failed, str(lRate)))
def signal_handler(signal, frame):
    """ Catch Ctrl-C and Exit

    Prints the summary via getResults() and exits with status 0.
    """
    getResults()
    sys.exit(0)


# Install the handler for SIGINT. This runs at module level, where
# ``signal`` is still the module and not the handler's parameter.
signal.signal(signal.SIGINT, signal_handler)
# Probe loop: one TCP connect attempt per iteration, timed with ``timer``.
while count < maxCount:
    count += 1
    success = False
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 1 second timeout per connection attempt.
    s.settimeout(1)
    s_start = timer()
    try:
        s.connect((host, int(port)))
        s.shutdown(socket.SHUT_RD)
        success = True
    except socket.timeout:
        print("Connection timed out!")
        failed += 1
    except OSError as e:
        print("OS Error:", e)
        failed += 1
    s_stop = timer()
    # Release the descriptor after timing so closing does not affect the
    # measurement. Previously the socket was never closed.
    s.close()
    s_runtime = "%.2f" % (1000 * (s_stop - s_start))
    if success:
        print("Connected to %s[%s]: tcp_seq=%s time=%s ms" % (host, port, (count-1), s_runtime))
        passed += 1
    # Pause between attempts, but not after the last one.
    if count < maxCount:
        time.sleep(1)

getResults()
log代码
#!/usr/bin/env python3
#-*-coding:utf-8-*-
import logging,time
from settings import dbinfo
def logger():
    """Return the root logger, writing DEBUG and above to today's log file.

    The file path comes from ``dbinfo().log_dir()``. Calling this function
    repeatedly is safe: a file handler for a given path is attached only
    once, so messages are not duplicated.
    """
    import os

    log_name = dbinfo().log_dir()
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)

    target = os.path.abspath(log_name)
    # Skip when a handler for this exact file is already attached.
    already_attached = any(
        isinstance(h, logging.FileHandler) and h.baseFilename == target
        for h in root.handlers
    )
    if not already_attached:
        # FileHandler does not create missing directories.
        os.makedirs(os.path.dirname(target), exist_ok=True)
        fh = logging.FileHandler(log_name)
        formater = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
        fh.setFormatter(formater)
        root.addHandler(fh)
    return root
models代码
#!/usr/bin/env python3
#-*-coding:utf-8-*-
#-----------------------------------------------------创建db类--------------------------------------------------------
import pymysql,settings
class db:
    """Thin wrapper around one pymysql connection and a dict cursor."""

    def __init__(self, conninfo):
        """Open the connection.

        Args:
            conninfo: mapping with the keys ``host``, ``port``, ``user``,
                ``passwd``, ``db`` and ``charset``.
        """
        self.host = conninfo['host']
        self.port = conninfo['port']
        self.user = conninfo['user']
        self.passwd = conninfo['passwd']
        self.db = conninfo['db']
        self.ch = conninfo['charset']
        self.conn = pymysql.connect(host=self.host, port=self.port,
                                    user=self.user, passwd=self.passwd,
                                    db=self.db, charset=self.ch)
        # DictCursor returns each row as a dict keyed by column name.
        # (Attribute name kept as ``coursor`` for backward compatibility.)
        self.coursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

    def db_readone(self, sql):
        """Execute ``sql`` and return the first row of the result."""
        self.coursor.execute(sql)
        return self.coursor.fetchone()

    def db_readall(self, sql):
        """Execute ``sql`` and return all rows of the result."""
        self.coursor.execute(sql)
        return self.coursor.fetchall()

    def db_write(self, sql):
        """Execute ``sql`` and commit; roll back and re-raise on failure."""
        try:
            self.coursor.execute(sql)
            self.conn.commit()
        except Exception:
            # Do not leave a half-applied transaction on the connection.
            self.conn.rollback()
            raise

    def db_delete(self, sql):
        """Execute a delete statement and commit (same path as db_write)."""
        self.db_write(sql)

    def db_close(self):
        """Close the underlying connection."""
        self.conn.close()
settings代码
#!/usr/bin/env python3
#-*-coding:utf-8-*-
#---------------------------------------------配置文件,定义sql语句,路径等------------------------------------------------
import os,time
class dbinfo:
    """Configuration holder: paths, shell commands and SQL statement builders.

    NOTE(security): every ``sql_*`` method builds its statement by string
    interpolation. The values come from Zabbix item parameters; if those can
    be influenced by untrusted users, switch the callers to parameterized
    queries.
    """

    def __init__(self):
        # Directory containing this settings module; all paths hang off it.
        self.dir = os.path.dirname(os.path.abspath(__file__))

    def base_dir(self):
        """Return the package directory."""
        return self.dir

    def log_dir(self):
        """Return the path of today's log file (``log/YYYY-MM-DD.log``)."""
        log_dir = self.dir + '/log/' + time.strftime('%Y-%m-%d', time.localtime()) + '.log'
        return log_dir

    def mtr(self, ip):
        """Return the shell command running mtr.sh for ``ip`` with its log path."""
        mtr_dir = self.dir + '/log/' + ip + '-' + time.strftime('%Y-%m-%d', time.localtime()) + '.log'
        cmd = self.dir + '/mtr.sh' + ' ' + ip + ' ' + mtr_dir
        return cmd

    def dbinfo(self):
        """Return the MySQL connection parameters as a dict."""
        # NOTE(review): credentials are hard-coded here; consider loading
        # them from a protected configuration file or the environment.
        dbinfo = {'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': 'darkcs', 'db': 'pingvalues', 'charset': 'utf8'}
        return dbinfo

    def sql_sqlstatues(self, ip, t_ype, port, tables):
        """Return SQL selecting the newest row for (ip, type, port) in ``tables``."""
        sql = 'select time,ipaddress from %s where ipaddress = "%s" and type = "%s" and port = %s order by nid desc limit 1;' % (tables, ip, t_ype, port)
        return sql

    def sql_getvalue(self, ip, packet_count, t_ype, port):
        """Return SQL selecting the newest ``packet_count`` samples for a target."""
        sql = 'select res,pkloss,ipaddress,time from zabbixvalue where ipaddress = "%s" and type = "%s" and port = %s order by nid desc limit %s;' % (ip, t_ype, port, packet_count)
        return sql

    def cmd(self):
        """Return the shell command starting start-sql.py in the background."""
        cmd = 'nohup' + ' ' + self.dir + '/start-sql.py >/dev/null 2>&1 &'
        return cmd

    def sql_clearhistory(self, ctime):
        """Return SQL deleting samples with ``time`` older than ``ctime``."""
        sql = 'delete from zabbixvalue where time<%s;' % ctime
        return sql

    def sql_insert(self, res, pkloss, ipaddress, t_time, t_ype, port):
        """Return SQL inserting one sample into ``zabbixvalue``."""
        sql = 'insert into zabbixvalue(res,pkloss,ipaddress,time,type,port) values(%s,%s,"%s",%s,"%s",%s)' % (res, pkloss, ipaddress, t_time, t_ype, port)
        return sql

    def sql_mtr(self, ip, packet_count, t_ype, port):
        """Return SQL averaging ``res`` over the window before the newest one.

        The inner query skips the newest ``packet_count`` rows and takes the
        following ``packet_count`` rows (``limit offset,count``).
        """
        sql = 'select avg(a.res) from (select res from zabbixvalue where ipaddress like "%s" and type like "%s" and port like %s order by nid desc limit %s,%s) as a;' % (ip, t_ype, port, packet_count, packet_count)
        return sql

    def sql_inserttoipinfo(self, ip, i, port, t_ype, ctime):
        """Return SQL inserting a new target row into ``ipinfo``."""
        sql = 'insert into ipinfo(ipaddress,port,type,time,i) values("%s",%s,"%s","%s",%s);' % (ip, port, t_ype, ctime, i)
        return sql

    def sql_updatetoipinfo(self, ip, port, t_ype, ctime):
        """Return SQL refreshing the ``time`` of an existing ``ipinfo`` row."""
        sql = 'update ipinfo set time = "%s" where ipaddress like "%s" and type like "%s" and port like %s' % (ctime, ip, t_ype, port)
        return sql

    def sql_clearipinfo(self, ctime):
        """Return SQL deleting ``ipinfo`` rows with ``time`` older than ``ctime``."""
        sql = 'delete from ipinfo where time < %s' % ctime
        return sql
mtr shell脚本
#!/usr/bin/env bash
# Append one mtr report for a target to a log file.
# Usage: mtr.sh <ip> <logfile>
IP=$1
dir=$2
# Quote both expansions so unusual characters in the arguments cannot split
# the command line, and use the named variable instead of repeating $2.
mtr -r -n -c 30 -w -b "$IP" >> "$dir"
mysql
mysql> desc ipinfo;
+-----------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-----------+-------------+------+-----+---------+----------------+
| nid | int(11) | NO | PRI | NULL | auto_increment |
| ipaddress | varchar(64) | YES | | NULL | |
| port | int(11) | YES | | NULL | |
| type | varchar(64) | YES | | NULL | |
| time | varchar(64) | YES | | NULL | |
| i | float | YES | | NULL | |
+-----------+-------------+------+-----+---------+----------------+
6 rows in set (0.00 sec)
mysql> desc zabbixvalue;
+-----------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-----------+-------------+------+-----+---------+----------------+
| nid | int(11) | NO | PRI | NULL | auto_increment |
| res | float | YES | | NULL | |
| pkloss | int(11) | YES | | NULL | |
| ipaddress | varchar(64) | YES | | NULL | |
| time | int(11) | YES | | NULL | |
| type | varchar(64) | YES | | NULL | |
| port | int(11) | YES | | NULL | |
+-----------+-------------+------+-----+---------+----------------+
7 rows in set (0.00 sec)
创建 ipaddress type port三列联合索引,避免数据量过大导致全表扫描造成的系统cpu满负载
mysql> show index from zabbixvalue;
+-------------+------------+-----------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
| Table | Non_unique | Key_name | Seq_in_index | Column_name | Collation | Cardinality | Sub_part | Packed | Null | Index_type | Comment | Index_comment |
+-------------+------------+-----------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
| zabbixvalue | 0 | PRIMARY | 1 | nid | A | 223725 | NULL | NULL | | BTREE | | |
| zabbixvalue | 1 | ip_info | 1 | ipaddress | A | 1 | NULL | NULL | YES | BTREE | | |
| zabbixvalue | 1 | ip_info | 2 | type | A | 1 | NULL | NULL | YES | BTREE | | |
| zabbixvalue | 1 | ip_info | 3 | port | A | 1 | NULL | NULL | YES | BTREE | | |
| zabbixvalue | 1 | time_info | 1 | time | A | 90354 | NULL | NULL | YES | BTREE | | |
+-------------+------------+-----------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
5 rows in set (0.00 sec)
zabbix_agentd.conf
UserParameter=dark_ping_restime[*],/etc/zabbix/darkping/bin.py -t $1 -I restime
UserParameter=dark_ping_pkloss[*],/etc/zabbix/darkping/bin.py -t $1 -I pkloss
UserParameter=dark_tcpping_restime[*],/etc/zabbix/darkping/bin.py -t $1 -p $2 -T tcp -I restime
UserParameter=dark_tcpping_pkloss[*],/etc/zabbix/darkping/bin.py -t $1 -p $2 -T tcp -I pkloss
Item 配置略
2021.6.8 补一个udpping工具
client端
#!/usr/bin/env python
#-*-coding:utf-8-*-
import socket
import sys
import time
import string
import random
import signal
import os
# Probe parameters: INTERVAL is in milliseconds, LEN is the payload length
# in bytes.
INTERVAL, LEN = 1000, 64
# Destination address and port, filled in from the command line.
IP, PORT = "", 0
# Running statistics for the summary printed at exit.
count = count_of_received = 0
rtt_sum = rtt_max = 0.0
rtt_min = 99999999.0
def signal_handler(*args, **kwargs):
    """Print the ping summary and terminate the process with status 0.

    Used as the SIGINT handler and also called directly once the probe loop
    finishes; any arguments are ignored.
    """
    if count != 0 and count_of_received != 0:
        print('')
        print('--- ping statistics ---')
    if count != 0:
        print('%d packets transmitted, %d received, %.2f%% packet loss' % (count, count_of_received, (count - count_of_received) * 100.0 / count))
    if count_of_received != 0:
        print('rtt min/avg/max = %.2f/%.2f/%.2f ms' % (rtt_min, rtt_sum / count_of_received, rtt_max))
    # os._exit() skips normal interpreter shutdown, so buffered output (for
    # example when stdout is a pipe) would be lost without an explicit flush.
    sys.stdout.flush()
    os._exit(0)
def random_string(length):
    """Return ``length`` random ASCII letters and digits as a string."""
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.choice(alphabet) for _ in range(length))
# Example argv: ['./udpping.py', '8.8.8.8', '53', '1']
# Three positional arguments are required (ip, port, count); an optional
# fourth one carries the options string.
if len(sys.argv) != 4 and len(sys.argv) != 5:
    print(""" usage:""")
    print("""   this_program <dest_ip> <dest_port> <count>""")
    print("""   this_program <dest_ip> <dest_port> <count> "<options>" """)
    print()
    print(""" options:""")
    print(""" LEN the length of payload, unit:byte""")
    print(""" INTERVAL the seconds waited between sending each packet, as well as the timeout for reply packet, unit: ms""")
    print()
    print(" examples:")
    print(' ./udping.py 8.8.8.8 4000 10 "LEN=400;INTERVAL=2000"')
    print()
    sys.exit()
# Resolve the destination and read the positional arguments.
IP = socket.gethostbyname(sys.argv[1])
PORT = int(sys.argv[2])
monitorcount = int(sys.argv[3])

# NOTE(review): socket.gethostbyname() resolves IPv4 only, so this branch is
# presumably never taken for host names; kept as in the original.
is_ipv6 = 0
if IP.find(":") != -1:
    is_ipv6 = 1

if len(sys.argv) == 5:
    # The options string (e.g. "LEN=400;INTERVAL=2000") is executed as
    # Python to override the module-level settings.
    # NOTE(security): exec() on a command-line argument runs arbitrary code;
    # only call this script with trusted arguments.
    exec(sys.argv[4])

if LEN < 5:
    print("LEN must be >=5")
    sys.exit()
if INTERVAL < 50:
    print("INTERVAL must be >=50")
    sys.exit()

signal.signal(signal.SIGINT, signal_handler)

if not is_ipv6:
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
else:
    sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)

print("udping %s via port %d with %d bytes of payload"% (IP,PORT,LEN))
sys.stdout.flush()
# Probe loop: send one random payload per iteration and wait until the
# deadline for an identical echo from the same address and port.
while count < monitorcount:
    payload = random_string(LEN)
    sock.sendto(payload.encode(), (IP, PORT))
    time_of_send = time.time()
    deadline = time.time() + INTERVAL / 1000.0
    received = 0
    rtt = 0.0

    while True:
        timeout = deadline - time.time()
        if timeout < 0:
            break
        sock.settimeout(timeout)
        try:
            recv_data, addr = sock.recvfrom(65536)
            # Accept only an exact echo from the probed endpoint; anything
            # else is ignored and we keep waiting until the deadline.
            if recv_data == payload.encode() and addr[0] == IP and addr[1] == PORT:
                rtt = ((time.time() - time_of_send) * 1000)
                print("Reply from", IP, "seq=%d" % count, "time=%.2f" % (rtt), "ms")
                sys.stdout.flush()
                received = 1
                break
        except socket.timeout:
            break
        except OSError:
            # Transient socket errors (e.g. ICMP port unreachable surfacing
            # as an error): keep waiting until the deadline.
            pass

    count += 1
    if received == 1:
        count_of_received += 1
        rtt_sum += rtt
        rtt_max = max(rtt_max, rtt)
        rtt_min = min(rtt_min, rtt)
    else:
        print("Request timed out")
        sys.stdout.flush()

    # Wait out the rest of the interval, but not after the final probe, so
    # single-shot callers are not delayed for nothing.
    time_remaining = deadline - time.time()
    if time_remaining > 0 and count < monitorcount:
        time.sleep(time_remaining)

# Print the summary and exit.
signal_handler()
server端
#!/usr/bin/env python3
"""Minimal UDP echo server: sends every datagram back to its sender."""
import socket

# Listening endpoint: edit these to the address and port that udpping
# should target.
LISTEN_ADDR = '0.0.0.0'
LISTEN_PORT = 4000

# Create and bind the socket once. Re-creating it for every packet drops
# datagrams that arrive between iterations.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((LISTEN_ADDR, LISTEN_PORT))
while True:
    data, addr = sock.recvfrom(65535)
    sock.sendto(data, addr)
也可以使用socat,实际测试使用socat会引入额外开销,时延不准确
socat -v UDP-LISTEN:4000,fork PIPE
queue版
手机扫一扫
移动阅读更方便
你可能感兴趣的文章