Flask插件
阅读原文时间:2023年07月09日阅读:4

Flask插件

  • 下载

    pip install Flask-session
  • 导入

    from flask_session import Session
  • 实例化session

    在__init__文件中

    def create_app():
        """Build the Flask application with the Flask-Session extension attached."""
        flask_app = Flask(__name__)
        # Registering the extension swaps the app's session_interface so that
        # session data is stored where the configuration says.
        # NOTE(review): no configuration is loaded before this call in the
        # snippet — confirm SESSION_TYPE is set on the app elsewhere.
        Session(flask_app)
        return flask_app
  • 配置文件

    SESSION_TYPE = 'redis'
  • 用法和内置session一样

  • 实现原理

    • 内置的session调用 session_interface = xxxx

      • xxxx.open_session 解密cookie转化成字典给session
      • xxxx.save_session 将session加密给cookie
    • Flask-session 修改session_interface 指定的类

      • 来改变session存储的位置

创建连接

from sqlalchemy import create_engine
# Engine (connection pool) used by the examples that follow.
conn = create_engine(
    # URL format:
    # 'mysql+pymysql://<user>:<password>@<host>:<port>/<database>?charset=<encoding>'
    # Fixed: the charset was misspelled "ustf8", which is not a valid encoding name.
    'mysql+pymysql://root:123@127.0.0.1:3306/day103?charset=utf8',
    max_overflow=0,   # max connections that may be opened beyond pool_size
    pool_size=5,      # number of connections kept in the pool
    pool_timeout=30,  # seconds to wait for a free pooled connection before raising
    pool_recycle=-1,  # seconds after which a connection is recycled; -1 = never
)

