# Connection helper for the `log` MySQL database.
class LogMysql(object):
    """Small wrapper around a pymysql connection and cursor.

    The connection is opened in ``__init__`` and exposed as ``self.conn``;
    a cursor created from it is exposed as ``self.cursor``.
    """

    conn = None
    cursor = None

    def __init__(self, host='', user='', password='', database='log',
                 charset='utf8'):
        """Open the connection and create a cursor.

        All parameters are forwarded to ``pymysql.connect``. The defaults
        reproduce the values that used to be hard-coded here (blank
        host/user/password, database ``log``, charset ``utf8``), so calling
        ``LogMysql()`` with no arguments behaves as before.
        """
        self.conn = pymysql.connect(host=host, user=user,
                                    password=password,
                                    database=database, charset=charset)
        self.cursor = self.conn.cursor()

    def dict_fetchall(self):
        """Return all remaining rows of the last query as a list of dicts.

        Each dict maps column name to the row's value, which is usually more
        convenient for callers than positional tuples.
        """
        # The first item of every cursor.description entry is the column name.
        columns = [col[0] for col in self.cursor.description]
        return [dict(zip(columns, row)) for row in self.cursor.fetchall()]

    def get_table_list(self):
        """Return the names of all tables in the connected database.

        Runs ``SHOW TABLES``; callers can use the result to check whether a
        given table exists.
        """
        self.cursor.execute("SHOW TABLES")
        # Every result row is a 1-tuple holding the table name.
        return [row[0] for row in self.cursor.fetchall()]
class Redis(object):
    """Holds a redis client built on top of a connection pool.

    The client is exposed as ``self.conn``.
    """

    conn = None

    def __init__(self, host='192.168.5.219', port=6379, db=14,
                 password='root1234@A'):
        """Create the connection pool and the client that uses it.

        The defaults reproduce the values that used to be hard-coded here,
        so ``Redis()`` with no arguments behaves as before.
        """
        # NOTE(review): a real password is committed as a default value;
        # consider loading it from configuration or the environment instead.
        # Local testing used host='192.168.10.10', port=7000, db=14 with no
        # password (pass password=None).
        pool = redis.ConnectionPool(host=host, port=port, db=db,
                                    password=password)
        self.conn = redis.Redis(connection_pool=pool)
class LogMysql(object):
    """pymysql helper bound to one database and one target table.

    The connection and cursor are created in ``__init__``. Every write
    method (``insert_db``, ``updata_db``, ``update_db_sql``) closes the
    cursor and the connection when it finishes, so an instance is good for
    a single write operation.
    """

    conn = None
    cursor = None
    table = None
    database = None

    def __init__(self, database, table,
                 host='rm-2zezqp8sll2swzwby.mysql.rds.aliyuncs.com',
                 user='datacenter', password='kbs11zx@', charset='utf8'):
        """Connect to ``database`` and remember ``table`` for later writes.

        ``host``, ``user``, ``password`` and ``charset`` default to the
        values that used to be hard-coded here, so existing two-argument
        calls keep working.
        """
        # NOTE(review): real credentials are committed as default values;
        # consider loading them from configuration or the environment.
        # Local testing used host='192.168.10.5', user='root',
        # password='root', database='unionlog'.
        self.table = table
        self.database = database
        self.conn = pymysql.connect(host=host, user=user,
                                    password=password,
                                    database=self.database, charset=charset)
        self.cursor = self.conn.cursor()

    def dict_fetchall(self):
        """Return all remaining rows of the last query as a list of dicts.

        Each dict maps column name to the row's value.
        """
        # The first item of every cursor.description entry is the column name.
        columns = [col[0] for col in self.cursor.description]
        return [dict(zip(columns, row)) for row in self.cursor.fetchall()]

    def _run_and_close(self, sql, params, ok_msg, fail_msg):
        """Execute one statement, commit or roll back, then always close.

        Returns True when the statement was executed and committed, False
        when an exception was caught (it is printed after ``fail_msg``).
        """
        try:
            self.cursor.execute(sql, params)
            self.conn.commit()
        except Exception as e:
            # Best-effort API: report the error and undo partial work
            # instead of propagating.
            print(fail_msg, e)
            self.conn.rollback()
            return False
        else:
            # Only report success once the commit has actually gone through.
            print(ok_msg)
            return True
        finally:
            self.cursor.close()
            self.conn.close()

    def insert_db(self, data):
        """Insert ``data`` (a dict of column name -> value) into the table.

        Values are passed as query parameters. Returns True on success,
        False on failure.
        """
        # NOTE(review): the table and column names are interpolated into the
        # SQL text, so they must come from trusted code, not from user input.
        placeholders = ', '.join(['%s'] * len(data))
        sql = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(
            table=self.table, keys=', '.join(data.keys()),
            values=placeholders)
        return self._run_and_close(sql, tuple(data.values()),
                                   'insert Successful', 'insert Failed')

    def updata_db(self, data):
        """Insert ``data``, or update the existing row on a duplicate key.

        This is an INSERT with an ``ON DUPLICATE KEY UPDATE`` clause that
        assigns every column in ``data``. Returns True on success, False on
        failure.
        """
        # NOTE(review): the table and column names are interpolated into the
        # SQL text, so they must come from trusted code, not from user input.
        placeholders = ', '.join(['%s'] * len(data))
        assignments = ', '.join(
            ['{key} = %s'.format(key=key) for key in data])
        sql = ('INSERT INTO {table}({keys}) VALUES ({values}) '
               'ON DUPLICATE KEY UPDATE {update}').format(
            table=self.table, keys=', '.join(data.keys()),
            values=placeholders, update=assignments)
        # The values are needed twice: once for VALUES (...) and once for
        # the UPDATE assignments.
        return self._run_and_close(sql, tuple(data.values()) * 2,
                                   'update Successful', 'update Failed')

    def update_db_sql(self, sql):
        """Execute a complete, caller-supplied SQL statement and commit it.

        Example: ``update contact set mobile=100088 where name='kate'``.
        The statement is executed as given, without parameters. Returns
        True on success, False on failure.
        """
        return self._run_and_close(sql, None, '更新成功', "error to update:")
# (Web-page footer captured along with the snippet; not part of the code.)
# 手机扫一扫
# 移动阅读更方便
# 你可能感兴趣的文章