3.9 KiB
3.9 KiB
# play.py
from playwright.async_api import async_playwright as aP
import xvfbwrapper, io, os
from db import DB
async def Page(browser='chromium', headless=True):
    """Launch a browser and return a new page whose traffic is cached in ``DB``.

    When *headless* is true an ``xvfbwrapper.Xvfb`` display is started;
    otherwise ``DISPLAY`` is set to ``:0``. The browser itself is always
    launched with ``headless=False``.

    Responses with ``response.ok`` whose URL is not yet in the store are saved
    under that URL. Requests whose URL has a truthy stored body are fulfilled
    from the store; all other requests continue normally. Requests matching
    ``**/*.gif`` or ``**/css*.js`` additionally get an abort handler.

    Args:
        browser: Name of the attribute on the Playwright object to launch.
        headless: Selects between starting Xvfb and using display ``:0``.

    Returns:
        The result of ``context.new_page()`` on the configured context.
    """
    if not headless:
        os.environ['DISPLAY'] = ':0'
    else:
        xvfbwrapper.Xvfb().start()

    cache = DB()
    driver = await aP().start()
    engine = getattr(driver, browser)
    launched = await engine.launch(headless=False)
    ctx = await launched.new_context(accept_downloads=True)
    # A default timeout of 0 is applied to the whole context
    # (Playwright documents 0 as "no timeout").
    ctx.set_default_timeout(0)

    async def save(response):
        """Store the body of an OK response the first time its URL is seen."""
        if not response.ok:
            return
        url = response.url
        if cache.exists(url):
            return
        cache[url] = await response.body()

    async def load(route):
        """Serve the request from the store when a truthy body is cached."""
        cached = cache[route.request.url]
        if not cached:
            await route.continue_()
            return None
        return await route.fulfill(body=cached)

    def abort(route):
        """Return the coroutine that aborts *route*."""
        return route.abort()

    ctx.on('response', save)
    await ctx.route('**/*', load)
    for blocked in ('**/*.gif', '**/css*.js'):
        await ctx.route(blocked, abort)
    return await ctx.new_page()
async def main():
    """Open a page via ``Page()``, visit https://naver.com and save a screenshot.

    The screenshot is written to ``screenshot.png``.
    """
    tab = await Page()
    await tab.goto('https://naver.com')
    await tab.screenshot(path='screenshot.png')
if __name__ == "__main__":
    # Script entry point for play.py. asyncio is imported here because the
    # module-level imports do not include it; without this import the call
    # below fails with NameError.
    import asyncio

    asyncio.run(main())
# db.py
import sqlite3, json
class DB(sqlite3.Connection):
    """SQLite-backed key-value store with dict-style access.

    All entries live in a single ``kv_store`` table. ``bytes`` values are
    stored unchanged; every other value is serialized with ``json.dumps``
    on write and decoded with ``json.loads`` on read.

    Reading a missing key returns ``None`` instead of raising ``KeyError``.
    """

    # Translation table turning a glob pattern ('*' and '?') into a LIKE
    # pattern, escaping LIKE's own metacharacters with a backslash.
    _GLOB_TO_LIKE = str.maketrans(
        {"\\": "\\\\", "%": "\\%", "_": "\\_", "*": "%", "?": "_"})

    def __init__(self, db_name=".db.sqlite"):
        """Open the database at *db_name* and create ``kv_store`` if needed."""
        super().__init__(db_name)
        with self:
            self.execute("""
                CREATE TABLE IF NOT EXISTS kv_store
                (key TEXT PRIMARY KEY, value BLOB)
            """)

    def __setitem__(self, key, value):
        """Insert or replace *key*.

        ``bytes`` are stored unchanged; anything else is stored as JSON text.

        Returns:
            ``{"modified_count": n}`` with the cursor's row count. The value
            is only visible when the method is called directly, not through
            ``db[key] = value``.
        """
        value = value if isinstance(value, bytes) else json.dumps(value)
        with self:
            cur = self.execute("""
                INSERT OR REPLACE INTO kv_store
                (key, value) VALUES (?, ?)
            """, (key, value))
        return {"modified_count": cur.rowcount}

    def __getitem__(self, key):
        """Return the value stored under *key*, or ``None`` when it is absent.

        Text values are decoded from JSON; text that is not valid JSON and
        non-text values are returned exactly as stored.
        """
        with self:
            row = self.execute("""
                SELECT value FROM kv_store
                WHERE key = ?
            """, (key,)).fetchone()
        if row is None:
            return None
        stored = row[0]
        if isinstance(stored, str):
            try:
                return json.loads(stored)
            except json.JSONDecodeError:
                # Not JSON (e.g. written by other code); return it unchanged.
                return stored
        return stored

    def __contains__(self, key):
        """Return whether *key* is stored, so ``key in db`` works.

        Without this method Python would fall back to calling
        ``__getitem__`` with 0, 1, 2, ... until ``IndexError``; since
        ``__getitem__`` returns ``None`` for missing keys, that loop would
        never end.
        """
        return self.exists(key)

    def __iter__(self):
        """Iterate over all stored keys.

        Defined for the same reason as ``__contains__``: the implicit
        ``__getitem__``-based iteration would never terminate.
        """
        return iter(self.keys())

    def __delitem__(self, key):
        """Remove *key* via ``del db[key]``; a missing key is ignored."""
        self.delete(key)

    def delete(self, key):
        """Remove *key* and return ``{"deleted_count": n}``."""
        with self:
            cur = self.execute("""
                DELETE FROM kv_store
                WHERE key = ?
            """, (key,))
        return {"deleted_count": cur.rowcount}

    def keys(self, pattern="*"):
        """Return the list of keys matching the glob *pattern*.

        ``*`` matches any run of characters and ``?`` a single character;
        ``%``, ``_`` and ``\\`` in *pattern* are matched literally.
        """
        like = pattern.translate(self._GLOB_TO_LIKE)
        with self:
            rows = self.execute("""
                SELECT key FROM kv_store
                WHERE key LIKE ? ESCAPE '\\'
            """, (like,)).fetchall()
        return [row[0] for row in rows]

    def __repr__(self):
        """Return the ``repr`` of the list of all keys."""
        return repr(self.keys())

    def exists(self, key):
        """Return ``True`` when a row for *key* is present."""
        with self:
            cur = self.execute("""
                SELECT 1 FROM kv_store
                WHERE key = ?
                LIMIT 1
            """, (key,))
            return bool(cur.fetchone())
if __name__ == "__main__":
    # Demo for db.py: store a few values and print what comes back.
    db = DB()

    # Round-trip a string, a number and raw binary data, in that order.
    # Expected output: world (str), 42 (int), b'binary data' (bytes).
    for name, sample in (
        ("hello", "world"),
        ("number", 42),
        ("binary", b"binary data"),
    ):
        db[name] = sample
        print(db[name])

    # Store a more complex object.
    complex_obj = {"name": "John", "age": 30, "city": "New York"}
    db["complex"] = complex_obj
    loaded_obj = db["complex"]
    # Expected output: {'name': 'John', 'age': 30, 'city': 'New York'} (dict)
    print(loaded_obj)

    # Existence checks: prints True, then False.
    db["test_key"] = "test_value"
    for probe in ("test_key", "non_existent_key"):
        print(db.exists(probe))