newWebMethod

#!/bin/env python
# -*- encoding=utf8 -*-
"""Selenium helpers for launching a (optionally proxied) browser and driving pages.

The module provides:

* ``try_again`` / ``save_screen_shot`` -- decorators that retry a flaky
  WebDriver call once, or record diagnostics and tear the browser down when a
  task fails.
* ``get_all_pid`` / ``close_browser_by_pid`` -- Windows-only process helpers
  (they shell out to ``wmic`` and ``taskkill``).
* ``StartBrowser`` -- builds Chrome / Firefox / Opera drivers, with optional
  proxy and cookie-based login.
* ``WebMethod`` -- element-level helpers (wait, click, type, scroll, ...).

The code targets Python 2 but every statement is written so that it also parses
under Python 3 (single-argument ``print(...)`` calls, ``except Exception``).
"""
import functools
import io
import os
import random
import ssl
import string
import traceback
import zipfile
from time import *
from _weakref import proxy

import PublicMethod as PM
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from selenium.webdriver.common.action_chains import ActionChains

# Accepted aliases for each supported browser.
opera_list = ['Opera', 'opera', 'op']
firefox_list = ['Firefox', 'firefox', 'ff']
chrome_list = ['Chrome', 'chrome', 'Google', 'google', 'gg']


def try_again(func):
    """Decorator: run a ``WebMethod`` action and retry it once on a retryable error.

    The decorated method's owner must provide ``retry_error(error_msg)``. When
    the first call raises and ``retry_error`` returns true the call is repeated
    once; otherwise a plain ``Exception`` carrying the formatted traceback is
    raised. A short random pause follows every successful action.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            result = func(self, *args, **kwargs)
        except Exception:
            error_msg = traceback.format_exc()
            if not self.retry_error(error_msg):
                raise Exception(error_msg)
            # Second (and last) attempt; a failure here propagates unchanged.
            result = func(self, *args, **kwargs)
        PM.random_sleep(1, 3)
        return result
    return wrapper


def save_screen_shot(func):
    """Decorator: on failure, save diagnostics, close the browser and re-raise.

    The decorated method's owner must expose ``task_path``, ``web`` (a
    ``WebMethod``), ``driver``, ``python_pid``, ``driver_pid`` and
    ``mozilla_pid``. When the wrapped call raises, the traceback is written to
    ``<task_path>\\log_<time>.txt``, the failure reason is exported through
    ``PM.output_result``, the current page's URL / HTML / screenshot are saved,
    and the browser processes are terminated before a generic ``Exception`` is
    raised. On success the wrapped call's return value is passed through.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception:
            # Print and persist the error details.
            traceback.print_exc()
            error_msg = traceback.format_exc()
            timestamp = PM.get_time('%m%d_%H%M%S')
            log_path = self.task_path + '\\log_%s.txt' % timestamp
            with open(log_path, 'w+') as log_file:
                traceback.print_exc(file=log_file)
            # Classify the error (sets PM.fail['reason'] when a rerun may help).
            self.web.rerun_error(error_msg)
            PM.output_result(self.task_path + '\\output.txt')
            print("PM.fail['reason']:\n%s" % PM.fail['reason'])
            # Capture the current page's URL, HTML and screenshot.
            self.web.get_error_page_info(log_path)
            # NOTE(review): this keeps the failed browser open for ~17 minutes,
            # presumably for manual inspection -- confirm it is still wanted.
            print('sleep 1000s')
            sleep(1000)
            # Close the browser, then make sure its processes are gone.
            try:
                self.driver.quit()
                print('Close Window by modifier!')
            except Exception:
                print(u'浏览器已关闭(E)!')
            close_browser_by_pid(self.python_pid, self.driver_pid, self.mozilla_pid)
            raise Exception('Error thrown by modifier !')
    return wrapper


