Files
wiki/playwright
2024-08-23 08:29:03 -04:00
..
2024-08-11 14:01:10 +09:00
2024-08-23 08:29:03 -04:00

# main.py
from playwright.async_api import async_playwright as aP
from db import DB
import xvfbwrapper, io, PIL.Image, os

async def Page(browser='chromium', headless=True):
    """Launch a browser and return a new page whose traffic is cached in ``DB``.

    Every request made by the returned page's context is intercepted:
    if a non-empty body is already stored under the request URL it is served
    from the database, otherwise the request is performed, a successful
    response body is stored under the URL, and the response is passed on to
    the page.  Requests matching ``**/*.gif`` or ``**/css*.js`` are aborted.

    Args:
        browser: Name of the browser-type attribute to look up on the
            Playwright object (e.g. ``'chromium'``).
        headless: When true, an ``xvfbwrapper.Xvfb`` instance is started
            before launch; when false, ``DISPLAY`` is set to ``':0'``.

    Returns:
        A new page created in the configured browser context.
    """
    if headless:
        # NOTE(review): the Xvfb instance is started and never stopped here;
        # presumably it lives for the lifetime of the process -- confirm.
        xvfbwrapper.Xvfb().start()
    else:
        os.environ['DISPLAY'] = ':0'
    db = DB()

    playwright = await aP().start()
    # The browser itself is always launched headed; the ``headless`` flag only
    # selects which display it is pointed at (see the branch above).
    launched = await getattr(playwright, browser).launch(headless=False)
    context = await launched.new_context(accept_downloads=True)
    context.set_default_timeout(0)

    async def handle_request(route):
        """Serve the request from the cache, or fetch it and cache the body."""
        url = route.request.url
        # NOTE(review): the cache is keyed by URL only and a cache hit is
        # fulfilled with the body alone (no stored status or headers).
        if body := db[url]:
            return await route.fulfill(body=body)
        # route.continue_() resolves to None, so it cannot be used to inspect
        # the response; route.fetch() performs the request and returns it.
        response = await route.fetch()
        if response.ok and not db.exists(url):
            db[url] = await response.body()
        await route.fulfill(response=response)

    await context.route('**/*', handle_request)
    # NOTE(review): these rules are registered after the catch-all handler;
    # Playwright is documented to run matching handlers in reverse
    # registration order, so they should take precedence -- confirm for the
    # installed version.
    for block in ['**/*.gif', '**/css*.js']:
        await context.route(block, lambda route: route.abort())

    return await context.new_page()
# db.py
import sqlite3, json

class DB(sqlite3.Connection):
    """A small key/value store on top of a SQLite connection.

    Values live in a single ``kv_store`` table.  ``bytes`` values are stored
    as given; any other value is serialized with ``json.dumps`` on write and
    decoded with ``json.loads`` on read.

    Supports ``db[key]``, ``db[key] = value``, ``key in db`` and iteration
    over the stored keys.
    """

    def __init__(self, db_name="db.sqlite"):
        """Open (or create) the database *db_name* and ensure the table exists."""
        super().__init__(db_name)
        with self:
            self.execute('''
                CREATE TABLE IF NOT EXISTS kv_store
                (key TEXT PRIMARY KEY, value BLOB)
            ''')

    def __setitem__(self, key, value):
        """Insert or overwrite *key*.

        Non-``bytes`` values are JSON-encoded first.  When called directly
        (not through ``db[key] = value``) the returned dict carries the
        cursor's row count under ``"modified_count"``.
        """
        if not isinstance(value, bytes):
            value = json.dumps(value)
        with self:
            cur = self.execute('''
                INSERT OR REPLACE INTO kv_store
                (key, value) VALUES (?, ?)
            ''', (key, value))
            return {"modified_count": cur.rowcount}

    def __getitem__(self, key):
        """Return the value stored under *key*, or ``None`` if there is none.

        Text values are JSON-decoded when possible; text that is not valid
        JSON, and any non-text value, is returned as stored.  A missing key
        does not raise.
        """
        with self:
            row = self.execute('''
                SELECT value FROM kv_store
                WHERE key = ?
            ''', (key,)).fetchone()
        if not row:
            return None
        stored = row[0]
        if isinstance(stored, str):
            try:
                return json.loads(stored)
            except json.JSONDecodeError:
                pass  # not JSON: fall through and return the raw text
        return stored

    def __contains__(self, key):
        """Return whether *key* is stored.

        Defined explicitly because ``__getitem__`` never raises: without this
        method ``key in db`` would fall back to probing ``db[0]``, ``db[1]``,
        ... and never terminate.
        """
        return self.exists(key)

    def __iter__(self):
        """Iterate over all stored keys.

        Defined explicitly for the same reason as ``__contains__``: the
        ``__getitem__`` fallback would otherwise yield ``None`` forever.
        """
        return iter(self.keys())

    def delete(self, key):
        """Remove *key*; return the cursor's row count under ``"deleted_count"``."""
        with self:
            cur = self.execute('''
                DELETE FROM kv_store 
                WHERE key = ?
            ''', (key,))
            return {"deleted_count": cur.rowcount}

    def keys(self, pattern='*'):
        """Return the keys matching a glob-style *pattern*.

        ``*`` matches any run of characters and ``?`` a single character;
        ``%``, ``_`` and the backslash are matched literally.
        """
        # Single-pass translation: escape LIKE's own metacharacters, then map
        # the glob wildcards onto them.
        pattern = pattern.translate(str.maketrans({
            '\\': '\\\\', '%': '\\%', '_': '\\_', '*': '%', '?': '_'
        }))
        with self:
            result = self.execute('''
                SELECT key FROM kv_store
                WHERE key LIKE ? ESCAPE '\\'
            ''', (pattern,)).fetchall()
        return [row[0] for row in result]

    def __repr__(self):
        """Return the ``repr`` of the list of all stored keys."""
        return repr(self.keys())

    def exists(self, key):
        """Return ``True`` if a row for *key* exists, else ``False``."""
        with self:
            cur = self.execute('''
                SELECT 1 FROM kv_store
                WHERE key = ?
                LIMIT 1
            ''', (key,))
            return bool(cur.fetchone())