mysql-redis连接
阅读原文时间:2023年07月09日阅读:1

# log 数据库连接
class LogMysql(object):
conn = None
cursor = None

def \_\_init\_\_(self):  
    self.conn = pymysql.connect(host='', user='',  
                        password='',  
                        database='log', charset='utf8')  
    self.cursor = self.conn.cursor()

# 为了方便使用一般会选择将查询结果加上字段名称以字典组的方式返回查询结果  
def dict\_fetchall(self):  
    "Return all rows from a cursor as a dict"  
    # 获取查询字段  
    columns = \[col\[0\] for col in self.cursor.description\]  
    print(columns)  
    return \[dict(zip(columns, row)) for row in self.cursor.fetchall()\]

# 获取表列表  
def get\_table\_list(self):  
    # 判断表是否存在  
    self.cursor.execute("SHOW TABLES")  
    res = self.cursor.fetchall()  
    table\_list = \[\]  
    for i in res:  
        table\_list.append(i\[0\])  
    # print("table\_list", table\_list)  
    return table\_list

redis主库

class Redis(object):
conn = None

def \_\_init\_\_(self):  
    poll = redis.ConnectionPool(host='192.168.5.219', port=6379, db=14, password='root1234@A')  
    # 本地测试  
    # poll = redis.ConnectionPool(host='192.168.10.10', port=7000, db=14)  
    self.conn = redis.Redis(connection\_pool=poll)  

class LogMysql(object):

    conn = None

    cursor = None

    table = None

    database = None

    def __init__(self, database, table):

        self.table = table

        self.database = database

        self.conn = pymysql.connect(host='rm-2zezqp8sll2swzwby.mysql.rds.aliyuncs.com', user='datacenter',

                                    password='kbs11zx@',

                                    database=self.database, charset='utf8')

        # 本地测试

        # self.conn = pymysql.connect(host='192.168.10.5', user='root',

        #                             password='root',

        #                             database='unionlog', charset='utf8')

        self.cursor = self.conn.cursor()

    # 为了方便使用一般会选择将查询结果加上字段名称以字典组的方式返回查询结果

    def dict_fetchall(self):

        "Return all rows from a cursor as a dict"

        # 获取查询字段

        columns = [col[0] for col in self.cursor.description]

        # print(columns)

        return [dict(zip(columns, row)) for row in self.cursor.fetchall()]

    # 插入数据库

    def insert_db(self, data):

        # ##################### 表名 #####################

        table = self.table

        keys = ', '.join(data.keys())

        values = ', '.join(['%s'] * len(data))

        sql = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(table=table, keys=keys, values=values)

        try:

            self.cursor.execute(sql, tuple(data.values()))

            print('insert Successful')

            self.conn.commit()

            self.cursor.close()

            self.conn.close()

        except Exception as e:

            print('insert Failed')

            self.conn.rollback()

            self.cursor.close()

            self.conn.close()

    # 更新数据库

    def updata_db(self, data):

        # ##################### 表名 #####################

        table = self.table

        keys = ', '.join(data.keys())

        values = ', '.join(['%s'] * len(data))

        # 实际用的是插入语句,不过加了ON DUPLICATE KEY UPDATE(主键存在,则执行更新操作)

        sql = 'INSERT INTO {table}({keys}) VALUES ({values}) ON DUPLICATE KEY UPDATE'.format(table=table, keys=keys,

                                                                                             values=values)

        update = ','.join([" {key} = %s".format(key=key) for key in data])

        sql += update

        try:

            self.cursor.execute(sql, tuple(data.values()) * 2)

            print('update Successful')

            self.conn.commit()

            self.cursor.close()

            self.conn.close()

        except Exception as e:

            print('update Failed')

            self.cursor.close()

            self.conn.close()

    def update_db_sql(self, sql):

        # sql="""update contact set mobile=100088 where name='kate'"""

        try:

            self.cursor.execute(sql)  #执行sql

            self.conn.commit() #提交到数据库

            # db_cursor.execute("select * from contact")

            # ars=db_cursor.fetchall()

            # for rs in ars:

            #     print(rs)

            print('更新成功')

        except Exception as e:

            print("error to update:",e)

            self.conn.rollback()  #发生错误则回滚

            self.cursor.close()

            self.conn.close()