```python
# google.py
# Cell-style script (`#%%` markers) that relies on top-level `await`.
#%%
import xvfbwrapper
from playwright.async_api import async_playwright
from PIL import Image
from io import BytesIO
#%%
# Start a virtual X display; the browser below is launched headed (headless=False).
xvfbwrapper.Xvfb().start()
# `async_playwright` is imported by name so that binding the running instance
# to `playwright` does not clobber a module, and this cell can be re-run.
playwright = await async_playwright().start()
browser = await playwright.chromium.launch(headless=False, args=['--enable-features=WebContentsForceDark'])
page = await browser.new_page()
#%%
await page.goto('https://google.com')
# Decode the screenshot bytes with PIL and write them to google.png.
image = Image.open(BytesIO(await page.screenshot()))
image.save('google.png')
```

위의 `google.py`는 top-level await를 사용하므로 아래와 같이 실행해야 한다.

```sh
python -m asyncio < google.py
```

```python
# play.py
from playwright.async_api import async_playwright as aP
import os, io, asyncio, xvfbwrapper
import logging
from db import DB

logger = logging.getLogger(__name__)


async def Page(browser='chromium', headless=True):
    """Launch a browser and return a new page backed by a response cache.

    Successful responses are stored in a ``DB`` keyed by URL, and later
    requests for a URL with a stored, non-empty body are fulfilled from it.
    Requests matching ``**/*.gif`` or ``**/css*.js`` are aborted.

    Args:
        browser: Attribute name of the browser type on the Playwright object
            (looked up with ``getattr``), e.g. ``'chromium'``.
        headless: When true, start an Xvfb virtual display; otherwise set
            ``DISPLAY`` to ``:0``. The browser itself is always launched with
            ``headless=False``.

    Returns:
        The page created by ``context.new_page()``.
    """
    if headless:
        xvfbwrapper.Xvfb().start()
    else:
        os.environ['DISPLAY'] = ':0'

    db = DB()
    playwright = await aP().start()
    # Use a separate name so the `browser` parameter is not rebound.
    instance = await getattr(playwright, browser).launch(headless=False)
    context = await instance.new_context(accept_downloads=True)
    context.set_default_timeout(0)

    async def save(response):
        """Store the body of an OK response unless its URL is already cached."""
        try:
            if response.ok and not db.exists(url := response.url):
                db[url] = await response.body()
        except Exception:
            # Caching is best-effort, so a failure here must not break the
            # page. A bare `except:` would also swallow
            # asyncio.CancelledError and KeyboardInterrupt, so only
            # Exception subclasses are caught.
            logger.debug("could not cache %s", response.url, exc_info=True)

    async def load(route):
        """Fulfill the request from the cache, or let it continue."""
        # A missing key (None) and an empty stored body are both falsy and
        # fall through to the network.
        if body := db[route.request.url]:
            return await route.fulfill(body=body)
        await route.continue_()

    context.on('response', save)
    await context.route('**/*', load)
    for block in ['**/*.gif', '**/css*.js']:
        await context.route(block, lambda route: route.abort())
    return await context.new_page()


async def main():
    """Open reddit.com and save a screenshot to screenshot.png."""
    page = await Page()
    await page.goto('https://reddit.com')
    await page.screenshot(path='screenshot.png')


if __name__ == "__main__":
    asyncio.run(main())
```

```python
# db.py
import sqlite3, json, os


class DB(sqlite3.Connection):
    """Key-value store on top of a single SQLite table (``kv_store``).

    ``bytes`` values are stored unchanged; every other value is stored as its
    JSON encoding and decoded again on read.
    """

    def __init__(self, db_name=".db.sqlite"):
        """Open the database at ``db_name`` (``~`` is expanded) and create the table if needed."""
        super().__init__(os.path.expanduser(db_name))
        with self:
            self.execute('''
                CREATE TABLE IF NOT EXISTS kv_store
                (key TEXT PRIMARY KEY, value BLOB)
            ''')

    def __setitem__(self, key, value):
        """Insert or replace ``key``.

        Returns ``{"modified_count": rowcount}``; the value is only visible
        when the method is called directly, not through ``db[key] = value``.
        """
        value = value if isinstance(value, bytes) else json.dumps(value)
        with self:
            result = self.execute('''
                INSERT OR REPLACE INTO kv_store (key, value)
                VALUES (?, ?)
            ''', (key, value)).rowcount
        return {"modified_count": result}

    def __getitem__(self, key):
        """Return the value stored under ``key``, or ``None`` when there is no such row.

        Text values are JSON-decoded; text that is not valid JSON, and any
        non-text value, is returned as stored.
        """
        with self:
            result = self.execute('''
                SELECT value FROM kv_store WHERE key = ?
            ''', (key,)).fetchone()
        if result:
            if isinstance(value := result[0], str):
                try:
                    return json.loads(value)
                except json.JSONDecodeError:
                    pass
            return value
        return None

    def delete(self, key):
        """Delete ``key`` and return ``{"deleted_count": rowcount}``."""
        with self:
            result = self.execute('''
                DELETE FROM kv_store WHERE key = ?
            ''', (key,)).rowcount
        return {"deleted_count": result}

    def keys(self, pattern="*"):
        """Return the keys matching a glob-style ``pattern`` (``*`` and ``?``).

        The pattern is translated to SQL ``LIKE`` syntax: literal ``\\``,
        ``%`` and ``_`` are escaped with a backslash, ``*`` becomes ``%`` and
        ``?`` becomes ``_``.
        """
        pattern = pattern.translate(str.maketrans(
            {"\\": "\\\\", "%": "\\%", "_": "\\_", "*": "%", "?": "_"}))
        with self:
            result = self.execute('''
                SELECT key FROM kv_store WHERE key LIKE ? ESCAPE '\\'
            ''', (pattern,)).fetchall()
        return [row[0] for row in result]

    def __repr__(self):
        """Return the repr of the list of all keys."""
        return repr(self.keys())

    def exists(self, key):
        """Return True when a row for ``key`` exists."""
        with self:
            result = self.execute('''
                SELECT 1 FROM kv_store WHERE key = ? LIMIT 1
            ''', (key,)).fetchone()
        return bool(result)


if __name__ == "__main__":
    db = DB('~/db.sqlite')

    # Store and read back a string
    db["hello"] = "world"
    print(db["hello"])  # prints: world (str)

    # Store and read back a number
    db["number"] = 42
    print(db["number"])  # prints: 42 (int)

    # Store and read back binary data
    db["binary"] = b"binary data"
    print(db["binary"])  # prints: b'binary data' (bytes)

    # Store a nested object
    complex_obj = {"name": "John", "age": 30, "city": "New York"}
    db["complex"] = complex_obj
    loaded_obj = db["complex"]
    print(loaded_obj)  # prints: {'name': 'John', 'age': 30, 'city': 'New York'} (dict)

    db["test_key"] = "test_value"
    print(db.exists("test_key"))  # True
    print(db.exists("non_existent_key"))  # False
```