如何创建表

  • 导入

    from sqlalchemy.ext.declarative import declarative_base
    Base = declarative_base()
  • 单表创建

    class Book(Base):
        """Single-table model mapped to the ``book`` table."""

        __tablename__ = "book"

        id = Column(Integer, primary_key=True)
        title = Column(String(32), nullable=False, index=True)

        def __repr__(self):
            return self.title

        __table_args__ = (
            # Composite unique constraint over (id, title)
            UniqueConstraint("id", "title", name="uni_id_title"),
            # Composite index over (id, title). The first positional argument
            # of Index is the index NAME, so it has to be given explicitly:
            # the original Index("id", "title") produced an index named "id"
            # that covered only the title column.
            Index("ix_book_id_title", "id", "title"),
        )
    
    
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, DateTime
    from sqlalchemy import Index, UniqueConstraint
    import datetime
    # Connect to the database
    ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)
    # Declarative base class that every model inherits from
    Base = declarative_base()


    # Single-table model; inherits from Base
    class UserInfo(Base):
        """Model mapped to the ``user_info`` table."""

        __tablename__ = "user_info"

        # The column definitions must live inside the class body; the original
        # snippet had them dedented to module level.
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
        email = Column(String(32), unique=True)
        # Pass the callable (no parentheses) so it is evaluated per insert
        create_time = Column(DateTime, default=datetime.datetime.now)

        # Table-level options, comparable to Django's ``class Meta``
        __table_args__ = (
            # Composite unique constraint
            UniqueConstraint("id", "name", name="uni_id_name"),
            # Composite index; Index takes the index name as its first
            # argument, so Index("name", "email") would have created an index
            # named "name" on the email column only.
            Index("ix_user_info_name_email", "name", "email"),
        )


    def create_db():
        """Create every table registered on ``Base.metadata``."""
        Base.metadata.create_all(ENGINE)


    def drop_db():
        """Drop every table registered on ``Base.metadata``."""
        Base.metadata.drop_all(ENGINE)


    if __name__ == '__main__':
        # Run the table creation
        create_db()
  • 一对多表创建

    class Book(Base):
       # Model for the "book" table; each book row references one publisher row.
       __tablename__ = "book"
    
       id = Column(Integer, primary_key=True)
       title = Column(String(32), nullable=False)
       # Foreign-key column pointing at publisher.id
       publisher_id = Column(Integer, ForeignKey("publisher.id"))
       # Creates no column; only sets up the relationship for convenient access
       # One-to-many
       # NOTE(review): the "Publisher" model is not defined in this snippet — confirm it exists.
       publisher = relationship("Publisher", backref="books")
    
    
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, DateTime
    from sqlalchemy import Index, UniqueConstraint, ForeignKey
    from sqlalchemy.orm import relationship
    import datetime
    
    ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)

    Base = declarative_base()


    # ====== One-to-many example =======
    class UserInfo(Base):
        """Model for ``user_info``; each user references at most one Hobby."""

        __tablename__ = "user_info"

        # The attributes below belong inside the class body; the original
        # snippet had them dedented to module level.
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
        email = Column(String(32), unique=True)
        create_time = Column(DateTime, default=datetime.datetime.now)
        # Foreign-key column
        hobby_id = Column(Integer, ForeignKey("hobby.id"))
        # Creates no column; exists only to make querying convenient.
        # The backref exposes the related users on Hobby as ``Hobby.user``.
        hobby = relationship("Hobby", backref="user")

        __table_args__ = (
            UniqueConstraint("id", "name", name="uni_id_name"),
            # Index takes the index name first; without it "name" would be
            # used as the index name and only email would be indexed.
            Index("ix_user_info_name_email", "name", "email"),
        )


    class Hobby(Base):
        """Model for the ``hobby`` table."""

        __tablename__ = "hobby"

        id = Column(Integer, primary_key=True)
        title = Column(String(32), default="码代码")


    def create_db():
        """Create every table registered on ``Base.metadata``."""
        Base.metadata.create_all(ENGINE)


    def drop_db():
        """Drop every table registered on ``Base.metadata``."""
        Base.metadata.drop_all(ENGINE)


    if __name__ == '__main__':
        create_db()
        # drop_db()
  • 多对多创建

    • 第三张表自己生成

      class Book(Base):
         # Model for the "book" table, related to a publisher and to tags.
         __tablename__ = "book"
      
         id = Column(Integer, primary_key=True)
         title = Column(String(32), nullable=False)
         # Foreign-key column pointing at publisher.id
         publisher_id = Column(Integer, ForeignKey("publisher.id"))
         # Creates no column; only sets up the relationship for convenient access
         # One-to-many
         publisher = relationship("Publisher", backref="books")
         # Many-to-many, routed through the "book2tag" association table
         tags = relationship("Tag", secondary="book2tag", backref="books")
      class Book2Tag(Base):
         # Third table holding one (book, tag) pair per row.
         __tablename__ = "book2tag"
      
         id = Column(Integer, primary_key=True)
         # Foreign keys to the two sides of the pairing
         book_id = Column(Integer, ForeignKey("book.id"))
         tag_id = Column(Integer, ForeignKey("tag.id"))
      
      
      from sqlalchemy import create_engine
      from sqlalchemy.ext.declarative import declarative_base
      from sqlalchemy import Column, Integer, String, DateTime
      from sqlalchemy import Index, UniqueConstraint, ForeignKey
      from sqlalchemy.orm import relationship
      import datetime
      
      ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)

      Base = declarative_base()


      # ====== Many-to-many example =======
      class Book(Base):
          """Model for the ``book`` table."""

          __tablename__ = "book"

          # The attributes belong inside the class body; the original snippet
          # had them dedented to module level.
          id = Column(Integer, primary_key=True)
          title = Column(String(32))
          # Creates no column; exists only to make querying convenient.
          # Rows are linked through the "book2tag" association table.
          tags = relationship("Tag", secondary="book2tag", backref="books")


      class Tag(Base):
          """Model for the ``tag`` table."""

          __tablename__ = "tag"

          id = Column(Integer, primary_key=True)
          title = Column(String(32))


      class Book2Tag(Base):
          """Association table linking books and tags."""

          __tablename__ = "book2tag"

          id = Column(Integer, primary_key=True)
          book_id = Column(Integer, ForeignKey("book.id"))
          tag_id = Column(Integer, ForeignKey("tag.id"))


      def create_db():
          """Create every table registered on ``Base.metadata``."""
          Base.metadata.create_all(ENGINE)


      def drop_db():
          """Drop every table registered on ``Base.metadata``."""
          Base.metadata.drop_all(ENGINE)


      if __name__ == '__main__':
          create_db()
          # drop_db()
    • 创建表命令

      Base.metadata.create_all(conn) # conn is the connection-pool (engine) object

