# Files
# 2024-09-25 23:48:58 -04:00
# ..
# 2024-09-25 23:48:58 -04:00

import sqlite3
import time

class SQLiteRedis:
    """Small Redis-like store (strings, lists, hashes, TTLs) backed by SQLite.

    Data lives in three tables (``kv_store``, ``list_store``, ``hash_store``)
    plus an ``expiry`` table that maps a key name to its absolute deadline in
    Unix seconds.  A TTL is tracked per key *name*, so it is shared by every
    data type stored under that name.

    The connection is opened in autocommit mode (``isolation_level=None``),
    so each statement is committed as soon as it runs.

    Expiry is enforced lazily: every command first drops the key it touches
    when its deadline has passed, and a full sweep runs when the store is
    opened.

    Instances can be used as context managers; the connection is closed on
    exit.
    """

    # Tables holding user data.  These names are interpolated into SQL below;
    # they are fixed constants, never caller input.
    _DATA_TABLES = ("kv_store", "list_store", "hash_store")

    def __init__(self, db_name='redis.sqlite'):
        """Open (or create) the database at *db_name* and prepare its tables."""
        self.conn = sqlite3.connect(db_name, isolation_level=None)
        self.conn.execute('PRAGMA journal_mode=WAL')
        self.cur = self.conn.cursor()
        self._create_tables()
        # Drop anything whose deadline passed while the store was closed.
        self._cleanup_expired()

    def __enter__(self):
        """Return the store itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the block are not suppressed."""
        self.close()

    def _create_tables(self):
        """Create the four backing tables if they do not exist yet."""
        self.cur.execute('''CREATE TABLE IF NOT EXISTS kv_store
                            (key TEXT PRIMARY KEY, value TEXT)''')
        self.cur.execute('''CREATE TABLE IF NOT EXISTS expiry
                            (key TEXT PRIMARY KEY, expire_at INTEGER)''')
        self.cur.execute('''CREATE TABLE IF NOT EXISTS list_store
                            (key TEXT, idx INTEGER, value TEXT,
                             PRIMARY KEY (key, idx))''')
        self.cur.execute('''CREATE TABLE IF NOT EXISTS hash_store
                            (key TEXT, field TEXT, value TEXT,
                             PRIMARY KEY (key, field))''')

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _exists(self, key):
        """Return True if *key* has at least one row in any data table."""
        for table in self._DATA_TABLES:
            self.cur.execute(f"SELECT 1 FROM {table} WHERE key = ? LIMIT 1",
                             (key,))
            if self.cur.fetchone() is not None:
                return True
        return False

    def _remove_key(self, key):
        """Delete *key* from every data table and drop its TTL.

        Returns True if any data row was removed.
        """
        removed = 0
        for table in self._DATA_TABLES:
            self.cur.execute(f"DELETE FROM {table} WHERE key = ?", (key,))
            removed += self.cur.rowcount
        self.cur.execute("DELETE FROM expiry WHERE key = ?", (key,))
        return removed > 0

    def _expire_if_due(self, key):
        """Remove *key* entirely when its recorded deadline has passed."""
        self.cur.execute("SELECT expire_at FROM expiry WHERE key = ?", (key,))
        row = self.cur.fetchone()
        if row is not None and row[0] <= int(time.time()):
            self._remove_key(key)

    def _drop_ttl_if_gone(self, key):
        """Forget the TTL of *key* once no data is left under that name.

        Without this, a list or hash that was emptied and later recreated
        would silently inherit the deadline of its previous incarnation.
        """
        if not self._exists(key):
            self.cur.execute("DELETE FROM expiry WHERE key = ?", (key,))

    def _cleanup_expired(self):
        """Sweep every key whose deadline has passed out of all tables."""
        now = int(time.time())
        for table in self._DATA_TABLES:
            self.cur.execute(
                f"DELETE FROM {table} WHERE key IN "
                "(SELECT key FROM expiry WHERE expire_at <= ?)", (now,))
        self.cur.execute("DELETE FROM expiry WHERE expire_at <= ?", (now,))

    # ------------------------------------------------------------------
    # Strings and key management
    # ------------------------------------------------------------------

    def set(self, key, value):
        """Store *value* under *key*, replacing any previous string value.

        As in Redis, a plain SET discards a TTL previously attached to the
        key, so the new value does not inherit an old deadline.
        """
        self.cur.execute("REPLACE INTO kv_store (key, value) VALUES (?, ?)",
                         (key, value))
        self.cur.execute("DELETE FROM expiry WHERE key = ?", (key,))

    def get(self, key):
        """Return the string stored under *key*, or None if absent or expired."""
        self._expire_if_due(key)
        self.cur.execute("SELECT value FROM kv_store WHERE key = ?", (key,))
        row = self.cur.fetchone()
        return row[0] if row is not None else None

    def delete(self, key):
        """Remove *key* (string, list and hash data) together with its TTL.

        Returns 1 if the key existed and was removed, otherwise 0.
        """
        self._expire_if_due(key)
        return 1 if self._remove_key(key) else 0

    def expire(self, key, seconds):
        """Make *key* expire *seconds* from now.

        Returns True when the deadline was recorded and False when the key
        does not exist (in which case nothing is written).
        """
        self._expire_if_due(key)
        if not self._exists(key):
            return False
        expire_at = int(time.time()) + seconds
        self.cur.execute("REPLACE INTO expiry (key, expire_at) VALUES (?, ?)",
                         (key, expire_at))
        return True

    def ttl(self, key):
        """Return the remaining lifetime of *key* in seconds.

        Follows the Redis convention: -2 if the key does not exist (or has
        already expired), -1 if it exists without a TTL.
        """
        self._expire_if_due(key)
        if not self._exists(key):
            return -2
        self.cur.execute("SELECT expire_at FROM expiry WHERE key = ?", (key,))
        row = self.cur.fetchone()
        if row is None:
            return -1
        # Clamp in case the clock ticked over between the two checks.
        return max(row[0] - int(time.time()), 0)

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    # ------------------------------------------------------------------
    # Lists
    # ------------------------------------------------------------------

    def lpush(self, key, *values):
        """Insert *values* at the head of the list and return its new length.

        Values are pushed one after another, so the last argument ends up
        first in the list.
        """
        self._expire_if_due(key)
        for value in values:
            # New head index is one below the current minimum (or -1 when
            # the list is empty).
            self.cur.execute("INSERT INTO list_store (key, idx, value) "
                             "SELECT ?, COALESCE(MIN(idx), 0) - 1, ? "
                             "FROM list_store WHERE key = ?", (key, value, key))
        return self.llen(key)

    def rpush(self, key, *values):
        """Append *values* to the tail of the list and return its new length."""
        self._expire_if_due(key)
        for value in values:
            # New tail index is one above the current maximum (or 0 when
            # the list is empty).
            self.cur.execute("INSERT INTO list_store (key, idx, value) "
                             "SELECT ?, COALESCE(MAX(idx), -1) + 1, ? "
                             "FROM list_store WHERE key = ?", (key, value, key))
        return self.llen(key)

    def lpop(self, key):
        """Remove and return the first element of the list, or None if empty."""
        self._expire_if_due(key)
        self.cur.execute("SELECT idx, value FROM list_store WHERE key = ? "
                         "ORDER BY idx ASC LIMIT 1", (key,))
        row = self.cur.fetchone()
        if row is None:
            return None
        idx, value = row
        # Delete exactly the row that was read rather than re-deriving it.
        self.cur.execute("DELETE FROM list_store WHERE key = ? AND idx = ?",
                         (key, idx))
        self._drop_ttl_if_gone(key)
        return value

    def llen(self, key):
        """Return the number of elements in the list (0 if it does not exist)."""
        self._expire_if_due(key)
        self.cur.execute("SELECT COUNT(*) FROM list_store WHERE key = ?", (key,))
        return self.cur.fetchone()[0]

    def lrange(self, key, start, stop):
        """Return the elements from *start* to *stop*, both inclusive.

        Indices are zero-based; negative values count from the end of the
        list (-1 is the last element).  Out-of-range indices are clamped,
        and an empty list is returned when the range selects nothing.
        """
        length = self.llen(key)
        if start < 0:
            start = max(length + start, 0)
        if stop < 0:
            stop += length
        stop = min(stop, length - 1)
        if start > stop:
            return []
        self.cur.execute("SELECT value FROM list_store WHERE key = ? "
                         "ORDER BY idx ASC LIMIT ? OFFSET ?",
                         (key, stop - start + 1, start))
        return [row[0] for row in self.cur.fetchall()]

    # ------------------------------------------------------------------
    # Hashes
    # ------------------------------------------------------------------

    def hset(self, key, field, value):
        """Set *field* of the hash to *value*.

        Returns 1 if the field was newly created and 0 if an existing field
        was overwritten.
        """
        self._expire_if_due(key)
        self.cur.execute("SELECT 1 FROM hash_store WHERE key = ? AND field = ?",
                         (key, field))
        is_new = self.cur.fetchone() is None
        self.cur.execute("REPLACE INTO hash_store (key, field, value) VALUES (?, ?, ?)",
                         (key, field, value))
        return 1 if is_new else 0

    def hget(self, key, field):
        """Return the value of *field* in the hash, or None if it is absent."""
        self._expire_if_due(key)
        self.cur.execute("SELECT value FROM hash_store WHERE key = ? AND field = ?",
                         (key, field))
        row = self.cur.fetchone()
        return row[0] if row is not None else None

    def hdel(self, key, *fields):
        """Delete *fields* from the hash and return how many were removed."""
        self._expire_if_due(key)
        deleted = 0
        for field in fields:
            self.cur.execute("DELETE FROM hash_store WHERE key = ? AND field = ?",
                             (key, field))
            deleted += self.cur.rowcount
        if deleted:
            self._drop_ttl_if_gone(key)
        return deleted

    def hlen(self, key):
        """Return the number of fields in the hash (0 if it does not exist)."""
        self._expire_if_due(key)
        self.cur.execute("SELECT COUNT(*) FROM hash_store WHERE key = ?", (key,))
        return self.cur.fetchone()[0]

    def hgetall(self, key):
        """Return every field/value pair of the hash as a dict."""
        self._expire_if_due(key)
        self.cur.execute("SELECT field, value FROM hash_store WHERE key = ?", (key,))
        return dict(self.cur.fetchall())


def _demo():
    """Exercise the string, list and hash commands on the default database.

    The expected values shown in the comments assume a fresh ``redis.sqlite``:
    the file persists between runs, so the list keeps growing on every
    execution.
    """
    r = SQLiteRedis()
    try:
        # Usage example
        r.set('mykey', 'Hello, World!')
        print(r.get('mykey'))
        r.expire('mykey', 10)
        print(r.ttl('mykey'))

        # LIST usage
        r.lpush("mylist", "world", "hello")
        r.rpush("mylist", "!")
        print(r.lrange("mylist", 0, -1))  # ['hello', 'world', '!']
        print(r.lpop("mylist"))  # 'hello'

        # HASH usage
        r.hset("myhash", "name", "John")
        r.hset("myhash", "age", "30")
        print(r.hget("myhash", "name"))  # 'John'
        print(r.hgetall("myhash"))  # {'name': 'John', 'age': '30'}
    finally:
        # Always release the SQLite connection, even if a command fails.
        r.close()


# Run the demo only when executed as a script, so importing this module
# neither touches the database file nor prints anything.
if __name__ == "__main__":
    _demo()