Files
..
2024-09-15 11:04:57 +00:00
2024-12-03 06:25:54 +00:00
2024-09-15 11:04:57 +00:00

Simple SQL

def SQL(db='sql.db', *, sqlite3=__import__('sqlite3')):
    con = sqlite3.connect(db, isolation_level=None, timeout=1e999)
    con.execute('PRAGMA journal_mode = WAL'); con.row_factory = sqlite3.Row
    return lambda q, *p: list(map(dict, con.execute(q, p)))
sql = SQL()
sql("SELECT * FROM sqlite_master WHERE type = 'table'")

Dedupe

def SQL():
    import sqlite3, hashlib, os
    con = sqlite3.connect('.db', isolation_level=None)
    sql = lambda q, *p: list(con.execute(q, p))
    if not os.path.exists('.db-blob') and os.mkdir('.db-blob') is None:
        con.executescript('''PRAGMA journal_mode=WAL;
            CREATE TABLE kv(k, v, t DEFAULT CURRENT_TIMESTAMP);
            CREATE INDEX idx_kv_v ON kv(v); CREATE INDEX idx_kv_k_t ON kv(k,t DESC)''')
    def setitem(_, filename, blob):
        if not sql('SELECT 1 FROM kv WHERE v=?', sha1 := hashlib.sha1(blob).hexdigest()):
            with open(f'.db-blob/{sha1}', 'xb') as f: f.write(blob)
        sql('INSERT INTO kv(k,v) VALUES(?,?)', filename, sha1)
    def getitem(_, filename):
        if sha1 := sql('SELECT v FROM kv WHERE k=? ORDER BY t DESC', filename):
            return open(f'.db-blob/{sha1[0][0]}', 'rb').read()
    return type('', (), dict(__setitem__=setitem, __getitem__=getitem))()
sql = SQL()

print(sql['hello']) # None
sql['hello'] = b'world'
print(sql['hello']) # b'world'
print(sql['hello']) # b'world'

Dict-like

def SQL():
    from sqlite3 import connect, Row
    (con := connect('.db', isolation_level=None)).row_factory = Row
    con.execute('PRAGMA journal_mode=wal')
    con.execute('PRAGMA busy_timeout='f'{1e9}')
    con.execute('CREATE TABLE IF NOT EXISTS kv(k PRIMARY KEY, v)')
    return type('', (), dict(
        __call__=lambda _, q, *p: list(map(dict, con.execute(q, p))),
        __setitem__=lambda sql, k, v: sql('REPLACE INTO kv VALUES(?,?)', k, v),
        __getitem__=lambda sql, k: sql('SELECT v FROM kv WHERE k=?', k)[0]['v'],
        __iter__=lambda sql: (kv.values() for kv in sql('SELECT * FROM kv'))))()
sql = SQL()
''
sql[1] = 2
print(sql[1]) # 2
print(dict(sql)) # {1 : 2}
def Dict(db='dict.db', *, sqlite3=__import__('sqlite3')):
    sql = sqlite3.connect(db, isolation_level=None, timeout=1e999).execute
    sql('PRAGMA journal_mode=WAL'); sql('CREATE TABLE IF NOT EXISTS d(k PRIMARY KEY, v)')
    return type('', (), dict(__iter__=lambda _: (k[0] for k in sql('SELECT k FROM d')),
    __setitem__=lambda _, *kv: sql('INSERT OR REPLACE INTO d VALUES (?,?)', kv),
    __getitem__=lambda _, *k: sql('SELECT v FROM d WHERE k = ?', k).fetchone()[0]))()
sql = Dict()
sql[4] = 2
print(sql[4]) # 2
print(list(sql)) # [4]

Cache

class Fetch:
    def __init__(self, db='fetch.db'):
        self.con = __import__('sqlite3').connect(db, isolation_level=None)
        self('PRAGMA busy_timeout='f'{1e9}')
        self('PRAGMA journal_mode=WAL')
        self('PRAGMA wal_checkpoint(FULL)')
        self('CREATE TABLE IF NOT EXISTS cache(url PRIMARY KEY, blob)')
    def __call__(self, *args): return list(self.con.execute(*args))
    def __getitem__(self, url):
        if blob := self('SELECT blob FROM cache WHERE url=?', (url,)):
            return blob[0][0]
        else:
            blob = __import__('urllib').request.urlopen(url).read()
            self('INSERT INTO cache VALUES(?,?)', (url, blob))
            return blob
