一、DBUtils
DBUtils 是一套允许线程化 Python 程序可以安全和有效的访问数据库的模块,DBUtils提供两种外部接口: PersistentDB :提供线程专用的数据库连接,并自动管理连接。 PooledDB :提供线程间可共享的数据库连接,并自动管理连接。
操作数据库模板:
1 import datetime 2 import sys 3 import os 4 import configparser 5 import logging 6 import psycopg2 7 8 from DBUtils.PooledDB import PooledDB 9 10 11 12 13 class DatabaseOperator(object): 14 ‘‘‘ 15 class for database operator 16 ‘‘‘ 17 18 19 def __init__(self, 20 database_config_path, database_config=None): 21 ‘‘‘ 22 Constructor 23 ‘‘‘ 24 self._database_config_path = database_config_path 25 26 # load database configuration 27 if not database_config : 28 self._database_config = self.parse_postgresql_config(database_config_path) 29 else: 30 self._database_config = database_config 31 self._pool = None 32 33 def database_config_empty(self): 34 if self._database_config: 35 return False 36 else: 37 return True 38 39 def parse_postgresql_config(self, database_config_path=None): 40 ‘‘‘解析pei数据库配置文件 41 参数 42 --------- 43 arg1 : conf_file 44 数据库配置文件路径 45 返回值 46 -------- 47 dict 48 解析配置属性dict--config 49 50 示例 51 -------- 52 无 53 ‘‘‘ 54 if database_config_path == None and self._database_config_path != None: 55 database_config_path = self._database_config_path 56 if not os.path.isfile(database_config_path): 57 sys.exit("ERROR: Could not find configuration file: {0}".format(database_config_path)) 58 parser = configparser.SafeConfigParser() 59 parser.read(database_config_path) 60 config = {} 61 config[‘database‘] = parser.get(‘UniMonDB‘, ‘Database‘) 62 config[‘db_user‘] = parser.get(‘UniMonDB‘, ‘UserName‘) 63 config[‘db_passwd‘] = parser.get(‘UniMonDB‘, ‘Password‘) 64 config[‘db_port‘] = parser.getint(‘UniMonDB‘, ‘Port‘) 65 config[‘db_host‘] = parser.get(‘UniMonDB‘, ‘Servername‘) 66 self._database_config = config 67 68 return config 69 70 71 def get_pool_conn(self): 72 73 if not self._pool: 74 self.init_pgsql_pool() 75 return self._pool.connection() 76 77 def init_pgsql_pool(self): 78 ‘‘‘利用数据库属性连接数据库 79 参数 80 --------- 81 arg1 : config 82 数据库配置属性 83 返回值 84 -------- 85 86 示例 87 -------- 88 无 89 ‘‘‘ 90 # 字典config是否为空 91 config = self.parse_postgresql_config() 92 POSTGREIP = config[‘db_host‘] 93 POSTGREPORT = config[‘db_port‘] 94 POSTGREDB = config[‘database‘] 95 POSTGREUSER = config[‘db_user‘] 96 POSTGREPASSWD = config[‘db_passwd‘] 97 try: 98 logging.info(‘Begin to create {0} postgresql pool on:{1}.\n‘.format(POSTGREIP, datetime.datetime.now())) 99 100 pool = PooledDB(101 creator=psycopg2, # 使用链接数据库的模块mincached102 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数103 mincached=1, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建104 maxcached=4, # 链接池中最多闲置的链接,0和None不限制105 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错106 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制107 setsession=[], # 开始会话前执行的命令列表。108 host=POSTGREIP,109 port=POSTGREPORT,110 user=POSTGREUSER,111 password=POSTGREPASSWD,112 database=POSTGREDB)113 self._pool = pool 114 logging.info(‘SUCCESS: create postgresql success.\n‘)115 116 except Exception as e:117 logging.error(‘ERROR: create postgresql pool failed:{0}\n‘)118 self.close_db_cursor()119 sys.exit(‘ERROR: create postgresql pool error caused by {0}‘.format(str(e)))120 121 122 def pg_select_operator(self, sql):123 ‘‘‘进行查询操作,函数返回前关闭cursor,conn124 参数125 ---------126 arg1 : sql查询语句127 返回值128 --------129 list:result130 类型为list的查询结果:result131 132 示例133 --------134 无135 ‘‘‘136 # 执行查询137 try:138 conn = self.get_pool_conn()139 cursor = conn.cursor() 140 cursor.execute(sql)141 result = cursor.fetchall()142 except Exception as e:143 logging.error(‘ERROR: execute {0} causes error‘.format(sql))144 sys.exit(‘ERROR: load data from database error caused {0}‘.format(str(e)))145 finally:146 cursor.close()147 conn.close() 148 return result149 150 def test_pool_con(self):151 sql = ‘select * from tbl_devprofile‘152 result = self.pg_select_operator(sql)153 print(result)154 155 def pg_insert_operator(self, sql):156 157 result = False158 try:159 conn = self.get_pool_conn()160 cursor = conn.cursor() 161 cursor.execute(sql)162 result = True163 except Exception as e:164 logging.error(‘ERROR: execute {0} causes error‘.format(sql))165 sys.exit(‘ERROR: insert data from database error caused {0}‘.format(str(e)))166 finally:167 cursor.close()168 conn.commit()169 conn.close() 170 return result171 172 def pg_update_operator(self, sql):173 174 result = False175 try:176 conn = self.get_pool_conn()177 cursor = conn.cursor() 178 cursor.execute(sql)179 result = True180 except Exception as e:181 logging.error(‘ERROR: execute {0} causes error‘.format(sql))182 sys.exit(‘ERROR: update data from database error caused {0}‘.format(str(e)))183 finally:184 cursor.close()185 conn.commit()186 conn.close() 187 return result188 189 def pg_delete_operator(self, sql):190 result = False191 # 执行查询192 try:193 conn = self.get_pool_conn()194 cursor = conn.cursor() 195 cursor.execute(sql)196 result = True197 except Exception as e:198 logging.error(‘ERROR: execute {0} causes error‘.format(sql))199 sys.exit(‘ERROR: delete data from database error caused {0}‘.format(str(e)))200 finally:201 cursor.close()202 conn.commit()203 conn.close() 204 return result205 206 207 def close_pool(self):208 ‘‘‘关闭pool209 参数210 ---------211 无 212 213 返回值214 --------215 无216 示例217 --------218 无219 ‘‘‘220 if self._pool != None:221 self._pool.close()222 223 if __name__ == ‘__main__‘:224 path = "E:\\Users\\Administrator\\eclipse-workspace\\com.leagsoft.basemodule\\base\\config\\sql_conf.conf"225 db = DatabaseOperator(226 database_config_path=path)227 db.test_pool_con()
二、多线程
原理:创建多个线程类,多个线程类共享一个队里Queue,每一个线程类可以操作数据库
1 from threading import Thread 2 3 class Worker(Thread): 4 def __init__(self, queue): 5 Thread.__init__(self) 6 self.queue = queue 7 8 def run(self): 9 while True:10 # Get the work from the queue and expand the tuple11 # 从队列中获取任务12 database_operator, device, stand_alone_result = self.queue.get()13 operateResult(database_operator, device, stand_alone_result)14 # 任务执行完之后要通知队列15 self.queue.task_done()
填充队列
1 # 使用队列多线程 2 logging.info(‘begin to update all device risk score by multi_processing.\n‘) 3 from queue import Queue 4 queue = Queue() 5 # 六个线程,每个线程共享一个队列 6 for _ in range(6): 7 worker = Worker(queue) 8 worker.setDaemon(True) 9 worker.start()10 11 for record in all_devid:12 device = record[0]13 devtype = record[1]14 all_countlist = all_dict.get(device)15 stand_alone_result = device_assess(all_countlist)16 if (devtype in (server_devtype + computer_devtype)) and (stand_alone_result < 100):17 stand_alone_result *= 0.818 # 将设备风险评分数据保存到数据库中19 queue.put((database_operator, device, stand_alone_result))20 21 #等待队列任务执行完22 queue.join()23 24 25 def operateResult(database_operator, device, stand_alone_result):26 ‘‘‘27 函数名称: device_assess28 描述: 保存单台设备分数到数据库29 调用: 无30 被调用: main31 被访问的表: tbl_devprofile32 被修改的表: 无33 输入参数: database_operator, device:设备uid, stand_alone_result:单台设备风险分数34 输出参数:无35 返回值: 单台设备风险分数值36 其它: 无37 ‘‘‘38 import time39 find_profile_sql = "SELECT uiddevrecordid FROM tbl_devprofile WHERE uiddevrecordid=‘{0}‘;".format(device)40 isExistRecord = database_operator.pg_select_operator(find_profile_sql)41 #currentTime=datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘)42 currentTime=time.strftime(‘%Y-%m-%d %H:%M:%S‘,time.localtime(time.time()))43 if len(isExistRecord) > 0:44 updata_profile_sql = "UPDATE tbl_devprofile SET irisklevel={0}, dtrisktime=‘{1}‘ 45 WHERE uiddevrecordid=‘{2}‘;".format(stand_alone_result, currentTime, device)46 database_operator.pg_update_operator(updata_profile_sql)47 else:48 insert_profile_sql = "INSERT INTO tbl_devprofile VALUES(‘{0}‘,NULL,NULL,NULL,NULL,NULL,NULL,NULL,{1},‘{2}‘);".format(49 device, stand_alone_result, currentTime)50 database_operator.pg_insert_operator(insert_profile_sql)
使用单线程时,执行完代码花费20s左右,使用多线程时花费5s左右。
Reference:
[1] https://blog.csdn.net/zhaihaifei/article/details/54016939
[2] https://www.cnblogs.com/hao-ming/p/7215050.html?utm_source=itdadao&utm_medium=referral
[3] https://www.cnblogs.com/wozijisun/p/6160065.html (多线程)
[4] http://www.lpfrx.com/archives/4431/
[5] https://www.cnblogs.com/95lyj/p/9047554.html