# rabbit_consumer — RabbitMQ log-collection consumer script.
# (Originally published 2023-07-09.)

import pika
import json
import time
import os
import ast
import uuid
import time
import json
import hashlib

import redis
import pymysql

import logging
from logging import handlers

# Logging setup

class Logger(object):
    """Wrap a ``logging.Logger`` with a console handler and a
    timed-rotating file handler."""

    # Map level names to logging module constants.
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL
    }

    def __init__(self, filename, level='info', when='D', backCount=3,
                 fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'):
        """Create (or fetch) the logger named *filename* and attach handlers.

        when: rotation unit for TimedRotatingFileHandler —
              'S' seconds, 'M' minutes, 'H' hours, 'D' days,
              'W' week (interval==0 means Monday), 'midnight' daily at 00:00.
        backCount: number of rotated backups kept before the oldest is deleted.
        """
        self.logger = logging.getLogger(filename)
        format_str = logging.Formatter(fmt)  # log line format
        self.logger.setLevel(self.level_relations.get(level))
        # BUG FIX: logging.getLogger returns the same object for the same
        # name, and the original attached a fresh handler pair on every
        # instantiation, duplicating every log line. Attach only once.
        if not self.logger.handlers:
            sh = logging.StreamHandler()  # echo to the console
            sh.setFormatter(format_str)
            # Rotate the log file on a timed interval.
            th = handlers.TimedRotatingFileHandler(filename=filename, when=when,
                                                   backupCount=backCount,
                                                   encoding='utf-8')
            th.setFormatter(format_str)
            self.logger.addHandler(sh)
            self.logger.addHandler(th)

# Module-level logger writing to clear/all.log.
# NOTE(review): the 'clear' directory must already exist, otherwise
# TimedRotatingFileHandler fails to open the file — confirm deployment
# creates it.
file = 'all'
log = Logger('clear' + os.sep + '%s.log' % file)
logger = log.logger

# Selects the local-test vs. production DB/Redis endpoints inside ClearLog.
DEBUG = False

class ClearLog(object):
    """Clean raw click-stream log records and persist them into monthly
    MySQL tables, using Redis to de-duplicate page views and to compute
    page stay time.

    All state is class-level; the DB and Redis connections are opened at
    class-definition time (NOTE(review): this runs on import — confirm
    that is intended).
    """

    if DEBUG:
        DATABASE = 'unionlog'
        # Local test environment.
        poll = redis.ConnectionPool(host='192.168.10.10', port=7000, db=5,
                                    password='', decode_responses=True)
        conn = pymysql.connect(host='192.168.10.5', user='root',
                               password='root',
                               database=DATABASE, charset='utf8')
        cursor = conn.cursor()
    else:
        DATABASE = 'log'
        # Production environment.
        # NOTE(review): host is '' — pymysql will then target localhost;
        # confirm this is the intended production MySQL host.
        poll = redis.ConnectionPool(host='192.168.5.219', port=6379, db=5,
                                    password='', decode_responses=True)
        conn = pymysql.connect(host='', user='datacenter',
                               password='kbs11zx@',
                               database=DATABASE, charset='utf8')
        cursor = conn.cursor()
    CONN = redis.Redis(connection_pool=poll)
    # Redis keys used for de-duplication and stay-time bookkeeping.
    REDIS_PID_HASH = "tarsier.log.clear.pid.hash"
    REDIS_PID_DELETE_HASH = "tarsier.log.delete.pid.hash"
    REDIS_PID_DELETE_LIST = "tarsier.log.delete.pid.list"
    REDIS_PID_DELETE_LIST_TEMP = "tarsier.log.delete.pid.list.temp"
    table_list = []  # cached result of SHOW TABLES
    table = 'tarsier_log_details'  # template table name
    instance = None  # singleton cache, see __new__

    def __new__(cls, *args, **kwargs):
        # Singleton. BUG FIX: the original never assigned cls.instance,
        # so every call created a fresh object; cache the first one.
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance

    @staticmethod
    def get_table_list(table):
        """Ensure *table* exists (creating it from the template table if
        missing) and refresh the cached table list."""
        ClearLog.table = table
        if table in ClearLog.table_list:
            pass  # known to exist
        else:
            ClearLog.cursor.execute("SHOW TABLES")
            res = ClearLog.cursor.fetchall()
            table_temp = []
            for i in res:
                table_temp.append(i[0])
            ClearLog.table_list = table_temp
            if table in ClearLog.table_list:
                pass  # table exists; cache was just stale
            else:
                # Create the monthly table from the template table.
                sql = """create table %s like tarsier_log_details""" % (table)
                try:
                    print('创建表')
                    ClearLog.cursor.execute(sql)
                except Exception:
                    # Best effort: a concurrent creator may have won the race.
                    pass
                ClearLog.cursor.execute("SHOW TABLES")
                res = ClearLog.cursor.fetchall()
                table_temp = []
                for i in res:
                    table_temp.append(i[0])
                ClearLog.table_list = table_temp

    @staticmethod
    def updata_db(data):
        """Upsert *data* (a flat column->value dict) into the current
        month's table. (Misspelled name kept for caller compatibility.)"""
        # Monthly table name, e.g. tarsier_log_details_202301.
        table = "tarsier_log_details_%s" % ClearLog.timestamp_to_str(format="%Y%m")

        ClearLog.get_table_list(table)
        keys = ', '.join(data.keys())
        values = ', '.join(['%s'] * len(data))
        # INSERT ... ON DUPLICATE KEY UPDATE: update when the PK already exists.
        sql = 'INSERT INTO {table}({keys}) VALUES ({values}) ON DUPLICATE KEY UPDATE'.format(table=table, keys=keys,
                                                                                             values=values)
        update = ','.join([" {key} = %s".format(key=key) for key in data])
        sql += update
        try:
            # Values are bound twice: once for INSERT, once for UPDATE.
            ClearLog.cursor.execute(sql, tuple(data.values()) * 2)
            print('update Successful')
            ClearLog.conn.commit()
        except Exception as e:
            logger.error(e)
            print('update Failed')

    @staticmethod
    def update_db_sql(sql):
        """Execute a raw SQL statement; commit on success, roll back and
        log on failure."""
        try:
            ClearLog.cursor.execute(sql)
            ClearLog.conn.commit()
            print('更新成功')
        except Exception as e:
            print("ERROR:{}".format(str(e)))
            ClearLog.conn.rollback()
            logger.info('error:%s' % str(e))

    def __call__(self, *args, **kwargs):
        pass

    def __init__(self):
        # All state is class-level; nothing to initialise per instance.
        pass

    @staticmethod
    def md5_me(key):
        """Return the hex MD5 digest of str(key)."""
        md5 = hashlib.md5()
        md5.update(str(key).encode('utf-8'))
        value = md5.hexdigest()
        return value

    @staticmethod
    def main():
        """Batch mode: clean every line of the 20201110.log dump file."""
        with open('20201110.log', encoding='utf-8') as f:
            count = 0
            for item in f:
                line = item.strip()
                # Lines are Python dict literals (not JSON), hence literal_eval.
                data = ast.literal_eval(line)
                ClearLog.clear_log(data)
                count += 1
                if count % 10000 == 0:
                    print(count)

    @staticmethod
    def main2(data):
        """Stream mode: clean a single already-parsed record."""
        ClearLog.clear_log(data)

    @staticmethod
    def clear_log(data):
        """Normalise one raw record and write it out.

        'view' (or untyped) events are de-duplicated per page id through
        Redis; all other event types are written unconditionally.
        """
        res_data = {}
        rsUid = data.get('rsUid', '')
        rsPageId = data.get('rsPageId', '')
        rshyuid = data.get('rshyuid', '')
        pageid = data.get('pageid', '')
        pageUrl = data.get('pageUrl', '')
        userAgent = data.get('userAgent', '')
        referrer = data.get('referrer', '')
        nowDate = data.get('nowDate', '')
        device = data.get('device', '')
        rshyType = data.get('rshyType', '')
        targetDataset = str(data.get('targetDataset', ''))
        targetValue = data.get('targetValue', '')
        targetClassName = data.get('targetClassName', '')
        inputData = str(data.get('inputData', ''))
        rshyUserIp = data.get('rshyUserIp', '')
        netloc = data.get('netloc', '')
        urlPath = data.get('urlPath', '')
        siteName = data.get('siteName', '')
        TIME = ClearLog.timestamp_to_str()
        ID = ClearLog.get_uuid()
        rshyTime = data.get('rshyTime', '')
        try:
            # Split 'YYYY-MM-DD HH:MM:SS' into date and hour parts.
            rsdate = rshyTime.split()[0]
            temp = rshyTime.split()[1]
            rshour = temp.split(':')[0]
        except Exception:
            # rshyTime missing or malformed — fall back to empty values.
            rsdate = ''
            rshour = 0
        res_data.update({
            "id": ID,
            "rsuid": rsUid,
            "rshytime": rshyTime,
            "rshour": rshour,
            "rsdate": rsdate,
            "rspageid": rsPageId,
            "rshyuid": rshyuid,
            "pageid": pageid,
            "pageurl": pageUrl,
            "useragent": userAgent,
            "referrer": referrer,
            "device": device,
            "rshytype": rshyType,
            "targetvalue": targetValue,
            "targetdataset": targetDataset,
            "targetclassname": targetClassName,
            "inputdata": inputData,
            "starttime": nowDate,
            "rshyuserip": rshyUserIp,
            "netloc": netloc,
            "urlpath": urlPath,
            "sitename": siteName,
            "createtime": TIME,
            "updatetime": TIME,
        })
        if rshyType == 'view' or rshyType == '':
            # De-duplicate page views keyed on the page id.
            rsUidKey = rsPageId
            if not rsPageId:
                return

            # Keep refreshing the pid's last-seen time.
            ClearLog.CONN.hset(ClearLog.REDIS_PID_DELETE_HASH, rsUidKey, nowDate)

            # Fingerprint of the view; identical consecutive views are skipped.
            res_temp = rsUid + pageUrl + referrer + userAgent + device
            res_rs_uid = ClearLog.md5_me(res_temp)
            exist_uid = ClearLog.CONN.hget(ClearLog.REDIS_PID_HASH, rsUidKey)
            if not exist_uid or res_rs_uid != str(exist_uid):
                ClearLog.CONN.hset(ClearLog.REDIS_PID_HASH, rsUidKey, res_rs_uid)
                # Persist the record.
                ClearLog.write_data(res_data)
                # Record the last-seen time for the stay-time sweep.
                ClearLog.CONN.hset(ClearLog.REDIS_PID_DELETE_HASH, rsUidKey, nowDate)
                # Queue the pid for the daily expiry pass.
                data_temp = {"pid": rsUidKey, "date": nowDate}
                ClearLog.CONN.lpush(ClearLog.REDIS_PID_DELETE_LIST, json.dumps(data_temp))
            return
        ClearLog.write_data(res_data)

    @staticmethod
    def write_data(data):
        """Persist a cleaned record: upsert into MySQL and append it to the
        daily clear-log archive file."""
        ClearLog.updata_db(data)
        file_name = ClearLog.timestamp_to_str_m()
        with open('clear{}{}.clear.log'.format(os.sep, file_name), 'a+', encoding='utf-8') as f:
            f.write(str(data) + '\n')

    @staticmethod
    def str_to_timestamp(str_time=None, format='%Y-%m-%d %H:%M:%S'):
        """Parse a formatted local-time string to an int Unix timestamp;
        with no argument, return the current timestamp."""
        if str_time:
            time_tuple = time.strptime(str_time, format)
            result = time.mktime(time_tuple)
            return int(result)
        return int(time.time())

    @staticmethod
    def timestamp_to_str(timestamp=None, format='%Y-%m-%d %H:%M:%S'):
        """Format a Unix timestamp as local time; default is now."""
        if timestamp:
            time_tuple = time.localtime(timestamp)
            result = time.strftime(format, time_tuple)
            return result
        else:
            return time.strftime(format)

    @staticmethod
    def timestamp_to_str_m(timestamp=None, format='%Y-%m-%d'):
        """Format a Unix timestamp as a local date; default is today."""
        if timestamp:
            time_tuple = time.localtime(timestamp)
            result = time.strftime(format, time_tuple)
            return result
        else:
            return time.strftime(format)

    @staticmethod
    def get_uuid():
        """Return a 32-character hex uuid4 (dashes removed)."""
        res = str(uuid.uuid4())
        UUID = ''.join(res.split('-'))
        return UUID

    @staticmethod
    def del_tarsier_log_pid_hash():
        """Run every 5 minutes: for page ids idle for >= 60 s, store their
        last-seen time as endtime and drop them from the Redis hash."""
        table = ClearLog.table + '_%s' % ClearLog.timestamp_to_str_m(format='%Y%m')

        print('每5分钟删除一次hash中的值,并将停留时间算出')
        get_pid_list = ClearLog.CONN.hgetall(ClearLog.REDIS_PID_DELETE_HASH)
        for hash_pid_item in get_pid_list:
            redisDate = ClearLog.CONN.hget(ClearLog.REDIS_PID_DELETE_HASH, hash_pid_item)
            try:
                redis_data_time = ClearLog.str_to_timestamp(redisDate)
                now_data_time = time.time()
                chufatime = now_data_time - redis_data_time
                # Idle for at least a minute: close out the page view.
                if chufatime >= 60:
                    sql = """update {} set endtime='{}' where rspageid='{}'""".format(table, redisDate,
                                                                                      hash_pid_item)
                    print(sql)
                    ClearLog.update_db_sql(sql)
                    # Remove the entry once the endtime has been written.
                    ClearLog.CONN.hdel(ClearLog.REDIS_PID_DELETE_HASH, hash_pid_item)
            except Exception:
                # Best effort per entry; a bad date must not stop the sweep.
                pass
        print('====================================')

    @staticmethod
    def del_tarsier_log_pid_list():
        """Run once a day: expire fingerprint entries older than 24 h and
        cycle younger queue items back through a temp list."""
        logger.info('每一天清除一次队列信息')
        # BUG FIX: the original popped the next item only inside the try
        # block, so a json.loads failure left res_str unchanged and the
        # loop spun forever on the same value. Pop at the top instead.
        while True:
            res_str = ClearLog.CONN.lpop(ClearLog.REDIS_PID_DELETE_LIST)
            if not res_str:
                break
            try:
                # Items were queued as {"pid": ..., "date": ...}.
                res_json = json.loads(res_str)
                nowDate = res_json.get("date", '')
                rsUidKey = res_json.get("pid", '')
                redis_data_time = ClearLog.str_to_timestamp(nowDate)
                now_data_time = time.time()
                chufatime = now_data_time - redis_data_time
                if chufatime >= 24 * 60 * 60:
                    # Older than a day: drop the fingerprint.
                    ClearLog.CONN.hdel(ClearLog.REDIS_PID_HASH, rsUidKey)
                else:
                    # Not old enough yet — park it on the temp list.
                    ClearLog.CONN.rpush(ClearLog.REDIS_PID_DELETE_LIST_TEMP, json.dumps(res_json))
            except Exception:
                # Best effort: skip malformed queue entries.
                pass
        # Move the parked items back onto the main list.
        res_str = ClearLog.CONN.lpop(ClearLog.REDIS_PID_DELETE_LIST_TEMP)
        while res_str:
            res_json = json.loads(res_str)
            ClearLog.CONN.rpush(ClearLog.REDIS_PID_DELETE_LIST, json.dumps(res_json))
            res_str = ClearLog.CONN.lpop(ClearLog.REDIS_PID_DELETE_LIST_TEMP)
        logger.info('清除完毕')

# Convert a Unix timestamp into a formatted date string.

def timestamp_to_str_day(timestamp=None, format='%Y%m%d'):
    """Format a Unix timestamp as a local-time day string (YYYYMMDD).

    With no (or a falsy) timestamp, format the current local time.
    """
    if not timestamp:
        return time.strftime(format)
    return time.strftime(format, time.localtime(timestamp))

# Connect to RabbitMQ and create a channel.
rabbit_host = "192.168.2.129"
rabbit_username = 'rshy'
rabbit_password = 'root1234@AWJSW'
queue_topic = 'logs.collect.statistics'
user = pika.PlainCredentials(rabbit_username, rabbit_password)
# BUG FIX: the original immediately opened a second, credential-less
# connection to the same host, leaking this one and discarding the
# credentials. A single authenticated connection is kept instead.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host=rabbit_host, credentials=user))  # heartbeat=0

channel = connection.channel()

# Declare the queue (idempotent) so consuming works even before the
# producer has created it. The original also declared `cfg.QUEUE_TOPIC`,
# but `cfg` was never defined (NameError) — that line was removed.
channel.queue_declare(queue=queue_topic)

ClearLogObj = ClearLog()


def consumer():
    """Consume raw log messages from RabbitMQ: clean each record through
    ClearLog and append the raw payload to a daily consumer log file.

    Blocks in channel.start_consuming() until interrupted.
    """
    print(' [*] Waiting for messages. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        """Per-message handler: parse the JSON body, clean it, archive it."""
        data = json.loads(body)
        print("--data--:", data)
        ClearLogObj.main2(data)
        file_name = timestamp_to_str_day()
        # NOTE(review): assumes the 'consumer' directory already exists;
        # open() will fail otherwise — confirm deployment creates it.
        with open('consumer' + os.sep + file_name + '.log', 'a+', encoding='utf-8') as f:
            f.write(str(data) + '\n')

    # Third positional argument is auto_ack=True: messages are acknowledged
    # on delivery, so an exception in callback loses the in-flight message.
    channel.basic_consume(queue_topic, callback, True)
    channel.start_consuming()


if __name__ == '__main__':
    consumer()

# (End of script. Scraped blog-page footer — mobile-reading QR prompt and
# cloud-hosting advertisements — removed as non-content.)