from sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_baseengine = create_engine("mysql+pymysql://root:Leon@localhost/study?charset=utf8",encoding='utf-8',echo=True)Base = declarative_base()
from sqlalchemy import Column, Integer, Stringclass User(Base): __tablename__ = 'user' # 表名 # 各个字段 id = Column(Integer, primary_key=True) # 主键 name = Column(String(32)) password = Column(String(64)) def __repr__(self): # 返回数据定义 return "[%s,%s,%s]"%(self.id,self.name,self.password)Base.metadata.create_all(engine) # 创建表结构
# Column中还可以设定是否唯一:unique=True # 设置值不为空:nullable=False # 时间格式为:DateTime # 枚举值:Column(Enum("1","2","3")) # 布尔值:Boolean,表现形式为True/False
Session_class = sessionmaker(bind=engine) # 创建与数据库的会话session class ,注意,这里返回给session的是个class,不是实例session = Session_class() # 生成session实例#adduser_obj = User(name="Leon", password="123456")user_obj1 = User(name="张三", password="156")session.add_all([user_obj,user_obj1]) #添加# session.add(obj1)这样可以单个的添加session.comiit() #事务提交
# 返回数据需要在类中的__repr__中设定session.query(User).filter(User.name=="Leon").first() # 查询第一个session.query(User).filter(User.name=="Leon").all() # 查询多个,返回一个可循环列表对象# 多个查询使用多个filter进行过滤# 也可以在filter中用逗号`,`来and匹配
user_obj = session.query(User).filter(User.name=="Leon").first()user_obj.password = "NewPassword"session.comiit()
obj = session.query(User).filter(User.name=="Leon").first()session.delete(obj)session.commit()
print(session.query(User).filter(User.name=="Leon").count())
from sqlalchemy import funcprint(session.query(func.count(User.name),User.name).group_by(User.name).all())
1.设置外键
+ 场景:一个学生拥有多个上课记录+ 数据库表结构
class Student(Base): __tablename__ = "student" id = Column(Integer,primary_key=True) name = Column(String(32),nullable=False) register_data = Column(DATE,nullable=False) def __repr__(self): return "%s,%s" % (self.id, self.name)class StudyRecord(Base): __tablename__ = 'study_record' id = Column(Integer, primary_key=True) day = Column(Integer,nullable=False) status = Column(String(32),nullable=False) stu_id = Column(Integer,ForeignKey('student.id')) # 加上一个关系,在这个表中可以通过student来反查student表中的数据,而在student表中可以通过my_study_record来反查StudyRecord中的数据 student = relationship("Student",backref="my_study_record") def __repr__(self): return "%s day:%s status: %s"%(self.student.name,self.day,self.status)
2.在表中查询数据
# 查询某个人的上课记录stu_obj = session.query(Student).filter(Student.name=="Leon").first()print(stu_obj.my_study_record) # 在student表中通过my_study_record来反查StudyRecord中的数据,显示跟局repr来定,此处返回字符串# 连表反向查询,查询没来上课的人record_obj = session.query(StudyRecord).filter(StudyRecord.status=="NO").first()print(record_obj.student)
1.多外键
class Customer(Base): __tablename__ = 'customer' id = Column(Integer, primary_key=True) name = Column(String(64)) billing_address_id = Column(Integer, ForeignKey("address.id")) shipping_address_id = Column(Integer, ForeignKey("address.id")) # 一个人对应一个账单地址和一个购物地址 address_b = relationship("Address",backref="my_addr_bill",foreign_keys=[billing_address_id]) address_s = relationship("Address",backref="my_addr_shop",foreign_keys=[shipping_address_id]) def __repr__(self): return "name:%s"%self.nameclass Address(Base): __tablename__ = 'address' id = Column(Integer, primary_key=True) street = Column(String(64)) city = Column(String(64)) state = Column(String(64)) def __repr__(self): return "street:%s|city:%s|state:%s"%(self.street,self.city,self.state)
2.添加数据
a1 = Address(street='望京',city='北京',state="中国")a2 = Address(street='雁塔',city='西安',state="中国")a3 = Address(street='太原',city='山西',state="中国")session.add_all([a1,a2,a3])c1 = Customer(name="Leon",address_b=a1,address_s=a2)c2 = Customer(name="eric",address_b=a1,address_s=a3)session.add_all([c1,c2])session.commit()
3.在此表中查询数据
# 通过名字直接查询他的两个地址customer_obj = session.query(Customer).filter(Customer.name=="Leon").first()print(customer_obj.address_b,customer_obj.address_s)# 通过地址查询该地址属于谁Add_obj = session.query(Address).filter(Address.city=="北京").first()print(Add_obj.my_addr_bill)
1.创建表结构
# 一个作者可以出版很多本书,一本书有很多个作者,直接用第三张表来讲作者表和图书表关联起来class Author(Base): __tablename__ = 'authors' id = Column(Integer, primary_key=True) name = Column(String(32)) # 作者 def __repr__(self): return self.namebook_m2m_author = Table('book_m2m_author', Base.metadata, Column('book_id',Integer,ForeignKey('books.id')), Column('author_id',Integer,ForeignKey('authors.id')), )class Book(Base): __tablename__ = 'books' id = Column(Integer,primary_key=True) name = Column(String(64)) pub_date = Column(DATE) # 出版日期 authors = relationship('Author',secondary=book_m2m_author,backref='books') def __repr__(self): return self.name
2.添加数据
# 添加数据b1 = Book(name='Python',pub_date='2017-08-29')b2 = Book(name='Linux',pub_date='2011-5-2')b3 = Book(name='PHP',pub_date='2012-3-9')b4 = Book(name='Java',pub_date='2015-11-2')a1 = Author(name="Leon")a2 = Author(name="Jack")a3 = Author(name="Rain")#建立关系b1.authors=[a1,a2]b2.authors=[a1,a3]b3.authors=[a2,a3]b4.authors=[a1,a2,a3]Session_class = sessionmaker(bind=engine)session = Session_class()session.add_all([b1,b2,b3,b4,a1,a2,a3])session.commit()
3.数据查询
book_obj = session.query(Book).filter(Book.name=='Python').first()print(book_obj.authors)authors_obj = session.query(Author).filter(Author.name=="Leon").first()print(authors_obj.books)
4.此时删除数据
author_obj =s.query(Author).filter_by(name="Jack").first() book_obj = s.query(Book).filter_by(name="Python").first() book_obj.authors.remove(author_obj) #从一本书里删除一个作者session.commit()
book_m2m_author
表中的数据也会自动更新author_obj =s.query(Author).filter_by(name="Alex").first()session.delete(author_obj)session.commit()