58 lines
3.0 KiB
Markdown
58 lines
3.0 KiB
Markdown
```python
|
|
def Chrome(headless=True, agent=None, proxy=None):
|
|
import requests, subprocess, base64
|
|
try: requests.get(f'http://localhost:9222', timeout=.1)
|
|
except: subprocess.Popen(['chrome', f'--remote-debugging-port={9222}',
|
|
*([f'--user-agent={agent}'] if agent else []),
|
|
*([f'--proxy-server={proxy}'] if proxy else []),
|
|
*(['--headless=new'] if headless else []),
|
|
'--disable-web-security', '--disable-translate',
|
|
'--ignore-certificate-errors', '--remote-allow-origins=*',
|
|
'--disable-backgrounding-occluded-windows',])
|
|
browser = requests.get(f'http://localhost:9222/json/version').json()['webSocketDebuggerUrl']
|
|
def new_page(self):
|
|
def send(url, request):
|
|
import websocket, json; ws = websocket.create_connection(url)
|
|
try: ws.send(json.dumps(request)); return json.loads(ws.recv())
|
|
finally: ws.close()
|
|
page = 'ws://localhost:9222/devtools/page/' + (targetId := send(browser,
|
|
dict(id=1, method='Target.createTarget',
|
|
params=dict(url='about:blank', newWindow=True))
|
|
)['result']['targetId'])
|
|
def evaluate(self, javascript):
|
|
try: return (res := self('Runtime.evaluate',
|
|
expression=javascript, returnByValue=True)['result'])['value']
|
|
except: raise Exception(res)
|
|
def wait_element(self, selector, timeout=30, check_interval=0.5):
|
|
import time; start_time = time.time()
|
|
while time.time() - start_time < timeout:
|
|
if self.evaluate(
|
|
f"document.querySelector('{selector}') ? true : false"):
|
|
return
|
|
time.sleep(check_interval)
|
|
raise TimeoutError(f"Element '{selector}' not found within {timeout} seconds")
|
|
def cookies(self, cookies=None):
|
|
if cookies is None: return self('Network.getAllCookies')['cookies']
|
|
self('Network.clearBrowserCookies')
|
|
for cookie in cookies: self('Network.setCookie', **cookie)
|
|
def screenshot(self):
|
|
from IPython.display import HTML
|
|
display(HTML(f'<img src="data:image/png;base64,{self('Page.captureScreenshot')['data']}">'))
|
|
def goto(self, url):
|
|
self('Page.navigate', url=url if '://' in url else f'https://{url}')
|
|
while self.evaluate('document.readyState') != 'complete':
|
|
self.sleep(0.1)
|
|
return True
|
|
return type('', (), dict(__call__=lambda _, method, **params:
|
|
send(page, {"id": 1, "method": method, "params": params})['result'],
|
|
goto=goto,
|
|
close=lambda self: self('Target.closeTarget', targetId=targetId)['success'],
|
|
sleep=lambda _, seconds: __import__('time').sleep(seconds),
|
|
evaluate=evaluate,
|
|
wait_element=wait_element,
|
|
cookies=cookies,
|
|
screenshot=screenshot,
|
|
source=lambda self: self.evaluate('document.documentElement.outerHTML'),
|
|
))()
|
|
return type('', (), dict(new_page=new_page))()
|
|
``` |