def _find_child_pid(parent_pid, image_name):
    """Return the PID (as a string) of a child of *parent_pid* whose ``wmic``
    listing line contains *image_name*, or ``None`` when there is no match."""
    if parent_pid is None:
        return None
    try:
        pipe = os.popen('wmic process where (ParentProcessId=%s) get Caption,ProcessId' % parent_pid)
        try:
            listing = pipe.read()
        finally:
            pipe.close()
    except Exception:
        return None
    child_pid = None
    for line in listing.splitlines():
        fields = line.split()
        # wmic pads its columns with a variable number of spaces, so take the
        # last whitespace-separated field (the ProcessId column).
        if fields and image_name in line:
            child_pid = fields[-1]
    return child_pid


def get_all_pid(browser):
    """Return ``(python_pid, driver_pid, browser_pid)`` for the running task.

    The driver is looked up among the children of this Python process and the
    browser among the children of the driver. Either PID is ``None`` when it
    cannot be determined (including when *browser* is not a known alias).
    """
    if browser in chrome_list:
        driver, mozilla = 'chromedriver', 'chrome'
    elif browser in firefox_list:
        driver, mozilla = 'geckodriver', 'firefox'
    elif browser in opera_list:
        driver, mozilla = 'operadriver', 'opera'
    else:
        driver = mozilla = None

    python_pid = os.getpid()
    driver_pid = _find_child_pid(python_pid, driver) if driver else None
    mozilla_pid = _find_child_pid(driver_pid, mozilla) if mozilla else None
    print('%s %s %s' % (python_pid, driver_pid, mozilla_pid))
    return python_pid, driver_pid, mozilla_pid


def close_browser_by_pid(python_pid, driver_pid, mozilla_pid):
    """Force-kill the browser and driver process trees with ``taskkill``.

    A PID of ``None`` is reported as a failed kill instead of being passed to
    ``taskkill``. *python_pid* is only printed, never killed.
    """
    for label, pid in (('mozilla', mozilla_pid), ('driver', driver_pid)):
        if pid is None:
            print('kill %s by pid failed !' % label)
            continue
        try:
            pipe = os.popen('taskkill /F /pid:%s /T' % pid)
            pipe.read()   # wait for taskkill to finish before moving on
            pipe.close()
            print('kill %s by pid !' % label)
        except Exception:
            print('kill %s by pid failed !' % label)
    print('python PID:%s' % python_pid)


