zabbix 线路质量监控自定义python模块(Mysql版),多线程(后来发现使用协程更好)降低系统消耗
阅读原文时间:2023年07月09日阅读:4

之前零零碎碎写了一些zabbix线路监控的脚本,工作中agent较多,每条线路的监控需求不一致,比较杂乱,现在整理成一个py模块,集合之前的所有功能

环境

  python3.6以上版本,pip3(pip 9.0.1以上版本),mysql,pymysql库

  使用zabbix自定义脚本获取线路时延丢包率不做介绍,参考上一篇zabbix文章

如果系统当前python版本是python3.5,升级到3.6时需要注意以下三点

  1 先升级python至3.6再升级pip3否则会导致pip3无法正常使用

  2 python3.5升级到3.6后需要把lsb_release.py 文件复制到python3.6的lib里,否则pip3无法正常使用

  3 上两步完成后再进行pip3升级

darkping包文件如下
  -----bin----程序入口,接收参数调用views
    -----views-----逻辑函数,计算并返回,清除数据库历史数据
      -----mtr.sh----shell脚本,供views调用
      -----start-sql----根据ipinfo表变化动态创建线程,数据写入zabbixvalue表
        ------tcping----使用socket计算tcp时延丢包工具
  -----log----日志文件
  -----models-----数据库相关
  -----settings----配置文件,sql语句,文件路径,重要参数等

逻辑

  zabbix前端添加item

  bin接收zabbix item传过来的参数,格式化后调用views.dark_zabbix()。该函数先把ipinfo信息写入数据库,再检查start-sql脚本是否在运行,如未运行就启动它;随后从zabbixvalue表中获取item所需要的数据,经过计算后返回。同时进行判断:如果时延相对于上次探测结果增大超过一定阈值,或丢包率超过设定阈值,就调用mtr脚本并把结果保存至日志

  start-sql轮询ipinfo表中数据动态创建线程,调用测试命令把数据写入zabbixvalue表中

  zabbix前端删除item

  views函数会检查ipinfo中30分钟未更新数据,并进行删除

  start-sql根据ipinfo表变化重新创建线程

---------------------------------------------- 2021.6.9号更新---------------------------------------------------------------------------------------

数据量大了后发现使用多线程,线程间来回抢占cpu导致cpu消耗增大,后来改为协程,cpu消耗对比原来降低一半以上

-----------------------------------------------------------------------------------------------------------------------------------------------------------

bin代码

#!/usr/bin/env python3
#-*-coding:utf-8-*-
#----------------------------------------------------------zabbixping脚本----------------------------------------------------
import argparse
from views import dark_zabbix

if __name__ == "__main__":
    # Command-line front end: parse the options, run the query, print the value.
    cli = argparse.ArgumentParser(description='icmp for monitor')
    cli.add_argument('-t', dest='tip', action='store')
    cli.add_argument('-i', dest='interval', action='store', default='1')
    cli.add_argument('-I', dest='item', action='store')
    cli.add_argument('-p', dest='port', action='store', default='0')
    cli.add_argument('-T', dest='type', action='store', default='icmp')
    opts = cli.parse_args()

    # Options arrive as strings; interval and port are converted before the call.
    result = dark_zabbix(
        opts.tip,
        opts.item,
        float(opts.interval),
        int(opts.port),
        opts.type,
    )
    print(result)

views代码

#!/usr/bin/env python3
#-*-coding:utf-8-*-
import log,models,time,subprocess,re
from models import db
from settings import dbinfo

# 向数据库写入需要监控的ip参数

