python DBUtils 线程池 连接 Postgresql(多线程公用线程池,DB-API : psycopg2)

一、DBUtils

DBUtils 是一套允许线程化 Python 程序可以安全和有效的访问数据库的模块,DBUtils提供两种外部接口: PersistentDB :提供线程专用的数据库连接,并自动管理连接。 PooledDB :提供线程间可共享的数据库连接,并自动管理连接。

操作数据库模板:

 1 import datetime 2 import sys 3 import os 4 import configparser 5 import logging 6 import psycopg2 7  8 from DBUtils.PooledDB import PooledDB 9  10  11  12  13 class DatabaseOperator(object): 14 ‘‘‘ 15  class for database operator 16 ‘‘‘ 17  18  19 def __init__(self,  20 database_config_path, database_config=None): 21 ‘‘‘ 22  Constructor 23 ‘‘‘ 24 self._database_config_path = database_config_path 25  26 # load database configuration 27 if not database_config : 28 self._database_config = self.parse_postgresql_config(database_config_path) 29 else: 30 self._database_config = database_config 31 self._pool = None 32  33 def database_config_empty(self): 34 if self._database_config: 35 return False 36 else: 37 return True 38  39 def parse_postgresql_config(self, database_config_path=None): 40 ‘‘‘解析pei数据库配置文件 41  参数 42  --------- 43  arg1 : conf_file 44  数据库配置文件路径 45  返回值 46  -------- 47  dict 48  解析配置属性dict--config 49  50  示例 51  -------- 52  53 ‘‘‘ 54 if database_config_path == None and self._database_config_path != None: 55 database_config_path = self._database_config_path 56 if not os.path.isfile(database_config_path): 57 sys.exit("ERROR: Could not find configuration file: {0}".format(database_config_path)) 58 parser = configparser.SafeConfigParser() 59  parser.read(database_config_path) 60 config = {} 61 config[database] = parser.get(UniMonDB, Database) 62 config[db_user] = parser.get(UniMonDB, UserName) 63 config[db_passwd] = parser.get(UniMonDB, Password) 64 config[db_port] = parser.getint(UniMonDB, Port) 65 config[db_host] = parser.get(UniMonDB, Servername) 66 self._database_config = config 67  68 return config  69  70  71 def get_pool_conn(self): 72  73 if not self._pool: 74  self.init_pgsql_pool() 75 return self._pool.connection() 76  77 def init_pgsql_pool(self): 78 ‘‘‘利用数据库属性连接数据库 79  参数 80  --------- 81  arg1 : config 82  数据库配置属性 83  返回值 84  -------- 85  86  示例 87  -------- 88  89 ‘‘‘ 90 # 字典config是否为空 91 config = self.parse_postgresql_config() 92 POSTGREIP = config[db_host] 93 POSTGREPORT = config[db_port] 94 POSTGREDB = config[database] 95 POSTGREUSER = config[db_user] 96 POSTGREPASSWD = config[db_passwd] 97 try: 98 logging.info(Begin to create {0} postgresql pool on:{1}.\n.format(POSTGREIP, datetime.datetime.now())) 99 100 pool = PooledDB(101 creator=psycopg2, # 使用链接数据库的模块mincached102 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数103 mincached=1, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建104 maxcached=4, # 链接池中最多闲置的链接,0和None不限制105 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错106 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制107 setsession=[], # 开始会话前执行的命令列表。108 host=POSTGREIP,109 port=POSTGREPORT,110 user=POSTGREUSER,111 password=POSTGREPASSWD,112 database=POSTGREDB)113 self._pool = pool 114 logging.info(SUCCESS: create postgresql success.\n)115 116 except Exception as e:117 logging.error(ERROR: create postgresql pool failed:{0}\n)118  self.close_db_cursor()119 sys.exit(ERROR: create postgresql pool error caused by {0}.format(str(e)))120 121 122 def pg_select_operator(self, sql):123 ‘‘‘进行查询操作,函数返回前关闭cursor,conn124  参数125  ---------126  arg1 : sql查询语句127  返回值128  --------129  list:result130  类型为list的查询结果:result131 132  示例133  --------134 135 ‘‘‘136 # 执行查询137 try:138 conn = self.get_pool_conn()139 cursor = conn.cursor() 140  cursor.execute(sql)141 result = cursor.fetchall()142 except Exception as e:143 logging.error(ERROR: execute {0} causes error.format(sql))144 sys.exit(ERROR: load data from database error caused {0}.format(str(e)))145 finally:146  cursor.close()147  conn.close() 148 return result149 150 def test_pool_con(self):151 sql = select * from tbl_devprofile152 result = self.pg_select_operator(sql)153 print(result)154 155 def pg_insert_operator(self, sql):156 157 result = False158 try:159 conn = self.get_pool_conn()160 cursor = conn.cursor() 161  cursor.execute(sql)162 result = True163 except Exception as e:164 logging.error(ERROR: execute {0} causes error.format(sql))165 sys.exit(ERROR: insert data from database error caused {0}.format(str(e)))166 finally:167  cursor.close()168  conn.commit()169  conn.close() 170 return result171 172 def pg_update_operator(self, sql):173 174 result = False175 try:176 conn = self.get_pool_conn()177 cursor = conn.cursor() 178  cursor.execute(sql)179 result = True180 except Exception as e:181 logging.error(ERROR: execute {0} causes error.format(sql))182 sys.exit(ERROR: update data from database error caused {0}.format(str(e)))183 finally:184  cursor.close()185  conn.commit()186  conn.close() 187 return result188 189 def pg_delete_operator(self, sql):190 result = False191 # 执行查询192 try:193 conn = self.get_pool_conn()194 cursor = conn.cursor() 195  cursor.execute(sql)196 result = True197 except Exception as e:198 logging.error(ERROR: execute {0} causes error.format(sql))199 sys.exit(ERROR: delete data from database error caused {0}.format(str(e)))200 finally:201  cursor.close()202  conn.commit()203  conn.close() 204 return result205 206 207 def close_pool(self):208 ‘‘‘关闭pool209  参数210  ---------211 212 213  返回值214  --------215 216  示例217  --------218 219 ‘‘‘220 if self._pool != None:221  self._pool.close()222 223 if __name__ == __main__:224 path = "E:\\Users\\Administrator\\eclipse-workspace\\com.leagsoft.basemodule\\base\\config\\sql_conf.conf"225 db = DatabaseOperator(226 database_config_path=path)227 db.test_pool_con()