class StartBrowser(object):
    """Configure and launch a browser, optionally through a proxy and with cookies."""

    def __init__(self, url, param_dict, task_path=None):
        """Store launch settings.

        *param_dict* must contain the keys ``browser``, ``cookies``,
        ``task_path``, ``ip``, ``port``, ``ip_username``, ``ip_passwd`` and
        ``binary_location``. The *task_path* argument is accepted for interface
        compatibility but the value from *param_dict* is the one used.
        """
        self.url = url
        self.headless = False
        self.browser = param_dict['browser']
        self.cookies = param_dict['cookies']
        self.task_path = param_dict['task_path']
        self.ip = param_dict['ip']
        self.port = param_dict['port']
        self.ip_username = param_dict['ip_username']
        self.ip_passwd = param_dict['ip_passwd']
        self.binary_location = param_dict['binary_location']

    def create_proxyauth_extension(self, scheme='http', plugin_path=None):
        """Write a Chrome extension zip that configures an authenticated proxy.

        The extension fixes the proxy to ``self.ip:self.port`` and answers
        proxy auth challenges with ``self.ip_username`` / ``self.ip_passwd``.
        When *plugin_path* is ``None`` the zip is placed inside
        ``self.task_path``. Returns the path of the written zip.
        """
        if plugin_path is None:
            plugin_path = '%s/vimm_chrome_proxyauth_plugin.zip' % self.task_path

        manifest_json = """
        {
            "version": "1.0.0",
            "manifest_version": 2,
            "name": "Chrome Proxy",
            "permissions": [
                "proxy",
                "tabs",
                "unlimitedStorage",
                "storage",
                "<all_urls>",
                "webRequest",
                "webRequestBlocking"
            ],
            "background": {
                "scripts": ["background.js"]
            },
            "minimum_chrome_version":"22.0.0"
        }
        """
        background_js = string.Template(
            """
            var config = {
                mode: "fixed_servers",
                rules: {
                    singleProxy: {
                        scheme: "${scheme}",
                        host: "${host}",
                        port: parseInt(${port})
                    },
                    bypassList: ["foobar.com"]
                }
            };
            chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
            function callbackFn(details) {
                return {
                    authCredentials: {
                        username: "${username}",
                        password: "${password}"
                    }
                };
            }
            chrome.webRequest.onAuthRequired.addListener(
                callbackFn,
                {urls: ["<all_urls>"]},
                ['blocking']
            );
            """
        ).substitute(host=self.ip, port=self.port, username=self.ip_username,
                     password=self.ip_passwd, scheme=scheme)
        with zipfile.ZipFile(plugin_path, 'w') as zp:
            zp.writestr("manifest.json", manifest_json)
            zp.writestr("background.js", background_js)
        return plugin_path

    def chrome_proxy_options(self):
        """Build ``ChromeOptions`` (also used for Opera) from the stored settings."""
        chrome_options = webdriver.ChromeOptions()
        # Hide the "controlled by automated test software" info bar.
        chrome_options.add_argument('disable-infobars')
        chrome_options.add_argument("--start-maximized")
        chrome_options.add_argument('--ignore-certificate-errors')
        if self.headless:
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--disable-gpu')
        if self.ip is not None:
            chrome_options.add_argument('--proxy-server=http://%s:%s' % (self.ip, self.port))
        # Opera is driven through ChromeOptions but needs its binary location.
        if self.browser in opera_list:
            chrome_options.binary_location = self.binary_location
        # Proxies with username/password auth need the helper extension.
        if self.ip_username is not None:
            print(u' 创建一个代理IP插件!')
            path = '%s/vimm_chrome_proxyauth_plugin.zip' % self.task_path
            proxyauth_plugin_path = self.create_proxyauth_extension(plugin_path=path)
            chrome_options.add_extension(proxyauth_plugin_path)
        return chrome_options

    def firefox_proxy_options(self):
        """Build a ``FirefoxProfile`` carrying the proxy preferences."""
        profile = webdriver.FirefoxProfile()
        if self.headless:
            # Headless mode is not a profile preference; Firefox reads it from
            # the MOZ_HEADLESS environment variable inherited via geckodriver.
            os.environ['MOZ_HEADLESS'] = '1'
        if self.ip is not None:
            profile.set_preference('network.proxy.type', 1)
            profile.set_preference('network.proxy.http', self.ip)
            profile.set_preference('network.proxy.http_port', self.port)
            profile.set_preference('network.proxy.ssl', self.ip)
            profile.set_preference('network.proxy.ssl_port', self.port)
        profile.update_preferences()
        return profile

    def check_IP(self, driver):
        """Check that the browser's outgoing IP is the configured proxy IP.

        Loads an IP-echo page and looks for ``self.ip`` in its source. On a
        mismatch the driver is quit, ``self.port`` is toggled between 3128 and
        58378 (only when it currently equals one of them as an ``int``), and
        ``True`` is returned to ask the caller to start the browser again.
        """
        need_retry = False
        try:
            driver.get('http://pv.sohu.com/cityjson?ie=utf-8')
            sleep(5)
            page_source = driver.page_source
        except Exception:
            page_source = u'打开检查IP地址的URL失败!'
        print('page_source:\n%s' % page_source)
        if self.ip in page_source:
            print(u' 代理IP设置正确!')
        else:
            driver.quit()
            if self.port == 3128:
                self.port = 58378
            elif self.port == 58378:
                self.port = 3128
            need_retry = True
        return need_retry

    def _create_driver(self):
        """Instantiate the WebDriver matching ``self.browser``."""
        if self.browser in opera_list:
            return webdriver.Opera(options=self.chrome_proxy_options())
        if self.browser in firefox_list:
            return webdriver.Firefox(firefox_profile=self.firefox_proxy_options())
        if self.browser in chrome_list:
            return webdriver.Chrome(chrome_options=self.chrome_proxy_options())
        raise ValueError('unsupported browser: %r' % (self.browser,))

    def _open_browser(self):
        """Start the browser, verify the proxy, apply cookies and open ``self.url``.

        The proxy check is attempted at most twice (the second time with the
        alternate port chosen by ``check_IP``). Raises ``Exception`` when the
        proxy is still wrong after the second attempt.
        """
        for attempt in range(2):
            driver = self._create_driver()
            if self.ip is None:
                break
            if not self.check_IP(driver):
                break
            if attempt == 1:
                PM.fail['reason'] = u'代理IP设置错误,退出浏览器!'
                raise Exception(PM.fail['reason'])

        if self.cookies is not None:
            print(u'使用cookie认证登录')
            # A first navigation is required before cookies can be modified.
            driver.get(self.url)
            driver.delete_all_cookies()
            for cookie in self.cookies:
                if self.browser in firefox_list:
                    # Firefox gets the cookie without its 'domain' field; work
                    # on a copy so the caller's cookie dicts stay intact.
                    cookie = dict(cookie)
                    cookie.pop('domain', None)
                driver.add_cookie(cookie)

        driver.maximize_window()
        sleep(3)
        # Loading the page again makes the freshly added cookies take effect.
        driver.get(self.url)
        return driver

    def open_browser(self):
        """Start the browser and return the driver.

        On failure the reason is stored in ``PM.fail['reason']`` and exported
        via ``PM.output_result``, any browser/driver processes spawned by this
        Python process are killed, and an ``Exception`` carrying the formatted
        traceback is raised.
        """
        try:
            driver = self._open_browser()
        except Exception:
            error_msg = traceback.format_exc()
            PM.fail['reason'] = u'浏览器启动失败!\n原因:\n%s' % error_msg
            print(PM.fail['reason'])
            PM.output_result(self.task_path + '\\output.txt')
            # No driver object is available here, so clean up by PID only.
            python_pid, driver_pid, mozilla_pid = get_all_pid(self.browser)
            close_browser_by_pid(python_pid, driver_pid, mozilla_pid)
            raise Exception(error_msg)
        return driver