def insertdb(ip, i, port, t_ype):
    """Register a monitored target in ``ipinfo`` or refresh its timestamp.

    If a row for (ip, t_ype, port) already exists, its ``time`` column is set
    to the current epoch second; otherwise a new row is inserted.

    Args:
        ip: Target address.
        i: Probe interval stored with a newly inserted row.
        port: Target port.
        t_ype: Probe type.
    """
    dbvalues = dbinfo()
    my_sql = db(dbvalues.dbinfo())
    try:
        ctime = int(time.time())
        existing = my_sql.db_readone(
            dbvalues.sql_sqlstatues(ip, t_ype, port, 'ipinfo'))
        # Decide on the lookup result itself, so a failing UPDATE is reported
        # instead of being turned into a duplicate INSERT.
        if existing is not None:
            my_sql.db_write(dbvalues.sql_updatetoipinfo(ip, port, t_ype, ctime))
        else:
            my_sql.db_write(
                dbvalues.sql_inserttoipinfo(ip, i, port, t_ype, ctime))
    finally:
        # Release the connection on every path, including errors.
        my_sql.db_close()

# Remove ipinfo rows whose timestamp is more than 30 minutes (1800 s) old.
def clear():
    """Delete ``ipinfo`` rows whose ``time`` is older than now minus 1800 s."""
    cutoff = int(time.time()) - 1800
    settings_obj = dbinfo()
    connection = db(settings_obj.dbinfo())
    connection.db_write(settings_obj.sql_clearipinfo(cutoff))
    connection.db_close()