二、多线程

原理:创建多个线程类,多个线程类共享一个队里Queue,每一个线程类可以操作数据库

 1 from threading import Thread 2  3 class Worker(Thread): 4 def __init__(self, queue): 5 Thread.__init__(self) 6 self.queue = queue 7  8 def run(self): 9 while True:10 # Get the work from the queue and expand the tuple11 # 从队列中获取任务12 database_operator, device, stand_alone_result = self.queue.get()13  operateResult(database_operator, device, stand_alone_result)14 # 任务执行完之后要通知队列15 self.queue.task_done()

填充队列

 1 # 使用队列多线程 2 logging.info(begin to update all device risk score by multi_processing.\n) 3 from queue import Queue 4 queue = Queue() 5 # 六个线程,每个线程共享一个队列 6 for _ in range(6): 7 worker = Worker(queue) 8  worker.setDaemon(True) 9  worker.start()10 11 for record in all_devid:12 device = record[0]13 devtype = record[1]14 all_countlist = all_dict.get(device)15 stand_alone_result = device_assess(all_countlist)16 if (devtype in (server_devtype + computer_devtype)) and (stand_alone_result < 100):17 stand_alone_result *= 0.818 # 将设备风险评分数据保存到数据库中19  queue.put((database_operator, device, stand_alone_result))20 21 #等待队列任务执行完22  queue.join()23 24 25 def operateResult(database_operator, device, stand_alone_result):26 ‘‘‘27  函数名称: device_assess28  描述: 保存单台设备分数到数据库29  调用: 无30  被调用: main31  被访问的表: tbl_devprofile32  被修改的表: 无33  输入参数: database_operator, device:设备uid, stand_alone_result:单台设备风险分数34  输出参数:无35  返回值: 单台设备风险分数值36  其它: 无37 ‘‘‘38 import time39 find_profile_sql = "SELECT uiddevrecordid FROM tbl_devprofile WHERE uiddevrecordid=‘{0}‘;".format(device)40 isExistRecord = database_operator.pg_select_operator(find_profile_sql)41 #currentTime=datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘)42 currentTime=time.strftime(%Y-%m-%d %H:%M:%S,time.localtime(time.time()))43 if len(isExistRecord) > 0:44 updata_profile_sql = "UPDATE tbl_devprofile SET irisklevel={0}, dtrisktime=‘{1}‘ 45  WHERE uiddevrecordid=‘{2}‘;".format(stand_alone_result, currentTime, device)46  database_operator.pg_update_operator(updata_profile_sql)47 else:48 insert_profile_sql = "INSERT INTO tbl_devprofile VALUES(‘{0}‘,NULL,NULL,NULL,NULL,NULL,NULL,NULL,{1},‘{2}‘);".format(49  device, stand_alone_result, currentTime)50 database_operator.pg_insert_operator(insert_profile_sql)

使用单线程时,执行完代码花费20s左右,使用多线程时花费5s左右。

 

 

Reference:

[1] https://blog.csdn.net/zhaihaifei/article/details/54016939

[2] https://www.cnblogs.com/hao-ming/p/7215050.html?utm_source=itdadao&utm_medium=referral

[3] https://www.cnblogs.com/wozijisun/p/6160065.html (多线程)

[4] http://www.lpfrx.com/archives/4431/

[5] https://www.cnblogs.com/95lyj/p/9047554.html

相关文章