Source code for mkpreview.database

"""
A database class to help with managing SQLite databases
"""
import os
import sqlite3
import sys
import time
from shutil import copyfile
from sqlite3 import Error


# Globals
DEBUG = False
QUIET = False

# Tables
TABLE = dict()


# noinspection PyProtectedMember
[docs]class Database: """ Database class provides a higher level connector to SQLite3 It provides methods for: Usage: myDB = Database('/tmp/filename.db') Methods: create() : Creates a Database or tests existence of database : Allows the database to be backed up if it exists. name() : Returns the current database filename. Flag is true if the database exist connect() : Connects this class to the initialized database close() : Commits and Closes the current connection createTable() : Creates a table from a dictionary and adds primary indexes : Allows overwrite (drop the data in the table) insertORupdate(): Allows for an insert if not there and an update if there. : Uses a dictionary. Checks dictionary against columns to avoid errors isTable() : Returns True if the tables exists backupTable() : Makes a copy of the table and renames it with a current timestamp dropTable() : Remove all of the data from the table sqlExecute() : Executes an arbitrary SQL Script against the open database. deleteData() : removes database based on table and where. Variables: global DEBUG : Used for output global QUIET : User for output local message : Return Message for all methods local data(dict): A dictionary for method use. local flag : True / False for results of method self.database : database name for all methods self.method : calling method name self.class : name of the class being called self.db_connect : name of the connection Returns: All methods return a [True | False] and a message If bad values are passed, it will fast fail and return a message. No further evaluations are made. """ global DEBUG global QUIET data = dict() flag = False message = "" def __init__(self, database, **kwargs): self.database = database self.db_connect = None self.class_name = self.__class__.__name__ method = sys._getframe().f_code.co_name self.QUIET = kwargs.get('quiet', QUIET) self.DEBUG = kwargs.get('debug', DEBUG) print('Quite? {}'.format(self.QUIET)) if not self.QUIET: print("{class_name}:{method}:initialized database:{database}".format( method=method, class_name=self.class_name, database=database)) print("{class_name}:{method}:debug:{debug}".format( method=method, debug=self.DEBUG, class_name=self.class_name,)) def __enter__(self): self.connect() def __exit__(self): self.close()
[docs] def name(self, **kwargs): """ PARAMS: ------- None RETURN ----- FLAG = True if the file exists / False if not MESSAGE = database name """ method = sys._getframe().f_code.co_name message = self.database flag = True if self.DEBUG: print('{classname}:{method}:arguments:{args}'.format(classname=self.class_name, method=method, args=kwargs)) if not os.path.isfile(self.database): flag = False if self.DEBUG: print('{classname}:{method}:file exists:{file}:{flag}'.format(classname=self.class_name, method=method, file=self.database,flag=flag)) if not self.QUIET: print("Database Name {}".format(message)) return flag, message
[docs] def create(self, **kwargs): """PARAMS: ------- overwrite = [True | False] backup = [True | False] If database exists, with overwrite FALSE and backup FALSE, method exists with False If database does not exist it is created If database exists and overwrite without backup, it is deleted If database exists and backup is set, database will be backed up; regardless of overwrite. RETURN ----- FLAG = True if a database is created, False if a database is not created. MESSAGE = database name + actions taken """ method = sys._getframe().f_code.co_name message = self.database + " " flag = True now = time.time() bak_name = "" modifier = time.strftime("%Y-%m-%d-%s", time.gmtime(now)) overwrite = kwargs.get('overwrite', False) backup = kwargs.get('backup', True) if self.DEBUG: print('{classname}:{method}:arguments:{args}'.format(classname=self.class_name, method=method, args=kwargs)) db_exists = self.name()[0] if DEBUG and db_exists: print('classname}:{method}:{database}:exists'.format(classname=self.class_name, method=method,database=self.database)) if backup: if DEBUG: print('classname}:{method}:{database}:backed up:{bak_name}'.format(classname=self.class_name, method=method,database=self.database,bak_name=bak_name)) dir_name = os.path.dirname(self.database) base_name = os.path.basename(self.database) file_name, file_ext = base_name.rsplit('.', 1) bak_name = dir_name + "/" + file_name + "_" + modifier + "." + file_ext try: copyfile(self.database, bak_name) message += "- File was backed up" except IOError as e: message += "- Error file was not backed up {error}".format(error=e) return False, message if overwrite and db_exists: try: os.remove(self.database) message += "- database file deleted" except Error as e: message += "- Error file was not deleted {error}".format(error=e) return False, message self.connect() wal = "PRAGMA journal_mode=WAL;" cur = self.db_connect.cursor() cur.execute(wal) results = cur.fetchall() message += "- " + str(sqlite3.version) + ":" + str(results) if db_exists and not overwrite: message += "- Wal mode is enabled" else: message += "- New Database created with WAL mode enabled" self.db_connect.close() return flag, message
[docs] def connect(self, **kwargs): """connect class to database """ method = sys._getframe().f_code.co_name flag = True if DEBUG: print('{classname}:{method}:arguments:{args}'.format(classname=self.class_name, method=method, args=kwargs)) try: self.db_connect = sqlite3.connect(self.database) except Error as e: print('connect_db Error {error}'.format(error=e)) flag = False message = sqlite3.version if DEBUG: print('DB Create {message}'.format(message=message)) return flag
[docs] def close(self, **kwargs): """Close the database connection """ method = sys._getframe().f_code.co_name if DEBUG: print('Class {classname} Method {method} Arguments {args}'.format(classname=self.class_name, method=method, args=kwargs)) self.db_connect.commit() self.db_connect.close() if DEBUG: print('Database transactions committed and connection is closed')
[docs] def commit(self, **kwargs): """Close the database connection """ method = sys._getframe().f_code.co_name if DEBUG: print('Class {classname} Method {method} Arguments {args}'.format(classname=self.class_name, method=method, args=kwargs)) self.db_connect.commit() if DEBUG: print('Database transactions committed ') return True, 'Database transactions committed'
[docs] def create_table(self, **kwargs): """create a table, optional drop if exists (default) If overwrite set to false, table will error if it exists """ method = sys._getframe().f_code.co_name message = "" flag = True if DEBUG: print('Class {classname} Method {method} Arguments {args}'.format(classname=self.class_name, method=method, args=kwargs)) table_name = kwargs.get('table', None) table_fields = kwargs.get('fields', None) overwrite = kwargs.get('overwrite', False) unique_fields = kwargs.get('unique', None) sql_statement = " " if not table_name and not table_fields: return False, "Error wrong parameters" if self.istable(table=table_name) and overwrite: c = self.db_connect.cursor() c.execute("""DROP TABLE `{TABLE}` ; """.format(TABLE=table_name)) elif self.istable(table=table_name) and not overwrite: return True, 'Table Exists' if DEBUG: print('Table: {0} Fields: {1}'.format(table_name, table_fields)) sql_start = "CREATE TABLE IF NOT EXISTS '{table_name}' (".format(table_name=table_name) sql_end = ");" sql_fields = " 'text',\n".join(table_fields) sql_statement += sql_start sql_statement += sql_fields sql_statement += sql_end if DEBUG: print('SQL: {0}'.format(sql_statement)) try: c = self.db_connect.cursor() results = c.execute(sql_statement) message = message + str(results) self.db_connect.commit() except Exception as error: message = 'Create Table Error {error}'.format(error=error) flag = False return flag, message if unique_fields: sql_statement = "" # Create Unique indexes on these fields for unique in unique_fields: sql_statement += "CREATE UNIQUE INDEX `idx_{index}` ON `{table}` (`{index}`); ".format(table=table_name, index=unique) try: c.executescript(sql_statement) except Exception as error: flag = False print('Failed: Index not added: {}'.format(error)) return flag, message
[docs] def insert_update(self, **kwargs): """Insert a row or update the row with the field names in a dictionary parameters: ----------- table = Table Name key_field = Field for initial update and where value key_value = Where Value data = Data to update """ method = sys._getframe().f_code.co_name if DEBUG: print('Class {classname} Method {method} Arguments {args}'.format(classname=self.class_name, method=method, args=kwargs)) header_row = list() table_name = kwargs.get('table', None) key_field = kwargs.get('key_field', None) key_value = kwargs.get('key_value', None) data = kwargs.get('data', None) if not table_name and not key_field and not key_value and not data: return False, 'Invalid arguments {}'.format(kwargs) # Build the header for all field names. sql_statement = "SELECT name FROM PRAGMA_TABLE_INFO('{TABLE}');".format(TABLE=table_name) if DEBUG: print(sql_statement) try: c = self.db_connect.cursor() c.execute(sql_statement) for field_header in c.fetchall(): header_row.append(field_header[0]) if DEBUG: print(header_row) except Error as e: message = 'Create Table Error {error}'.format(error=e) if DEBUG: print('Error {}'.format(message)) return False, message # Insert or ignore the key field sql_statement = "INSERT or ignore INTO `{table}` (`{key_field}`) VALUES(?) ; ".format(table=table_name, key_field=key_field) if DEBUG: print('Insert/Ignore SQL {}'.format(sql_statement)) c = self.db_connect.cursor() results = c.execute(sql_statement, [key_value]) if DEBUG: print(c.lastrowid) if results: message = "SQL object inserted" self.db_connect.commit() else: message = "Failed to insert object" # Update the previous row inserted sql_list = list() sql_statement = "UPDATE `{TABLE}` SET \n".format(TABLE=table_name) for field in header_row: if data.get(field, None): sql_list.append("`{field}`='{value}'\n".format(field=field, value=data.get(field, None))) sql_statement += ", ".join(sql_list) sql_statement += " WHERE `{key_field}` like '%{key_value}%';".format(key_field=key_field, key_value=key_value) try: if DEBUG: print(sql_statement) c.execute(sql_statement) except Error as e: if DEBUG: print('Error {}'.format(e)) return True, message + str(c.lastrowid)
[docs] def istable(self, **kwargs): """ Test if table exists; if so return true """ method = sys._getframe().f_code.co_name if DEBUG: print('Class {classname} Method {method} Arguments {args}'.format(classname=self.class_name, method=method, args=kwargs)) table_name = kwargs.get('table', None) c = self.db_connect.cursor() c.execute(''' SELECT count(name) FROM sqlite_master WHERE type='table' AND name='{TABLE}' '''.format( TABLE=table_name)) if c.fetchone()[0] == 1: flag = True else: flag = False return flag
[docs] def backup_table(self, **kwargs): method = sys._getframe().f_code.co_name now = time.time() if DEBUG: print('Class {classname} Method {method} Arguments {args}'.format(classname=self.class_name, method=method, args=kwargs)) modifier = time.strftime("%Y-%m-%d-%s", time.gmtime(now)) table_name = kwargs.get('table', None) table_back = table_name + "_" + modifier sql_statement = "CREATE TABLE `{table_back}` as SELECT * from `{table}`;".format(table_back=table_back, table=table_name) if DEBUG: print('{classname}:{method}:from{table} to:{table_back}'. format(classname=self.class_name, method=method, table=table_name, table_back=table_back )) flag, message = self.sql_exec(sql=sql_statement) return flag, message
[docs] def sql_exec(self, **kwargs): """ PARAMS: -------- SQL : SQL Statement (no expansion) RETURNS: --------- flag : True / False results : SQL Results """ method = sys._getframe().f_code.co_name message = "" flag = True sql_statement = kwargs.get('sql', None) if DEBUG: print('Class {classname} Method {method} Arguments {args}'.format(classname=self.class_name, method=method, args=kwargs)) if not sql_statement: flag = False message = "SQL Statement missing" else: try: c = self.db_connect.cursor() c.executescript(sql_statement) self.db_connect.commit() message = c.fetchall() except Error as e: print('Failed: {}'.format(e)) if DEBUG: print('SQL executed {flag} statement: {sql}'.format(flag=flag, sql=sql_statement)) return flag, message
[docs] def delete_data(self, **kwargs): method = sys._getframe().f_code.co_name if DEBUG: print('Class {classname} Method {method} Arguments {args}'.format(classname=self.class_name, method=method, args=kwargs)) table_name = kwargs.get('table', None) key_field = kwargs.get('key_field', None) key_value = kwargs.get('key_value', None) sql_statement = "DELETE FROM `{TABLE}` WHERE {KEY} is like '%{VALUE}%';".format(TABLE=table_name, KEY=key_field, VALUE=key_value) flag, message = self.sql_exec(sql=sql_statement) return flag, message
[docs] def truncate_table(self, **kwargs): """truncates the table using delete table='Table Name' """ method = sys._getframe().f_code.co_name table_name = kwargs.get('table', None) if DEBUG: print('Class {classname} Method {method} Arguments {args}'.format(classname=self.class_name, method=method, args=kwargs)) sql_statement = "DELETE FROM `{TABLE}`;".format(TABLE=table_name) flag, message = self.sql_exec(sql=sql_statement) return flag, message