对数据库表的操作(增删改查)

  • 创建管理器

    from sqlalchemy.orm import sessionmaker, scoped_session
    Session = sessionmaker(bind=conn)
    # Thread-safe: based on thread-local storage, each thread works with one session
    session = scoped_session(Session)
    
    
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker, scoped_session
    from models_demo import Tag
    
    ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)

    Session = sessionmaker(bind=ENGINE)

    # Every database operation needs a session.
    # scoped_session is thread-safe: based on thread-local storage, each
    # thread keeps using the same session.
    session = scoped_session(Session)

    # ======= ORM operations ==========
    tag_obj = Tag(title="SQLAlchemy")
    try:
        # Stage the new row
        session.add(tag_obj)
        # Commit the transaction
        session.commit()
    except Exception:
        # A failed commit leaves the transaction unusable; roll it back before
        # propagating the error.
        session.rollback()
        raise
    finally:
        # remove() closes the current session and discards it from the
        # scoped_session registry, so it is released even when commit fails.
        session.remove()
  • 基本增删改查

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker, scoped_session
    from models_demo import Tag, UserInfo
    import threading
    
    ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",)

    Session = sessionmaker(bind=ENGINE)

    # Every database operation needs a session. The original also created a
    # plain Session() here and immediately overwrote it, leaving an unused
    # session behind; only the scoped session is kept.
    session = scoped_session(Session)

    # ============ Insert ============
    # tag_obj = Tag(title="SQLAlchemy")
    # # Add a single object
    # session.add(tag_obj)
    # session.add_all([
    #     Tag(title="Python"),
    #     Tag(title="Django"),
    # ])
    # # Commit
    # session.commit()
    # # Close the session
    # session.close()

    # ============ Basic queries ============
    # ret1 = session.query(Tag).all()
    # ret2 = session.query(Tag).filter(Tag.title == "Python").all()
    # ret3 = session.query(Tag).filter_by(title="Python").all()
    # ret4 = session.query(Tag).filter_by(title="Python").first()
    # print(ret1, ret2, ret3, ret4)

    # ============ Delete ===========
    # session.query(Tag).filter_by(id=1).delete()
    # session.commit()

    # =========== Update ===========
    # The values dict may be keyed by the mapped attribute or by its name
    session.query(Tag).filter_by(id=22).update({Tag.title: "LOL"})
    session.query(Tag).filter_by(id=23).update({"title": "王者毒药"})
    # The new value is an SQL expression built from the current column value
    session.query(Tag).filter_by(id=24).update({"title": Tag.title + "~"}, synchronize_session=False)
    # Original author's note: synchronize_session="evaluate" is the default and
    # is the mode to use for numeric add/subtract updates
    session.commit()
  • 常用操作

    # Filtered queries
    ret1 = session.query(Tag).filter_by(id=22).first()
    ret2 = session.query(Tag).filter(Tag.id > 1, Tag.title == "LOL").all()
    ret3 = session.query(Tag).filter(Tag.id.between(22, 24)).all()
    ret4 = session.query(Tag).filter(~Tag.id.in_([22, 24])).first()
    from sqlalchemy import and_, or_
    ret5 = session.query(Tag).filter(and_(Tag.id > 1, Tag.title == "LOL")).first()
    ret6 = session.query(Tag).filter(or_(Tag.id > 1, Tag.title == "LOL")).first()
    ret7 = session.query(Tag).filter(or_(
        Tag.id > 1,
        and_(Tag.id > 3, Tag.title == "LOL")
    )).all()
    # Wildcards (LIKE / NOT LIKE)
    ret8 = session.query(Tag).filter(Tag.title.like("L%")).all()
    ret9 = session.query(Tag).filter(~Tag.title.like("L%")).all()
    # Limit: slice() turns the range into LIMIT/OFFSET, so only the requested
    # row is fetched. The original called .all() first and sliced the full
    # result list in Python.
    ret10 = session.query(Tag).filter(~Tag.title.like("L%")).slice(1, 2).all()
    # Ordering
    ret11 = session.query(Tag).order_by(Tag.id.desc()).all()  # descending
    ret12 = session.query(Tag).order_by(Tag.id.asc()).all()  # ascending
    # Grouping
    # NOTE(review): Tag.test is not among the Tag columns shown in this
    # document — confirm the model actually defines it.
    ret13 = session.query(Tag.test).group_by(Tag.test).all()
    # Aggregate functions
    from sqlalchemy.sql import func
    ret14 = session.query(
        func.max(Tag.id),
        func.sum(Tag.test),
        func.min(Tag.id)
    ).group_by(Tag.title).having(func.max(Tag.id) > 22).all()
    # ^ Fixed: the comparison belongs outside the aggregate. The original
    #   func.max(Tag.id > 22) took MAX of a boolean expression instead of
    #   comparing MAX(id) with 22.
    # Joins
    ret15 = session.query(UserInfo, Hobby).filter(UserInfo.hobby_id == Hobby.id).all()
    # print(ret15) gives a list of tuples, each tuple holding the two objects
    ret16 = session.query(UserInfo).join(Hobby).all()
    # print(ret16) gives a list containing only the first entity's objects
    # Equivalent to INNER JOIN
    # for i in ret16:
    #     # print(i[0].name, i[1].title)
    #     print(i.hobby.title)
    ret17 = session.query(Hobby).join(UserInfo, isouter=True).all()
    ret17_1 = session.query(UserInfo).join(Hobby, isouter=True).all()
    ret18 = session.query(Hobby).outerjoin(UserInfo).all()
    ret18_1 = session.query(UserInfo).outerjoin(Hobby).all()
    # Equivalent to LEFT JOIN
    print(ret17)
    print(ret17_1)
    print(ret18)
    print(ret18_1)
  • 基于relationship的FK

    # FK operations through relationship()
    # Insert: build the user together with its related hobby object
    user_obj = UserInfo(name="提莫", hobby=Hobby(title="种蘑菇"))
    session.add(user_obj)
    
    # Insert from the other side: assign the users through the hobby's "user" attribute
    hobby = Hobby(title="弹奏一曲")
    hobby.user = [UserInfo(name="琴女"), UserInfo(name="妹纸")]
    session.add(hobby)
    session.commit()
    
    # Forward lookup through the relationship (user -> hobby)
    user_obj_1 = session.query(UserInfo).first()
    print(user_obj_1.name)
    print(user_obj_1.hobby.title)
    
    # Reverse lookup through the relationship (hobby -> users)
    hb = session.query(Hobby).first()
    print(hb.title)
    for i in hb.user:
        print(i.name)
    
    session.close()
  • 基于relationship的M2M

    # Insert
    book_obj = Book(title="Python源码剖析")
    tag_obj = Tag(title="Python")
    # NOTE(review): book_obj and tag_obj have not been added or committed at
    # this point, so their ids are presumably still None here — this looks
    # like the pitfall the author warns about below; confirm.
    b2t = Book2Tag(book_id=book_obj.id, tag_id=tag_obj.id)
    session.add_all([
        book_obj,
        tag_obj,
        b2t,
    ])
    session.commit()
    
    # Watch out: the approach above has a pitfall
    # Insert through the relationship attribute instead
    book = Book(title="测试")
    book.tags = [Tag(title="测试标签1"), Tag(title="测试标签2")]
    session.add(book)
    session.commit()
    
    # Same idea from the Tag side, using the "books" attribute
    tag = Tag(title="LOL")
    tag.books = [Book(title="大龙刷新时间"), Book(title="小龙刷新时间")]
    session.add(tag)
    session.commit()
    
    # Forward lookup through the relationship (book -> tags)
    book_obj = session.query(Book).filter_by(id=4).first()
    print(book_obj.title)
    print(book_obj.tags)
    # Reverse lookup through the relationship (tag -> books)
    tag_obj = session.query(Tag).first()
    print(tag_obj.title)
    print(tag_obj.books)
  • 下载

    pip install flask-script
  • 导入

    from flask_demo import create_app
    from flask_script import Manager
  • 实例化

    app = create_app()
    # Instantiate the flask-script Manager around the app
    manager = Manager(app)
    
    if __name__ == '__main__':
       # app.run()
       # Start through the manager rather than calling app.run() directly
       manager.run()
  • 自定义的命令

  • !!! 依赖flask-script

  • 下载

    pip install flask-migrate
  • 导入

    # The import name is all lowercase; "flask_Migrate" raises ImportError.
    # NOTE(review): MigrateCommand is believed to have been removed in
    # Flask-Migrate 3.0 — confirm the installed version still provides it.
    from flask_migrate import Migrate, MigrateCommand
  • 实例化

    # NOTE(review): Migrate() normally also takes the database object
    # (Migrate(app, db)); no `db` is defined in this snippet — confirm how the
    # database is wired in.
    Migrate(app)

    # Register the migration commands under the "db" sub-command
    manager.add_command('db', MigrateCommand)
    # Command reference (typos fixed: "uprade" -> "upgrade", "类型于" -> "类似于")
    """
        python manager.py db init
        python manager.py db migrate # 类似于makemigrations
        python manager.py db upgrade  # migrate
    """
老师博客:

https://www.cnblogs.com/GGGG-XXXX/articles/9447619.html