python 模块 SQLalchemy

 

‘‘‘# &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&# SQLAlchemy ORM框架(需要依赖数据库API语言)# 架构图# SQLalchemy orm# SQLalchemy core# --> schema/types 架构/类型# --> sql expression language SQL表达式语言# --> engine(框架引擎)# --> connection pooling (数据库连接池)# --> diaiect (选择连接数据路的DB api种类)# DBAPI(pymysql,oracle,sqlite)目标: 类/对箱操作 -> SQL -> pymsql、mysqldb -> 在去数据中执行连接数据库: MYSQL - python mysql+mysqldb://<user>:<password>@<host>:<port>/<dbname> pymysql mysql+pymsql://<username>:<password>@<host>/<dbname> 示例: "mysql+pymysql://root:123456@127.0.0.1:3306/t1?charset=utf8" cx_Oracle oracle+cx_oracle://user:pwd@host:port/dbname 基本使用 import sqlalchemy from sqlalchemy import create_engine engine = create_engine import sqlalchemy from sqlalchemy import create_engine import pymysql import threading # 基本连接操作:(不推荐这种用法!) engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/anec?charset=utf8", max_overflow=2, pool_size=5, pool_timeout=30, pool_recycle=-1, ) def task(arg): conn = engine.raw_connection() cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) cursor.execute(‘select id,name from student;‘) result = cursor.fetchall() print(result) cursor.close() conn.close() for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start()#------------------------------ # 基本连接操作:(推荐这种用法!)# import sqlalchemy# from sqlalchemy import create_engine# from sqlalchemy.orm import scoped_session# from sqlalchemy.orm import sessionmaker# # engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/anec?charset=utf8", pool_size = 5)# Session = sessionmaker(bind=engine)# # # 这种方式的话,就是多线程的,scoped_session 传递Session值至类,封装了treading.local() ,从而达到多线程# sess = scoped_session(Session)# obj = modeles.表名(name=‘axlex‘,email=‘123@qq.com‘)# sess.add(obj)# sess.commit()#------------------------------# 创建数据表from sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_base# from sqlalchemy import Column,Integer,String......from sqlalchemy import *Base = declarative_base()class Info(Base): __tablename__=‘info‘ id = Column(Integer,primary_key=True) name = Column(String(64),index=True,nullable=False)def init_db(): """ 用类 创建数据表 :return: """ engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/anec?charset=utf8", max_overflow=2, pool_size=5, pool_timeout=30, pool_recycle=-1, ) Base.metadata.create_all(engine)def drop_db(): """ 用类 创建数据表 :return: """ engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/anec?charset=utf8", max_overflow=2, pool_size=5, pool_timeout=30, pool_recycle=-1, ) Base.metadata.drop_all(engine)# sqlalchemy默认不能修改表!!!!!!!# init_db()# ----------------------------------# 操作数据from sqlalchemy.orm import sessionmakerengine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/anec?charset=utf8", max_overflow=2, pool_size=5, pool_timeout=30, pool_recycle=-1, )conn = sessionmaker(bind=engine)session = conn()# 添加单条数据# obj1 = Info(name=‘anec‘)# session.add(obj1)# 添加多条数据# data = [# Info(name=‘adasda‘),# Info(name=‘aaaaa‘),# Info(name=‘bbbbb‘),# ]# session.add_all(data)# 查询数据# 查询所有数据result = session.query(表名).all()for i in result; print(i.id) print(i.name)# 条件查询数据result = session.query(表名).filter(表名.字段 > 2)# 删除数据result = session.query(表名).filter(表名.字段 > 2).delete()# 修改数据result = session.query(表名).filter(表名.字段 = 2).update({‘字段‘:"新数值"})result = session.query(表名).filter(表名.字段 > 2).update({‘字段‘:"字符字段"+字符},synchronize_session=False)result = session.query(表名).filter(表名.字段 > 2).update({‘字段‘:"数值字段"+数值},synchronize_session=evaluate)session.commit()session.close()‘‘‘

 

相关文章