nonebot_plugin_csust_notice.../db.py

70 lines
1.9 KiB
Python

import sqlite3
import os
from .notice import Notice
from typing import Any
from nonebot.log import logger
class CachedNotice(object):
def __init__(self, db_path: str) -> None:
self.engine = None
self.db_path = db_path
self.connect()
self.init_table()
def __del__(self):
self.engine.close()
def connect(self):
self.engine = sqlite3.connect(self.db_path)
def init_table(self):
cur = self.engine.cursor()
cur.execute("""CREATE TABLE IF NOT EXISTS notices (
id BIGINT PRIMARY KEY,
url TEXT NOT NULL DEFAULT '',
title TEXT NOT NULL DEFAULT '',
content_preview TEXT NOT NULL DEFAULT '',
publish_date DATE NOT NULL DEFAULT '',
content TEXT NOT NULL DEFAULT '',
source TEXT NOT NULL DEFAULT ''
);""")
self.engine.commit()
cur.close()
def get_notice_by_id(self, id: int) -> Notice:
cur = self.engine.cursor()
cur.execute(
f"""SELECT id, url, title, content_preview, publish_date, content, source FROM notices WHERE id = {id}""")
data = cur.fetchone()
cur.close()
if data:
return Notice(*data)
else:
return None
def is_notice_exists(self, notice_id: int) -> bool:
res = self.get_notice_by_id(notice_id)
if res:
return True
else:
return False
def add_notice(self, data: Notice):
if self.is_notice_exists(data.id):
return
cur = self.engine.cursor()
sql = """INSERT INTO notices (id, url, title, content_preview, publish_date, content, source)
VALUES ({}, '{}', '{}', '{}', '{}', '{}', '{}');""".format(*map(quote_escape, data.get_properties()))
cur.execute(sql)
self.engine.commit()
cur.close()
def quote_escape(x: str | Any):
if isinstance(x, str):
x = x.replace("'", "''")
return x