class WebMethod(object):
    """Element-level helpers built on top of a Selenium WebDriver."""

    # Substrings of a traceback for which repeating the action once is worthwhile.
    _RETRY_MARKERS = (
        'Command.GO_BACK',
        'Command.CLICK_ELEMENT',
        'Command.GET_CURRENT_URL',
        'Command.GET_PAGE_SOURCE',
    )

    # Substrings of a traceback for which rerunning the whole task may help, e.g.
    #   TimeoutException: Message: Timeout loading page after 300000ms
    #   WebDriverException: Message: Failed to decode response from marionette
    #   WebDriverException: Message: Reached error page: about:neterror?e=netTimeout
    #   WebDriverException: Message: Reached error page: about:neterror?e=nssFailure
    _RERUN_MARKERS = (
        'WebDriverException:',
        'unknown session',
        'unknown sessionId',
        'invalid session id',
        'Message: no such session',
        'session deleted because of page crash',
        'error: [Errno',
        'MaxRetryError',
    )

    def __init__(self, driver, task_path=None):
        self.driver = driver
        self.task_path = task_path

    def c_time(self):
        """Return the current local time formatted as ``YYYY-mm-dd HH:MM:SS``."""
        return strftime('%Y-%m-%d %H:%M:%S', localtime())

    # **
    # Error classification
    # *
    def _probe(self, name, locator):
        """Return ``is_exists(locator)``, or ``False`` if the lookup itself fails."""
        try:
            present = self.is_exists(locator)
        except Exception:
            return False
        print('%s:%s' % (name, present))
        return present

    def get_page_error(self):
        """Inspect the current page for known failure pages.

        Sets ``PM.fail['reason']`` when a browser network-error page or the
        eBay sign-in prompt is detected. Returns ``True`` when
        ``PM.fail['reason']`` no longer holds the default text.
        """
        # Browser network-error page markers.
        reload_button = self._probe('reload_button', '//button[@id="reload-button"]')
        error_page = self._probe('error_page', '//div[@id="errorPageContainer"]')
        network_error = self._probe(
            'network_error', '//div[@id="main-message"]/h1[@jsselect="heading"]')
        tryAgain_button = self._probe('tryAgain_button', '//button[@id="errorTryAgain"]')
        # Other known failure states.
        try:
            is_relogin = self.is_exists('//span[@id="GREET-SIGN-IN-TO-EBAY"]')
        except Exception:
            is_relogin = False

        if reload_button or network_error or error_page or tryAgain_button:
            PM.fail['reason'] = u'网络连接错误,请重新运行!'
        elif is_relogin:
            PM.fail['reason'] = u'eBay需要反复登录,请检查账号状态!'
        # A non-default reason means the caller should write it to the output.
        return PM.fail['reason'] != 'unknown reasons,please contact IT'

    def retry_error(self, error_msg):
        """Return ``True`` when *error_msg* describes an error worth one retry."""
        retry = any(marker in error_msg for marker in self._RETRY_MARKERS)
        if not retry and 'ElementClickInterceptedException' in error_msg:
            retry = True
            # Give whatever is covering the element time to go away.
            PM.random_sleep(5, 10)
        if retry:
            print(u' **再次执行当前操作···')
        return retry

    def rerun_error(self, error_msg):
        """Return ``True`` when rerunning the whole task may succeed.

        When it does, ``PM.fail['reason']`` is replaced by a "please rerun"
        message that embeds *error_msg*.
        """
        if self.get_page_error():
            rerun = True
        elif self.retry_error(error_msg):
            rerun = True
        elif any(marker in error_msg for marker in self._RERUN_MARKERS):
            rerun = True
        else:
            rerun = 'in check_response' in error_msg and 'Message: timeout' in error_msg
        if rerun:
            PM.fail['reason'] = (
                u'未知原因失败,可以尝试重新运行一次。如再次相同原因失败,请联系开发人员!\n错误原因:\n%s'
                % error_msg)
        return rerun

    # **
    # Locator handling
    # *
    def return_loc_type(self, locator):
        """Return ``By.XPATH`` for XPath-looking locators, else ``By.CSS_SELECTOR``."""
        if locator.startswith(('/', '(', './', '../')):
            return By.XPATH
        return By.CSS_SELECTOR

    def is_exists(self, locator, timeout=1):
        """Look for an element matching any of the given locators.

        *locator* is expanded with ``PM.return_loc_list`` (presumably into a
        list of alternative locator strings -- verify against ``PublicMethod``).
        The page is polled up to *timeout* times, roughly once per second.

        With ``timeout <= 1`` the result is ``1`` / ``0`` (found / not found).
        With ``timeout > 1`` the matching element object is returned, and the
        WebDriver's own lookup error is raised when nothing matched.
        """
        loc_list = PM.return_loc_list(locator)
        if not loc_list:
            if timeout > 1:
                raise ValueError('no locator given: %r' % (locator,))
            return 0

        found = None
        loc = loc_type = None
        for _ in range(timeout):
            for loc in loc_list:
                loc_type = self.return_loc_type(loc)
                matches = self.driver.find_elements(by=loc_type, value=loc)
                if matches:
                    found = matches[0]
                    break
            if found is not None:
                print(loc)
                break
            if timeout > 1:
                sleep(1)

        if timeout <= 1:
            return 1 if found is not None else 0
        if found is not None:
            return found
        # Not found: let the driver raise its usual "no such element" error.
        return self.driver.find_element(by=loc_type, value=loc)

    def is_displayed(self, locator, timeout=1):
        """Return ``1`` if an element matching any locator is visible, else ``0``.

        The page is polled up to *timeout* times, roughly once per second,
        until a visible match is found.
        """
        loc_list = PM.return_loc_list(locator)
        result = 0
        for _ in range(timeout):
            for loc in loc_list:
                loc_type = self.return_loc_type(loc)
                matches = self.driver.find_elements(by=loc_type, value=loc)
                if matches and matches[0].is_displayed():
                    result = 1
                    print('This element is visible:%s' % loc)
                    break
            if result:
                break
            if timeout > 1:
                sleep(1)
        return result

    # **
    # Element actions
    # *
    def wait_element_OK(self, locator, timeout=20):
        """Wait until the element exists, then pause briefly."""
        print('[%s] waiting element is visible...By locator:' % self.c_time())
        self.is_exists(locator, timeout)
        PM.random_sleep(1, 3)

    @try_again
    def click_element(self, locator, timeout=20):
        """Wait for the element and click it."""
        print('[%s] clicking element...By locator:' % self.c_time())
        element_obj = self.is_exists(locator, timeout)
        element_obj.click()

    @try_again
    def input_text(self, text, locator, timeout=20):
        """Clear the element and type *text* one character at a time."""
        print('[%s] entering text...By locator:' % self.c_time())
        element_obj = self.is_exists(locator, timeout)
        element_obj.clear()
        sleep(1)
        for char in text:
            element_obj.send_keys(char)
            # Random per-keystroke delay.
            PM.random_sleep(0.2, 1)

    @try_again
    def get_attr(self, attr, locator, timeout=20):
        """Return the value of attribute *attr* of the element."""
        print('[%s] getting element attribute values...By locator:' % self.c_time())
        element_obj = self.is_exists(locator, timeout)
        return element_obj.get_attribute(attr)

    @try_again
    def get_text(self, locator, timeout=20):
        """Return the text content of the element."""
        print('[%s] getting element text...By locator:' % self.c_time())
        element_obj = self.is_exists(locator, timeout)
        return element_obj.text

    @try_again
    def mouse_over(self, locator, timeout=20):
        """Move the mouse pointer over the element."""
        print('[%s] mouse is hovering...By locator:' % self.c_time())
        element_obj = self.is_exists(locator, timeout)
        ActionChains(self.driver).move_to_element(element_obj).perform()

    @try_again
    def select_by_attr(self, locator, attr, attr_type='value', timeout=20):
        """Choose an option of a ``<select>``.

        *attr_type* selects how *attr* is interpreted: ``'text'`` (visible
        text), ``'index'`` (position) or anything else (the option's value).
        """
        print('[%s] selecting drop-down box elements by %s...By locator:'
              % (self.c_time(), attr_type))
        element_obj = self.is_exists(locator, timeout)
        dropdown = Select(element_obj)
        if attr_type == 'text':
            dropdown.select_by_visible_text(attr)
        elif attr_type == 'index':
            dropdown.select_by_index(attr)
        else:
            dropdown.select_by_value(attr)

    @try_again
    def _go_back(self):
        """Trigger the browser's Back action."""
        print('[%s] returning to the previous page...By locator:' % self.c_time())
        self.driver.back()

    def _url_changed(self, old_url):
        """Poll for up to ~10 s; return ``True`` once the current URL differs from *old_url*."""
        for _ in range(5):
            sleep(2)
            if self.driver.current_url != old_url:
                return True
        return False

    def go_back(self, url=None, desc=u'上一个页面'):
        """Go back one page, trying twice; fall back to loading *url* directly.

        Returns ``True`` when the URL changed after a Back action. When both
        attempts fail and *url* is given, *url* is opened instead (the return
        value is still ``False``).
        """
        start_url = self.driver.current_url
        for _ in range(2):
            self._go_back()
            if self._url_changed(start_url):
                print(u' 点击go back,成功返回至%s' % desc)
                return True
        if url is None:
            print(u' 返回%s时,页面跳转失败!' % desc)
        else:
            self.driver.get(url)
            print(u' 返回至%s,URL:%s' % (desc, url))
        return False

    def click_goTo(self, locator, timeout=20):
        """Click the element and confirm that the URL changed; click twice at most."""
        print('[%s] clicking to jump the page...By locator:' % self.c_time())
        start_url = self.driver.current_url
        for _ in range(2):
            self.click_element(locator, timeout)
            if self._url_changed(start_url):
                print(u' 页面跳转成功!Locator:%s' % locator)
                return True
        print(u' 页面跳转失败!Locator:%s' % locator)
        return False

    # **
    # Others
    # *
    def _y_of(self, position, timeout):
        """Return *position* itself if it is an int, else the Y coordinate of that element."""
        if isinstance(position, int):
            return position
        return self.is_exists(position, timeout).location['y']

    def adjust_element(self, start_loc, end_loc, timeout=20):
        """Scroll step by step from one position to another.

        *start_loc* and *end_loc* are either integer Y coordinates or element
        locators. The page is scrolled in random steps of 330-590 px (at most
        20 steps) and finally positioned 200 px above the end position.
        """
        print('[%s] adjusting element position...By locator:' % self.c_time())
        start_Y = self._y_of(start_loc, timeout)
        end_Y = self._y_of(end_loc, timeout)

        if start_Y != end_Y:
            direction = 1 if start_Y < end_Y else -1
            next_Y = start_Y + direction * random.randint(330, 590)
            for _ in range(20):
                PM.random_sleep(1, 3)
                # Stop as soon as the next step would pass the target.
                if (next_Y - end_Y) * direction > 0:
                    break
                self.driver.execute_script('window.scrollTo(0,%d)' % next_Y)
                next_Y += direction * random.randint(330, 590)
        # Finally jump straight to (just above) the end position.
        self.driver.execute_script('window.scrollTo(0,%d)' % (end_Y - 200))

    def random_scroll_page(self, min, max):
        """Scroll down a random number of steps (between *min* and *max*),
        scroll back up, and return to the top of the page."""
        print('[%s] random sliding of pages...By locator:' % self.c_time())
        max_count = random.randint(min, max)
        # Y coordinate reached by the first scroll.
        y_position = random.randint(310, 380)
        for _ in range(max_count):
            self.driver.execute_script('window.scrollTo(0,%d)' % y_position)
            y_position += random.randint(330, 390)
            PM.random_sleep(1, 3)
        for _ in range(max_count + 1):
            self.driver.execute_script('window.scrollTo(0,%d)' % y_position)
            y_position -= random.randint(430, 590)
            PM.random_sleep(0.5, 2)
        # Back to the top of the page.
        self.driver.execute_script('window.scrollTo(0,0)')
        PM.random_sleep(1, 3)

    def select_new_window(self):
        """Wait (up to ~20 s) for a second window and switch to it.

        Returns ``True`` when the driver was switched to a window other than
        the current one, ``False`` when no other window exists.
        """
        print('[%s] selecting the newly opened browser window...By locator:' % self.c_time())
        for i in range(10):
            sleep(2)
            all_handles = self.driver.window_handles
            if len(all_handles) > 1:
                break
            elif i == 9:
                print(u'新窗口打开失败,一共有%s个窗口!' % len(all_handles))
        result = False
        current_handle = self.driver.current_window_handle
        for handle in all_handles:
            if handle != current_handle:
                self.driver.switch_to_window(handle)
                result = True
        return result

    def get_error_page_info(self, log_path):
        """Append the current URL to *log_path* and save the page HTML and a screenshot.

        The HTML goes to ``<task_path>\\error_page_source.html`` and the
        screenshot to ``<task_path>\\error_page<time>.png``. If the driver can
        no longer be queried, a note saying so is appended to the log instead.
        """
        print('[%s] Error occurred. Getting the current page information...By locator:'
              % self.c_time())
        # io.open with an explicit encoding handles non-ASCII text on Python 2 and 3.
        with io.open(log_path, 'a+', encoding='utf-8') as log_file:
            try:
                # Current page URL.
                log_file.write(u'%s' % self.driver.current_url)
                # Current page HTML.
                html_path = "%s\\error_page_source.html" % self.task_path
                with io.open(html_path, "w", encoding="utf-8") as html_file:
                    html_file.write(u'%s' % self.driver.page_source)
                # Screenshot of the current page.
                timestamp = PM.get_time('%m%d_%H%M%S')
                file_path = '%s\\error_page%s.png' % (self.task_path, timestamp)
                self.driver.get_screenshot_as_file(file_path)
                print(u'\n页面截图保存路径:\n%s' % file_path)
            except Exception:
                log_file.write(u'无法执行webdriver提供的方法,请检查session是否丢失!')

 

未完成***

相关文章