158 lines
4.5 KiB
Markdown
158 lines
4.5 KiB
Markdown
```python
|
|
# google.py
|
|
|
|
#%%
|
|
import xvfbwrapper, playwright.async_api
|
|
from PIL import Image; from io import *
|
|
#%%
|
|
xvfbwrapper.Xvfb().start()
|
|
playwright = await playwright.async_api.async_playwright().start()
|
|
browser = await playwright.chromium.launch(headless=False,
|
|
args=['--enable-features=WebContentsForceDark'])
|
|
page = await browser.new_page()
|
|
#%%
|
|
await page.goto('https://google.com')
|
|
image = Image.open(BytesIO(await page.screenshot()))
|
|
image.save('google.png')
|
|
```
|
|
|
|
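위의 `google.py`는 함수 밖에서 top-level await를 사용하므로 일반적인 `python google.py`로는 `SyntaxError`가 발생한다. 따라서 top-level await를 지원하는 asyncio REPL에 표준 입력으로 넘겨 아래와 같이 실행해야 한다.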
|
|
```sh
|
|
python -m asyncio < google.py
|
|
```
|
|
|
|
```python
|
|
# play.py
|
|
from playwright.async_api import async_playwright as aP
|
|
import os, io, asyncio, xvfbwrapper
|
|
from db import DB
|
|
|
|
async def Page(browser='chromium', headless=True):
    """Launch a browser and return a new page whose traffic is cached in DB.

    Args:
        browser: Attribute name of the browser type on the Playwright object
            (looked up with ``getattr``), e.g. ``'chromium'``.
        headless: When true, a virtual X display is started with xvfbwrapper;
            otherwise ``DISPLAY`` is set to ``':0'``. In both cases the
            browser itself is launched with ``headless=False``.

    Returns:
        A new page from a context that accepts downloads, has its default
        timeout set to 0, stores the body of every successful response in
        ``DB`` keyed by URL, and fulfills later requests for a cached URL
        from that store.
    """
    import logging  # local import so the block does not depend on file-level imports

    if headless:
        xvfbwrapper.Xvfb().start()
    else:
        os.environ['DISPLAY'] = ':0'

    db = DB()
    playwright = await aP().start()
    # Keep the launched browser under its own name instead of overwriting the
    # `browser` argument (which is the browser type's name).
    engine = getattr(playwright, browser)
    instance = await engine.launch(headless=False)
    context = await instance.new_context(accept_downloads=True)
    context.set_default_timeout(0)

    async def save(response):
        """Store the body of a successful response unless its URL is cached."""
        try:
            if response.ok and not db.exists(url := response.url):
                db[url] = await response.body()
        except Exception as error:
            # Caching is best-effort, so failures are only logged. Catching
            # `Exception` rather than using a bare `except` lets task
            # cancellation and KeyboardInterrupt propagate.
            logging.getLogger(__name__).debug(
                "response not cached: %s (%r)", response.url, error)

    async def load(route):
        """Serve the request from the cache when a non-empty body is stored."""
        # A missing key yields None and an empty cached body is falsy; both
        # fall through to the network.
        if body := db[route.request.url]:
            return await route.fulfill(body=body)
        await route.continue_()

    context.on('response', save)
    await context.route('**/*', load)
    for block in ['**/*.gif', '**/css*.js']:
        await context.route(block, lambda route: route.abort())

    return await context.new_page()
|
|
|
|
async def main():
    """Open reddit.com in a cached page and save a screenshot.

    The screenshot is written to ``screenshot.png``. The page's context and
    browser are closed afterwards, even when navigation or the screenshot
    fails, so no browser process is left behind.
    """
    page = await Page()
    context = page.context
    owner = context.browser  # grabbed before closing the context
    try:
        await page.goto('https://reddit.com')
        await page.screenshot(path='screenshot.png')
    finally:
        await context.close()
        if owner is not None:
            await owner.close()


if __name__ == "__main__":
    asyncio.run(main())
|
|
```
|
|
|
|
```python
|
|
# db.py
|
|
import sqlite3, json, os
|
|
|
|
class DB(sqlite3.Connection):
    """SQLite connection that doubles as a persistent key/value store.

    Values live in a single ``kv_store(key TEXT PRIMARY KEY, value BLOB)``
    table. Binary values are stored as-is; everything else is serialized
    with ``json.dumps`` and decoded again on read.
    """

    def __init__(self, db_name=".db.sqlite"):
        """Open the database at ``db_name`` and create the table if needed.

        Args:
            db_name: Path of the SQLite file; ``~`` is expanded.
        """
        super().__init__(os.path.expanduser(db_name))
        with self:
            self.execute('''
                CREATE TABLE IF NOT EXISTS kv_store
                (key TEXT PRIMARY KEY, value BLOB)
            ''')

    def __setitem__(self, key, value):
        """Insert or replace ``key``.

        ``bytes`` are stored unchanged, ``bytearray``/``memoryview`` are
        copied to ``bytes`` first, and any other value is JSON-encoded.

        Returns:
            ``{"modified_count": n}`` for callers that invoke the method
            directly (the ``db[key] = value`` syntax discards it).

        Raises:
            TypeError: If a non-binary value is not JSON serializable.
        """
        if isinstance(value, (bytearray, memoryview)):
            value = bytes(value)
        elif not isinstance(value, bytes):
            value = json.dumps(value)
        with self:
            result = self.execute('''
                INSERT OR REPLACE INTO kv_store
                (key, value) VALUES (?, ?)
            ''', (key, value)).rowcount
        return {"modified_count": result}

    def __getitem__(self, key):
        """Return the value stored under ``key``, or ``None`` when absent.

        Text values are JSON-decoded; text that is not valid JSON is
        returned unchanged. Binary values come back as ``bytes``.
        """
        with self:
            row = self.execute('''
                SELECT value FROM kv_store
                WHERE key = ?
            ''', (key,)).fetchone()
        if row is None:
            return None
        value = row[0]
        if isinstance(value, str):
            try:
                return json.loads(value)
            except json.JSONDecodeError:
                pass
        return value

    def __contains__(self, key):
        """Support ``key in db``.

        Without this method Python falls back to calling ``__getitem__``
        with 0, 1, 2, ... until ``IndexError``; ``__getitem__`` never raises,
        so the membership test would not terminate.
        """
        return self.exists(key)

    def __iter__(self):
        """Iterate over all stored keys.

        Defined for the same reason as ``__contains__``: the implicit
        ``__getitem__``-based iteration would never stop.
        """
        return iter(self.keys())

    def delete(self, key):
        """Remove ``key`` and return ``{"deleted_count": n}``."""
        with self:
            result = self.execute('''
                DELETE FROM kv_store
                WHERE key = ?
            ''', (key,)).rowcount
        return {"deleted_count": result}

    def keys(self, pattern="*"):
        """Return the keys matching a glob-style ``pattern``.

        ``*`` matches any run of characters and ``?`` a single character.
        Literal ``%``, ``_`` and backslashes are escaped before the pattern
        is handed to SQL ``LIKE``. Matching uses SQLite's ``LIKE``, which is
        case-insensitive for ASCII letters by default.
        """
        pattern = pattern.translate(str.maketrans(
            {"\\": "\\\\", "%": "\\%", "_": "\\_", "*": "%", "?": "_"}))
        with self:
            result = self.execute('''
                SELECT key FROM kv_store
                WHERE key LIKE ? ESCAPE '\\'
            ''', (pattern,)).fetchall()
        return [row[0] for row in result]

    def __repr__(self):
        """Represent the store by the list of its keys."""
        return repr(self.keys())

    def exists(self, key):
        """Return ``True`` when a row with ``key`` is present."""
        with self:
            result = self.execute('''
                SELECT 1 FROM kv_store
                WHERE key = ?
                LIMIT 1
            ''', (key,)).fetchone()
        return bool(result)
|
|
|
|
|
|
if __name__ == "__main__":
    db = DB('~/db.sqlite')

    # One sample per kind of value: each is stored and then immediately read
    # back and printed, in this order.
    samples = [
        ("hello", "world"),            # prints: world (str)
        ("number", 42),                # prints: 42 (int)
        ("binary", b"binary data"),    # prints: b'binary data' (bytes)
        # prints: {'name': 'John', 'age': 30, 'city': 'New York'} (dict)
        ("complex", {"name": "John", "age": 30, "city": "New York"}),
    ]
    for name, payload in samples:
        db[name] = payload
        print(db[name])

    db["test_key"] = "test_value"

    # Existence checks: True for the key just written, False for an unknown one.
    for name in ("test_key", "non_existent_key"):
        print(db.exists(name))
|
|
```
|