fetch = Fetch()

Class

class SQL:
    def __init__(self, filename='db.sqlite'):
        import sqlite3
        self.conn = sqlite3.connect(filename, 
            isolation_level=None # autocommit mode: ON
        )
        self('PRAGMA journal_mode=WAL;') # WAL mode: ON
        self.conn.row_factory = lambda *args: dict(sqlite3.Row(*args))

    def __call__(self, *args):
        import pandas as pd
        return pd.DataFrame(self.conn.execute(*args))

    def __getattr__(self, table):
        return self(f"SELECT * FROM {table}")

    def __repr__(self):
        return self("SELECT name FROM sqlite_master WHERE type='table'").to_string()
#%%
try: get_ipython()
except: display = print

sql = SQL()
sql

#%%
# 학생 테이블 생성
sql('''CREATE TABLE IF NOT EXISTS students 
       (id INTEGER PRIMARY KEY, name TEXT, age INTEGER, grade INTEGER)''')

# 과목 테이블 생성
sql('''CREATE TABLE IF NOT EXISTS courses 
       (id INTEGER PRIMARY KEY, name TEXT, professor TEXT)''')

# 수강 테이블 생성 (학생과 과목의 관계)
sql('''CREATE TABLE IF NOT EXISTS enrollments 
       (student_id INTEGER, course_id INTEGER, 
        FOREIGN KEY(student_id) REFERENCES students(id), 
        FOREIGN KEY(course_id) REFERENCES courses(id))''')
sql

#%%
# 학생 데이터 삽입
sql('INSERT INTO students (name, age, grade) VALUES (?, ?, ?)', ('김철수', 20, 2))
sql('INSERT INTO students (name, age, grade) VALUES (?, ?, ?)', ('이영희', 22, 4))
sql('INSERT INTO students (name, age, grade) VALUES (?, ?, ?)', ('박민수', 19, 1))

# 과목 데이터 삽입
sql('INSERT INTO courses (name, professor) VALUES (?, ?)', ('데이터베이스', '홍길동'))
sql('INSERT INTO courses (name, professor) VALUES (?, ?)', ('알고리즘', '이순신'))
sql('INSERT INTO courses (name, professor) VALUES (?, ?)', ('인공지능', '강감찬'))

# 수강 데이터 삽입
sql('INSERT INTO enrollments (student_id, course_id) VALUES (?, ?)', (1, 1))
sql('INSERT INTO enrollments (student_id, course_id) VALUES (?, ?)', (1, 2))
sql('INSERT INTO enrollments (student_id, course_id) VALUES (?, ?)', (2, 2))
sql('INSERT INTO enrollments (student_id, course_id) VALUES (?, ?)', (3, 3))

print("Students:")
display(sql.students)
print("\nCourses:")
display(sql.courses)
print("\nEnrollments:")
display(sql.enrollments)

#%%
# JOIN을 사용하여 학생별 수강 과목 조회
print("\n학생별 수강 과목:")
enrollments_df = sql('''
    SELECT students.name AS student_name, courses.name AS course_name
    FROM students
    JOIN enrollments ON students.id = enrollments.student_id
    JOIN courses ON enrollments.course_id = courses.id
''')
display(enrollments_df)

#%%
# 특정 과목을 수강하는 학생 수 조회
course_name = '알고리즘'
result = sql('''
    SELECT COUNT(*) as student_count
    FROM enrollments
    JOIN courses ON enrollments.course_id = courses.id
    WHERE courses.name = ?
''', (course_name,))
print(f"\n'{course_name}' 과목을 수강하는 학생 수: {result['student_count'].iloc[0]}")

#%%
# 가장 많은 학생이 수강하는 과목 조회
most_popular_course = sql('''
    SELECT courses.name, COUNT(*) as enrollment_count
    FROM enrollments
    JOIN courses ON enrollments.course_id = courses.id
    GROUP BY courses.id
    ORDER BY enrollment_count DESC
    LIMIT 1
''')
print(f"\n가장 인기 있는 과목: {most_popular_course['name'].iloc[0]} (수강생 {most_popular_course['enrollment_count'].iloc[0]}명)")

#%%
# 최종 상태 출력
print("\n최종 상태:")
print("Students:")
display(sql('SELECT * FROM students'))
print("\nCourses:")
display(sql('SELECT * FROM courses'))
print("\nEnrollments:")
display(sql('SELECT * FROM enrollments'))