Python操作MySQL数据库(二)

pymsql是Python中操作MySQL的模块,其使用方法和MySQLdb几乎相同。

下载安装:

pip install pymysql

1.执行SQL语句

#!/usr/bin/env python# -*- coding:utf-8 -*-import pymysql # 创建连接conn = pymysql.connect(host=‘127.0.0.1‘, port=3306, user=‘root‘, passwd=‘123‘, db=‘t1‘)# 创建游标cursor = conn.cursor() # 执行SQL,并返回收影响行数effect_row = cursor.execute("update hosts set host = ‘1.1.1.2‘") # 执行SQL,并返回受影响行数#effect_row = cursor.execute("update hosts set host = ‘1.1.1.2‘ where nid > %s", (1,)) # 执行SQL,并返回受影响行数#effect_row = cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)]) # 提交,不然无法保存新建或者修改的数据conn.commit() # 关闭游标cursor.close()# 关闭连接conn.close()

2.获取新创建数据自增ID

#!/usr/bin/env python# -*- coding:utf-8 -*-import pymysql conn = pymysql.connect(host=‘127.0.0.1‘, port=3306, user=‘root‘, passwd=‘123‘, db=‘t1‘)cursor = conn.cursor()cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)])conn.commit()cursor.close()conn.close() # 获取最新自增IDnew_id = cursor.lastrowid

3.获取查询数据

#!/usr/bin/env python# -*- coding:utf-8 -*-import pymysql conn = pymysql.connect(host=‘127.0.0.1‘, port=3306, user=‘root‘, passwd=‘123‘, db=‘t1‘)cursor = conn.cursor()cursor.execute("select * from hosts") # 获取第一行数据row_1 = cursor.fetchone() # 获取前n行数据# row_2 = cursor.fetchmany(3)# 获取所有数据# row_3 = cursor.fetchall() conn.commit()cursor.close()conn.close()

 

注:在fetch数据时按照顺序进行,可以使用cursor.scroll(num,mode)来移动游标位置,如:

  • cursor.scroll(1,mode=‘relative‘)  # 相对当前位置移动
  • cursor.scroll(2,mode=‘absolute‘) # 相对绝对位置移动

4.fetch数据类型默认返回的是元祖类型,返回字典类型如下:

#!/usr/bin/env python# -*- coding:utf-8 -*-import pymysql conn = pymysql.connect(host=‘127.0.0.1‘, port=3306, user=‘root‘, passwd=‘123‘, db=‘t1‘) # 游标设置为字典类型cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)r = cursor.execute("call p1()") result = cursor.fetchone() conn.commit()cursor.close()conn.close()

注:返回字典类型,另外一种方式如下:

# -*- coding:utf-8 -*-# /user/bin/python# @Time : 2018/5/23 23:32# @Author : liyongle# @File : sql_ method.py
import contextlibimport osimport pymysql@contextlib.contextmanagerdef __get_mysql_db__( host="10.1.1.113", port=3306, user="liyongle", passwd="xxxx", db="test_lib", charset="utf8"): con = pymysql.connect( host=host, port=port, user=user, passwd=passwd, db=db, charset=charset, cursorclass=pymysql.cursors.DictCursor, ) cur = con.cursor() try: yield cur finally: con.commit() cur.close() con.close()def run_query(query): # 这个函数返回一个列表,列表里面的元素都是字典 with __get_mysql_db__() as cursor: cursor.execute(query) return cursor.fetchall()def main(): sql = "SELECT case_id FROM testcase LEFT JOIN req_method ON req_method.reuqest_id = testcase.method_id" run_query(sql)if __name__ == ‘__main__‘: main()

5.也可以将其封装成类,包含增删改查方法,使用时直接导入调用即可

安装数据库连接池

pip install DBUtils

具体代码如下:

# -*- coding:utf-8 -*-# /user/bin/python# @Time : 2018/5/24 0:16# @Author : liyongle# @File : sql_ method.pyimport pymysqlimport threadingfrom DBUtils.PooledDB import PooledDB, SharedDBConnectionPOOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=20, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 #maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host=‘192.168.11.38‘, port=3306, user=‘root‘, passwd=‘xxxxxx‘, db=‘test_db‘, charset=‘utf8‘)def connect(): # 创建连接 conn = POOL.connection() # 创建游标 cursor = conn.cursor(pymysql.cursors.DictCursor) return conn,cursordef close(conn,cursor): # 关闭游标 cursor.close() # 关闭连接 conn.close()def fetch_one(sql,args): conn,cursor = connect() # 执行SQL,并返回收影响行数 effect_row = cursor.execute(sql,args) result = cursor.fetchone() close(conn,cursor) return resultdef fetch_all(sql,args): conn, cursor = connect() # 执行SQL,并返回收影响行数 cursor.execute(sql,args) result = cursor.fetchall() close(conn, cursor) return resultdef insert(sql,args): """ 创建数据 :param sql: 含有占位符的SQL :return: """ conn, cursor = connect() # 执行SQL,并返回收影响行数 effect_row = cursor.execute(sql,args) conn.commit() close(conn, cursor)def delete(sql,args): """ 创建数据 :param sql: 含有占位符的SQL :return: """ conn, cursor = connect() # 执行SQL,并返回收影响行数 effect_row = cursor.execute(sql,args) conn.commit() close(conn, cursor) return effect_rowdef update(sql,args): conn, cursor = connect() # 执行SQL,并返回收影响行数 effect_row = cursor.execute(sql, args) conn.commit() close(conn, cursor) return effect_row

相关文章