def dark_zabbix(ip, item, i, port, t_ype):
    """Return the current delay or packet-loss figure for one target.

    Registers or refreshes the target in ``ipinfo``, purges stale targets,
    starts ``start-sql.py`` if no such process is found, then aggregates the
    newest ``int(20 / i)`` rows (at least one) from ``zabbixvalue``.  If the
    average delay exceeds the previous window's average by more than 20, or
    the loss percentage is strictly between 20 and 100, the mtr command is
    launched in the background.

    Args:
        ip: Target address.
        item: ``'restime'`` for average delay, ``'pkloss'`` for loss percent.
        i: Probe interval; must be positive.
        port: Target port.
        t_ype: Probe type.

    Returns:
        0 while fewer rows than required exist; otherwise the average delay
        or the loss percentage, both rounded to two decimals.  ``None`` for
        any other ``item``.

    Raises:
        ValueError: If ``i`` is not positive.
    """
    # Reject a bad interval before anything is written to ipinfo.
    if i <= 0:
        raise ValueError('interval must be positive, got %r' % (i,))

    insertdb(ip, i, port, t_ype)
    clear()

    # Number of rows to select, derived from the interval.  Clamped to 1 so an
    # interval above 20 cannot produce a zero divisor further down.
    packet_count = max(1, int(20 / i))
    dbvalues = dbinfo()
    logger = log.logger()
    sql_getvalue = dbvalues.sql_getvalue(ip, packet_count, t_ype, port)
    sql_mtr = dbvalues.sql_mtr(ip, packet_count, t_ype, port)

    my_sql = db(dbvalues.dbinfo())
    try:
        # Start the collector if no python3 start-sql.py process is listed
        # (first run, or the collector died).
        sta = subprocess.Popen(
            'ps aux | grep start-sql.py', shell=True,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
        if 'python3' not in sta[0].decode('utf8'):
            cmd = dbvalues.cmd()
            subprocess.Popen(cmd, shell=True,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            logger.debug(cmd)

        ret = my_sql.db_readall(sql_getvalue)
        if len(ret) < packet_count:
            # Not enough samples collected yet.
            return 0
        res_ret = sum(x['res'] for x in ret)
        pkloss_ret = sum(x['pkloss'] for x in ret)

        # Average delay over the probes that were not lost.
        try:
            restime = round(float(res_ret / (packet_count - pkloss_ret)), 2)
        except ZeroDivisionError:
            # Every probe in the window was lost.
            restime = 0
        pkloss = round(float(pkloss_ret / packet_count * 100), 2)

        # Compare with the previous window to decide whether to run mtr.
        # Best effort: any failure here is logged and must not affect the
        # value returned to the caller.
        try:
            history_restime = round(
                float(my_sql.db_readone(sql_mtr)['avg(a.res)']), 2)
            if restime - history_restime > 20 or 100 > pkloss > 20:
                mtr = dbvalues.mtr(ip)
                subprocess.Popen(mtr, shell=True,
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except Exception as a:
            logger.info(a)

        if item == 'restime':
            return restime
        if item == 'pkloss':
            return pkloss
        return None
    finally:
        # Single exit point for the connection, covering the early return and
        # unknown item names as well.
        my_sql.db_close()

start-sql代码

#!/usr/bin/env python3
#-*-coding:utf-8-*-
import subprocess,re,time,pymysql,argparse,threading
from settings import dbinfo
from models import db
import log
import gevent
from gevent import monkey

logger = log.logger()
def ping(cmd):
    """Run one probe command and extract the value following ``time=``.

    Args:
        cmd: Shell command line to execute.

    Returns:
        A ``(ret, pkloss)`` tuple.  On success ``ret`` is the numeric text
        after the first ``time=`` in the command's stdout and ``pkloss`` is 0.
        If stdout contains no such value, ``ret`` is 0 and ``pkloss`` is 1.
    """
    output = subprocess.Popen(
        cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    ).communicate()[0].decode('utf8', errors='replace')
    # Raw string: '\d' and '\.' are invalid escapes in a normal literal.
    match = re.search(r'time=(\d+\.?\d*)', output)
    if match is None:
        return (0, 1)
    return (match.group(1), 0)
def value(i, ipaddress, port, t_ype):
    """Probe one target repeatedly and store each result in ``zabbixvalue``.

    The loop ends after the iteration in which the row count of ``ipinfo``
    differs from the count taken at start-up, so that the caller can rebuild
    its set of workers.

    Args:
        i: Probe interval in seconds (converted with ``float``).
        ipaddress: Target address.
        port: Target port, used by the tcp and udp probes.
        t_ype: ``'icmp'``, ``'tcp'`` or ``'udp'``.
    """
    dbvalues = dbinfo()
    path = dbvalues.base_dir()
    if t_ype == 'icmp':
        cmd = 'ping -c 1 -W 1 %s' % ipaddress
    elif t_ype == 'tcp':
        cmd = path + '/tcping.py %s %s 1' % (ipaddress, port)
    elif t_ype == 'udp':
        cmd = path + '/udpping.py %s %s 1' % (ipaddress, port)
    else:
        # Without this branch cmd would be unbound and the first probe would
        # fail with a NameError after the connection had been opened.
        logger.error('unsupported probe type %s for %s' % (t_ype, ipaddress))
        return
    i = float(i)

    logger.debug('初始化%s' % ipaddress)
    mysql = db(dbvalues.dbinfo())
    try:
        # Row count of ipinfo when this worker started.
        count = mysql.db_readone('select count(nid) from ipinfo;')['count(nid)']
        keep_running = True
        while keep_running:
            new_count = mysql.db_readone(
                'select count(nid) from ipinfo;')['count(nid)']
            # A row was added or removed: finish this iteration, then stop.
            if new_count != count:
                keep_running = False
            start = time.time()
            res, pkloss = ping(cmd)
            t_time = int(time.time())
            # Samples older than 1800 s (30 minutes) are removed.
            ctime = t_time - 1800
            mysql.db_delete(dbvalues.sql_clearhistory(ctime))
            mysql.db_write(
                dbvalues.sql_insert(res, pkloss, ipaddress, t_time, t_ype, port))
            # Sleep for what is left of the interval; skip if the probe and
            # the writes already took longer than that.
            remaining = i - (time.time() - start)
            if remaining > 0:
                time.sleep(remaining)
    finally:
        mysql.db_close()
    return

if __name__ == "__main__":
    # Patch the standard library once, before the first connection is opened.
    monkey.patch_all()
    while True:
        dbvalues = dbinfo()
        mysql = db(dbvalues.dbinfo())
        try:
            ipinfo = mysql.db_readall('select * from ipinfo;')
        finally:
            mysql.db_close()

        if not ipinfo:
            # Nothing to monitor: joinall([]) would return at once and this
            # loop would query the database continuously, so wait instead.
            time.sleep(1)
            continue

        # One greenlet per ipinfo row; each ends once the table changes.
        li = [
            gevent.spawn(value, x['i'], x['ipaddress'], x['port'], x['type'])
            for x in ipinfo
        ]
        # Re-read ipinfo only after every greenlet has finished.
        gevent.joinall(li)

if __name__ == "__main__":
    # Thread-based variant of the entry point: one thread per ipinfo row.
    while True:
        dbvalues = dbinfo()
        mysql = db(dbvalues.dbinfo())
        try:
            ipinfo = mysql.db_readall('select * from ipinfo;')
        finally:
            mysql.db_close()

        if not ipinfo:
            # Nothing to monitor: avoid querying the database in a busy loop.
            time.sleep(1)
            continue

        # Create the threads.
        li = []
        for x in ipinfo:
            t1 = threading.Thread(
                target=value,
                args=(x['i'], x['ipaddress'], x['port'], x['type']))
            t1.start()
            li.append(t1)

        # Do not start the next round until every thread has finished.
        for t in li:
            t.join()

tcping代码

#!/usr/bin/env python3
"""
TCP Ping Test (defaults to port 80, 10000 packets)
Usage: ./tcpping.py host [port] [maxCount]

  • Ctrl-C Exits with Results
    """

import sys
import socket
import time
import signal
from timeit import default_timer as timer

host = None
port = 80

maxCount = 10000
count = 0

# Positional arguments: host [port] [maxCount]
try:
    host = sys.argv[1]
except IndexError:
    print("Usage: tcpping.py host [port] [maxCount]")
    sys.exit(1)

# Same value as host; kept because the original defined this name.
sip = host

try:
    port = int(sys.argv[2])
except ValueError:
    # Report the port argument itself (index 2); index 3 may not exist.
    print("Error: Port Must be Integer:", sys.argv[2])
    sys.exit(1)
except IndexError:
    # Port omitted: keep the default.
    pass

try:
    maxCount = int(sys.argv[3])
except ValueError:
    print("Error: Max Count Value Must be Integer", sys.argv[3])
    sys.exit(1)
except IndexError:
    # Count omitted: keep the default.
    pass

passed = 0
failed = 0

def getResults():
    """ Summarize Results """
    # Failure rate in percent; stays the integer 0 when nothing failed.
    lRate = 0
    if failed != 0:
        lRate = "%.2f" % (failed / count * 100)

    print("\nTCP Ping Results: Connections (Total/Pass/Fail): [{:}/{:}/{:}] (Failed: {:}%)".format(count, passed, failed, str(lRate)))

def signal_handler(signal, frame):
    """Print the result summary, then leave with exit status 0."""
    getResults()
    raise SystemExit(0)

signal.signal(signal.SIGINT, signal_handler)

# One TCP connect attempt per iteration, at most maxCount attempts.
while count < maxCount:
    count += 1
    success = False

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(1)

    s_start = timer()
    try:
        # s.bind((sip,0))
        s.connect((host, int(port)))
        s.shutdown(socket.SHUT_RD)
        success = True
    except socket.timeout:
        print("Connection timed out!")
        failed += 1
    except OSError as e:
        print("OS Error:", e)
        failed += 1
    finally:
        s_stop = timer()
        # Close the socket so that a long run does not accumulate open
        # file descriptors.
        s.close()

    # Elapsed time of the attempt in milliseconds.
    s_runtime = "%.2f" % (1000 * (s_stop - s_start))

    if success:
        print("Connected to %s[%s]: tcp_seq=%s time=%s ms" % (host, port, (count - 1), s_runtime))
        passed += 1

    # Pause between attempts, but not after the last one.
    if count < maxCount:
        time.sleep(1)

getResults()

log代码

#!/usr/bin/env python3
#-*-coding:utf-8-*-
import logging,time
from settings import dbinfo
def logger():
    """Return the root logger, set up to write to the configured log file.

    The root logger's level is set to DEBUG.  A file handler for the path
    returned by ``dbinfo().log_dir()`` is attached only if the root logger
    does not already have one for that file, so repeated calls do not
    duplicate every log line.

    Returns:
        The root ``logging.Logger``.
    """
    import os

    log_name = dbinfo().log_dir()
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)

    # FileHandler stores the absolute path of its file in baseFilename.
    target = os.path.abspath(log_name)
    already_attached = any(
        isinstance(handler, logging.FileHandler)
        and handler.baseFilename == target
        for handler in root.handlers
    )
    if not already_attached:
        # The handler cannot open its file if the directory is missing.
        os.makedirs(os.path.dirname(target), exist_ok=True)
        fh = logging.FileHandler(log_name)
        fh.setFormatter(logging.Formatter(
            "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"))
        root.addHandler(fh)
    return root

models代码

#!/usr/bin/env python3
#-*-coding:utf-8-*-
#-----------------------------------------------------创建db类--------------------------------------------------------
import pymysql,settings
class db:
    """Small wrapper around one pymysql connection and one dict cursor.

    Every query method accepts an optional ``params`` argument that is handed
    to the cursor's ``execute`` call, so callers can pass values separately
    from the SQL text instead of formatting them into the string.
    """

    def __init__(self, conninfo):
        """Open the connection described by ``conninfo``.

        Args:
            conninfo: Mapping with the keys ``host``, ``port``, ``user``,
                ``passwd``, ``db`` and ``charset``.
        """
        self.host = conninfo['host']
        self.port = conninfo['port']
        self.user = conninfo['user']
        self.passwd = conninfo['passwd']
        self.db = conninfo['db']
        self.ch = conninfo['charset']
        self.conn = pymysql.connect(host=self.host, port=self.port,
                                    user=self.user, passwd=self.passwd,
                                    db=self.db, charset=self.ch)
        # Attribute name (including its spelling) kept for existing callers.
        self.coursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

    def db_readone(self, sql, params=None):
        """Execute ``sql`` and return the cursor's ``fetchone()`` result."""
        self.coursor.execute(sql, params)
        return self.coursor.fetchone()

    def db_readall(self, sql, params=None):
        """Execute ``sql`` and return the cursor's ``fetchall()`` result."""
        self.coursor.execute(sql, params)
        return self.coursor.fetchall()

    def db_write(self, sql, params=None):
        """Execute ``sql`` and commit."""
        self.coursor.execute(sql, params)
        self.conn.commit()

    def db_delete(self, sql, params=None):
        """Execute ``sql`` and commit; same behaviour as ``db_write``."""
        self.db_write(sql, params)

    def db_close(self):
        """Close the cursor, then the connection."""
        try:
            self.coursor.close()
        finally:
            # The connection is closed even if closing the cursor fails.
            self.conn.close()

settings代码

#!/usr/bin/env python3
#-*-coding:utf-8-*-
#---------------------------------------------配置文件,定义sql语句,路径等------------------------------------------------
import os,time

class dbinfo:
    """Configuration holder: paths, shell commands and SQL statement text.

    NOTE(review): every ``sql_*`` method formats its arguments straight into
    the statement text.  The values must therefore be trusted; switching to
    parameterized queries would change what these methods return and has to
    be done together with their callers.
    """

    def __init__(self):
        # Directory that contains this settings file.
        self.dir = os.path.dirname(os.path.abspath(__file__))

    def base_dir(self):
        """Return the directory that contains this file."""
        return self.dir

    def log_dir(self):
        """Return the path of today's log file under ``<dir>/log/``."""
        return (self.dir + '/log/'
                + time.strftime('%Y-%m-%d', time.localtime()) + '.log')

    def mtr(self, ip):
        """Return the shell command that runs ``mtr.sh`` for ``ip``.

        The report path is ``<dir>/log/<ip>-<date>.log``.  Each word of the
        command is shell-quoted, so an ``ip`` value or an install path with
        spaces or shell metacharacters stays a single argument.
        """
        import shlex

        mtr_dir = (self.dir + '/log/' + ip + '-'
                   + time.strftime('%Y-%m-%d', time.localtime()) + '.log')
        return ' '.join([
            shlex.quote(self.dir + '/mtr.sh'),
            shlex.quote(ip),
            shlex.quote(mtr_dir),
        ])

    def dbinfo(self):
        """Return the connection settings as a dict."""
        # NOTE(review): credentials are hard-coded here, including the root
        # account; consider loading them from outside the source tree.
        conninfo = {'host': '127.0.0.1', 'port': 3306, 'user': 'root',
                    'passwd': 'darkcs', 'db': 'pingvalues', 'charset': 'utf8'}
        return conninfo

    def sql_sqlstatues(self, ip, t_ype, port, tables):
        """SELECT the newest row of ``tables`` for (ip, t_ype, port)."""
        sql = 'select time,ipaddress from %s where ipaddress = "%s" and type = "%s" and port = %s order by nid desc limit 1;' % (tables, ip, t_ype, port)
        return sql

    def sql_getvalue(self, ip, packet_count, t_ype, port):
        """SELECT the newest ``packet_count`` samples for one target."""
        sql = 'select res,pkloss,ipaddress,time from zabbixvalue where ipaddress = "%s" and type = "%s" and port = %s order by nid desc limit %s;' % (ip, t_ype, port, packet_count)
        return sql

    def cmd(self):
        """Return the shell command that starts ``start-sql.py`` in the background."""
        cmd = 'nohup' + ' ' + self.dir + '/start-sql.py >/dev/null 2>&1 &'
        return cmd

    def sql_clearhistory(self, ctime):
        """DELETE samples whose ``time`` is below ``ctime``."""
        sql = 'delete from zabbixvalue where time<%s;' % ctime
        return sql

    def sql_insert(self, res, pkloss, ipaddress, t_time, t_ype, port):
        """INSERT one sample into ``zabbixvalue``."""
        sql = 'insert into zabbixvalue(res,pkloss,ipaddress,time,type,port) values(%s,%s,"%s",%s,"%s",%s)' % (res, pkloss, ipaddress, t_time, t_ype, port)
        return sql

    def sql_mtr(self, ip, packet_count, t_ype, port):
        """SELECT the average ``res`` of the window before the newest one.

        The inner query skips the newest ``packet_count`` rows and takes the
        next ``packet_count`` rows (``limit offset,count``).
        """
        sql = 'select avg(a.res) from (select res from zabbixvalue where ipaddress like "%s" and type like "%s" and port like %s order by nid desc limit %s,%s) as a;' % (ip, t_ype, port, packet_count, packet_count)
        return sql

    def sql_inserttoipinfo(self, ip, i, port, t_ype, ctime):
        """INSERT a new target into ``ipinfo``."""
        sql = 'insert into ipinfo(ipaddress,port,type,time,i) values("%s",%s,"%s","%s",%s);' % (ip, port, t_ype, ctime, i)
        return sql

    def sql_updatetoipinfo(self, ip, port, t_ype, ctime):
        """UPDATE the ``time`` of an existing ``ipinfo`` target."""
        sql = 'update ipinfo set time = "%s" where ipaddress like "%s" and type like "%s" and port like %s' % (ctime, ip, t_ype, port)
        return sql

    def sql_clearipinfo(self, ctime):
        """DELETE ``ipinfo`` targets whose ``time`` is below ``ctime``."""
        sql = 'delete from ipinfo where time < %s' % ctime
        return sql

mtr shell脚本

#!/usr/bin/env bash
# Append one mtr report for a target to a log file.
# Usage: mtr.sh <ip> <logfile>
IP=$1
dir=$2
# Quote both expansions so a value containing spaces or glob characters is
# passed through as a single word.
mtr -r -n -c 30 -w -b "$IP" >> "$dir"

mysql

mysql> desc ipinfo;
+-----------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-----------+-------------+------+-----+---------+----------------+
| nid | int(11) | NO | PRI | NULL | auto_increment |
| ipaddress | varchar(64) | YES | | NULL | |
| port | int(11) | YES | | NULL | |
| type | varchar(64) | YES | | NULL | |
| time | varchar(64) | YES | | NULL | |
| i | float | YES | | NULL | |
+-----------+-------------+------+-----+---------+----------------+
6 rows in set (0.00 sec)

mysql> desc zabbixvalue;
+-----------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-----------+-------------+------+-----+---------+----------------+
| nid | int(11) | NO | PRI | NULL | auto_increment |
| res | float | YES | | NULL | |
| pkloss | int(11) | YES | | NULL | |
| ipaddress | varchar(64) | YES | | NULL | |
| time | int(11) | YES | | NULL | |
| type | varchar(64) | YES | | NULL | |
| port | int(11) | YES | | NULL | |
+-----------+-------------+------+-----+---------+----------------+
7 rows in set (0.00 sec)

创建 ipaddress type port三列联合索引,避免数据量过大导致全表扫描造成的系统cpu满负载

mysql> show index from zabbixvalue;
+-------------+------------+-----------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
| Table | Non_unique | Key_name | Seq_in_index | Column_name | Collation | Cardinality | Sub_part | Packed | Null | Index_type | Comment | Index_comment |
+-------------+------------+-----------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
| zabbixvalue | 0 | PRIMARY | 1 | nid | A | 223725 | NULL | NULL | | BTREE | | |
| zabbixvalue | 1 | ip_info | 1 | ipaddress | A | 1 | NULL | NULL | YES | BTREE | | |
| zabbixvalue | 1 | ip_info | 2 | type | A | 1 | NULL | NULL | YES | BTREE | | |
| zabbixvalue | 1 | ip_info | 3 | port | A | 1 | NULL | NULL | YES | BTREE | | |
| zabbixvalue | 1 | time_info | 1 | time | A | 90354 | NULL | NULL | YES | BTREE | | |
+-------------+------------+-----------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
5 rows in set (0.00 sec)

zabbix_agentd.conf

UserParameter=dark_ping_restime[*],/etc/zabbix/darkping/bin.py -t $1 -I restime
UserParameter=dark_ping_pkloss[*],/etc/zabbix/darkping/bin.py -t $1 -I pkloss

UserParameter=dark_tcpping_restime[*],/etc/zabbix/darkping/bin.py -t $1 -p $2 -T tcp -I restime
UserParameter=dark_tcpping_pkloss[*],/etc/zabbix/darkping/bin.py -t $1 -p $2 -T tcp -I pkloss

Item 配置略

2021.6.8 补一个udpping工具

client端

#!/usr/bin/env python
#-*-coding:utf-8-*-
import socket
import sys
import time
import string
import random
import signal
import os

INTERVAL = 1000 #unit ms
LEN =64
IP=""
PORT=0

count=0
count_of_received=0
rtt_sum=0.0
rtt_min=99999999.0
rtt_max=0.0

def signal_handler(*args, **kwargs):
    """Print the ping statistics and terminate the process.

    Used both as the SIGINT handler and as the normal end of the script, so
    it accepts and ignores any arguments.
    """
    if count != 0 and count_of_received != 0:
        print('')
        print('--- ping statistics ---')
    if count != 0:
        print('%d packets transmitted, %d received, %.2f%% packet loss' % (count, count_of_received, (count - count_of_received) * 100.0 / count))
    if count_of_received != 0:
        print('rtt min/avg/max = %.2f/%.2f/%.2f ms' % (rtt_min, rtt_sum / count_of_received, rtt_max))
    # os._exit skips the interpreter's normal shutdown, so buffered output
    # has to be flushed explicitly or the summary is lost when stdout is a
    # pipe.
    sys.stdout.flush()
    os._exit(0)

def random_string(length):
    """Return ``length`` characters drawn from ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.choice(alphabet) for _ in range(length))
# The article had a bare list here, presumably a sample of sys.argv left over
# from debugging: ['./udpping.py' ,'8.8.8.8' ,'53 ']

# Expected command line: <dest_ip> <dest_port> <count> ["<options>"]
if len(sys.argv) != 4 and len(sys.argv) != 5:
    print(""" usage:""")
    print("""   this_program <dest_ip> <dest_port> <count>""")
    print("""   this_program <dest_ip> <dest_port> <count> "<options>" """)

    print()
    print(""" options:""")
    print("""   LEN         the length of payload, unit:byte""")
    print("""   INTERVAL    the seconds waited between sending each packet, as well as the timeout for reply packet, unit: ms""")

    print()
    print(" examples:")
    print('   ./udping.py 8.8.8.8 4000  10 "LEN=400;INTERVAL=2000"')
    print("   ./udping.py fe80::5400:ff:aabb:ccdd 4000 10")
    print()

    sys.exit()

target = sys.argv[1]
# An argument containing ':' is taken as an IPv6 literal and used as given;
# gethostbyname handles IPv4 only and would reject it.
is_ipv6 = 1 if ":" in target else 0
IP = target if is_ipv6 else socket.gethostbyname(target)
PORT = int(sys.argv[2])
monitorcount = int(sys.argv[3])

if len(sys.argv) == 5:
    # NOTE(security): the fourth argument is executed as Python code so that
    # it can assign LEN and INTERVAL.  Never pass untrusted text here.
    exec(sys.argv[4])

if LEN < 5:
    print("LEN must be >=5")
    sys.exit()
if INTERVAL < 50:
    print("INTERVAL must be >=50")
    sys.exit()

signal.signal(signal.SIGINT, signal_handler)

if not is_ipv6:
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
else:
    sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)

print("udping %s via port %d with %d bytes of payload" % (IP, PORT, LEN))
sys.stdout.flush()

# Send one datagram per iteration and wait up to INTERVAL ms for its echo.
while count < monitorcount:
    # Encode once; the same bytes are sent and compared with the reply.
    payload = random_string(LEN).encode()
    sock.sendto(payload, (IP, PORT))
    time_of_send = time.time()
    deadline = time.time() + INTERVAL / 1000.0
    received = 0
    rtt = 0.0

    while True:
        timeout = deadline - time.time()
        if timeout < 0:
            break
        sock.settimeout(timeout)
        try:
            recv_data, addr = sock.recvfrom(65536)
        except socket.timeout:
            break
        except OSError:
            # Any other socket error: try again until the deadline passes.
            continue
        # Accept only the echo of this payload from the probed address/port.
        if recv_data == payload and addr[0] == IP and addr[1] == PORT:
            rtt = ((time.time() - time_of_send) * 1000)
            print("Reply from", IP, "seq=%d" % count, "time=%.2f" % (rtt), "ms")
            sys.stdout.flush()
            received = 1
            break

    count += 1
    if received == 1:
        count_of_received += 1
        rtt_sum += rtt
        rtt_max = max(rtt_max, rtt)
        rtt_min = min(rtt_min, rtt)
    else:
        print("Request timed out")
        sys.stdout.flush()

    # Wait out the rest of the interval before the next datagram.
    time_remaining = deadline - time.time()
    if time_remaining > 0:
        time.sleep(time_remaining)

# Print the statistics and exit.
signal_handler()

server端

#!/usr/bin/env python3
import socket

# The article left the bind address and port as placeholders.  Set these to
# the local address and UDP port the echo server should listen on; 4000 is
# the port used by the article's own examples.
LISTEN_ADDRESS = '0.0.0.0'
LISTEN_PORT = 4000

# Create and bind the socket once, then echo every datagram to its sender.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((LISTEN_ADDRESS, LISTEN_PORT))
while True:
    data, addr = sock.recvfrom(65535)
    sock.sendto(data, addr)

也可以使用socat,实际测试使用socat会引入额外开销,时延不准确

socat -v UDP-LISTEN:4000,fork PIPE

queue版

https://www.cnblogs.com/darkchen/p/15524856.html