import os
import shutil
import sqlite3
from af.controller.data.DataController import DataController
class SqliteController(DataController):
    """Data controller that connects to SQLite database files."""

    # Identifier for this controller type.
    CONTROLLER_TYPE = 'sqlite'
    # Label plus glob patterns of the accepted SQLite file extensions
    # (presumably consumed as a file-dialog filter -- confirm with the caller).
    CONTROLLER_EXTENSION = 'SQLite (*.sqlite3 *.db *.sqlite)'
def __init__(self, data_location):
    """Initialise the controller by delegating to the DataController initialiser.

    :param string data_location: location of the sqlite database; forwarded
        unchanged to ``DataController.__init__`` (presumably stored there as
        ``self.data_location``, which the other methods read -- confirm)
    """
    DataController.__init__(self, data_location)
def execute_query(self, query):
    """Executes a query against the database and lazily yields its rows.

    This is a generator: the connection is only opened once iteration starts.
    The transaction is committed when the rows are exhausted (rolled back if
    an error occurs) and the connection is always closed afterwards.

    :param string query: query to execute
    :rtype: generator of result rows
    """
    conn = sqlite3.connect(self.data_location)
    try:
        # The connection's context manager only commits / rolls back;
        # it does not close the connection, hence the explicit finally.
        with conn:
            for row in conn.cursor().execute(query):
                yield row
    finally:
        conn.close()
def db_available_tables(self):
    """Returns all the available tables of a sqlite database.

    The names are read from ``sqlite_master``; each entry of the result is a
    row whose single column is the table name.

    :rtype: list
    """
    query = "SELECT name FROM sqlite_master WHERE type='table';"
    return list(self.execute_query(query))
def table_columns_info(self, table_name):
    """Returns the column names (header) of a given sqlite table.

    The names are taken from the cursor description of a ``SELECT *`` on the
    table. ``table_name`` is interpolated into the SQL text as-is.

    :param string table_name: name of the table which is queried
    :rtype: list
    """
    query = "SELECT * FROM {table}".format(table=table_name)
    conn = sqlite3.connect(self.data_location)
    try:
        with conn:
            cursor = conn.cursor()
            cursor.execute(query)
            # First item of every description entry is the column name.
            return [column[0] for column in cursor.description]
    finally:
        # "with conn" does not close the connection by itself.
        conn.close()
def get_table_data(self, table_name):
    """Returns all the rows of a given sqlite table.

    ``table_name`` is interpolated into the SQL text as-is.

    :param string table_name: name of the table which is queried
    :rtype: list
    """
    query = "SELECT * FROM {table}".format(table=table_name)
    return list(self.execute_query(query))
def get_table_columns_type(self, table_name):
    """Returns the Python types of the values in the first row of a table.

    The types are those of the values found in one sampled row (so a NULL
    value yields ``NoneType``). An empty table has no row to sample and
    produces an empty list.

    :param string table_name: name of the table which is queried
    :rtype: list
    """
    query = "SELECT * FROM {table} LIMIT 1".format(table=table_name)
    sample = list(self.execute_query(query))
    if not sample:
        # No rows: nothing to infer the types from.
        return []
    return [type(value) for value in sample[0]]
def amount_of_rows(self, table_name):
    """Returns the amount of rows a table contains.

    :param string table_name: name of the table which is queried
    :rtype: int
    """
    query = "SELECT COUNT(*) FROM {table}".format(table=table_name)
    # The result is fully consumed so execute_query can finish its cleanup;
    # the count is the only column of the only row.
    result = list(self.execute_query(query))
    return result[0][0]
def get_frequency_of_qi_attributes(self, table_name, qi_list):
    """Yields the frequency of every combination of values of the given attributes.

    Each yielded row starts with the count, followed by the values of the
    attributes in ``qi_list`` order. Table and attribute names are
    interpolated into the SQL text as-is.

    :param string table_name: name of the table which is queried
    :param list qi_list: list of attributes to query their frequency
    :rtype: generator of rows
    """
    columns = ','.join(qi_list)
    query = "SELECT COUNT(*), {columns} FROM {table} GROUP BY {columns}".format(
        columns=columns, table=table_name)
    for freq in self.execute_query(query):
        yield freq
def get_frequency_of_eq_classes(self, table_name, qi_list):
    """Returns the number of equivalence classes of a table.

    An equivalence class is one distinct combination of values of the
    attributes in ``qi_list`` (one ``GROUP BY`` group).

    :param string table_name: name of the table which is queried
    :param list qi_list: list of attributes that define the classes
    :rtype: int
    """
    query = "SELECT COUNT(*) FROM {table} GROUP BY {columns}".format(
        table=table_name, columns=','.join(qi_list))
    # Count the groups without keeping every row in memory.
    return sum(1 for _ in self.execute_query(query))
def get_count_of_distinct_qi_values(self, table_name, qi):
    """Yields the count of distinct values of a certain qi attribute.

    Note that this is a generator: the count is the single item it yields,
    it is not returned directly.

    :param string table_name: name of the table which is queried
    :param string qi: Quasi Identifier attribute name
    :rtype: generator yielding one int
    """
    query = "SELECT COUNT(distinct {qi}) FROM {table}".format(table=table_name, qi=qi)
    for row in self.execute_query(query):
        yield row[0]
def get_distinct_qi_values(self, table_name, qi):
    """Yields all the distinct values of a certain qi attribute.

    :param string table_name: name of the table which is queried
    :param string qi: Quasi Identifier attribute name
    :rtype: generator of values
    """
    query = "SELECT distinct {qi} FROM {table}".format(table=table_name, qi=qi)
    for row in self.execute_query(query):
        # Each row has a single column: the attribute value.
        yield row[0]
@staticmethod
def create_db_copy(from_location, to_location):
    """Creates the copy of a certain db to a new location.

    An existing file at ``to_location`` is replaced. When both locations
    resolve to the same file nothing is done, so the database is never
    deleted while trying to copy it onto itself.

    :param string from_location: original location of the db
    :param string to_location: new location for the db
    """
    source = os.path.normcase(os.path.realpath(from_location))
    target = os.path.normcase(os.path.realpath(to_location))
    if source == target:
        # Removing the destination here would destroy the source as well.
        return
    if os.path.isfile(to_location):
        os.remove(to_location)
    shutil.copy2(from_location, to_location)
def update_qi_value(self, table_name, qi, new_value, old_value):
    """Given a table and a qi attribute of it, updates its value to a new one.

    Every row whose ``qi`` column equals ``old_value`` gets ``new_value``.
    The values are bound as query parameters; the table and attribute names
    are interpolated into the SQL text as-is.

    :param string table_name: name of the table which is queried
    :param string qi: Quasi Identifier attribute name
    :param string new_value: New value for the qi attribute
    :param string old_value: Current value of the qi attribute
    """
    query = "UPDATE {table} SET {qi}=? WHERE {qi}= ?".format(table=table_name, qi=qi)
    conn = sqlite3.connect(self.data_location)
    try:
        # Leaving the "with" block commits (or rolls back on error) ...
        with conn:
            conn.cursor().execute(query, (new_value, old_value))
    finally:
        # ... but does not close the connection.
        conn.close()
def update_qi_values_in_range(self, cursor, table_name, qi, new_value, old_values):
    """Given a table and a qi attribute of it, sets the attribute to a new value on every row whose current value is one of ``old_values``.

    The statement is executed on the given cursor; committing is left to the
    owner of that cursor. ``old_values`` is not modified.

    :param cursor: sqlite cursor used to execute the update
    :param string table_name: name of the table which is queried
    :param string qi: Quasi Identifier attribute name
    :param string new_value: New value for the qi attribute
    :param list old_values: List of possible current values of the qi attribute
    """
    current_values = list(old_values)
    # One placeholder per value; joining (instead of comparing each value with
    # the last one) keeps the separators right when values are repeated.
    placeholders = ', '.join('?' for _ in current_values)
    query = "UPDATE {table} SET {qi}=? WHERE {qi} IN ( {marks})".format(
        table=table_name, qi=qi, marks=placeholders)
    # Build a fresh parameter sequence rather than inserting into the
    # caller's list.
    cursor.execute(query, tuple([new_value] + current_values))
def update_qi_values(self, table_name, qi, dic):
    """Given a table and a qi attribute of it, updates the attribute using the ``dic`` mapping, where each key is a new value and its associated entry holds the old values to replace.

    All the updates run on one connection and are committed together when
    they succeed (rolled back if one of them fails).

    :param string table_name: name of the table which is queried
    :param string qi: Quasi Identifier attribute name
    :param dict dic: Dictionary containing all the updates of the form {new_value: list_of_old_values}
    """
    conn = sqlite3.connect(self.data_location)
    try:
        with conn:
            cursor = conn.cursor()
            # items() works on both Python 2 and 3 (iteritems() is 2-only).
            for new_value, old_values in dic.items():
                self.update_qi_values_in_range(cursor, table_name, qi, new_value, old_values)
    finally:
        # "with conn" only ends the transaction; close explicitly.
        conn.close()
def get_count_of_qi_value(self, table_name, qi_list, values):
    """Returns the amount of rows whose qi attributes hold the given values.

    ``values[i]`` is matched against the attribute ``qi_list[i]``; a row is
    counted only when every attribute matches. The values are bound as query
    parameters; table and attribute names are interpolated as-is.

    :param string table_name: name of the table which is queried
    :param list qi_list: List containing all the qi attributes names to query
    :param list values: List of all the particular values for each of the attributes contained on the qi_list
    :rtype: int
    """
    self.validate_param_lengths(qi_list, values)
    # Joining the conditions (instead of comparing each name with the last
    # one) keeps the ANDs right even when a name is repeated.
    conditions = ' AND '.join("{qi} = ?".format(qi=qi) for qi in qi_list)
    query = "SELECT COUNT(*) FROM {table} WHERE {conditions}".format(
        table=table_name, conditions=conditions)
    conn = sqlite3.connect(self.data_location)
    try:
        with conn:
            cursor = conn.cursor()
            return list(cursor.execute(query, tuple(values)))[0][0]
    finally:
        # "with conn" does not close the connection by itself.
        conn.close()
def remove_row(self, cursor, table_name, qi_list, values):
    """Deletes those rows on the table that match the values conditions for the qi_list attributes.

    ``values[i]`` is matched against the attribute ``qi_list[i]``; a row is
    deleted only when every attribute matches. The statement is executed on
    the given cursor; committing is left to the owner of that cursor.

    :param cursor: sqlite cursor used to execute the delete
    :param string table_name: name of the table which is queried
    :param list qi_list: List containing all the qi attributes names to query
    :param list values: List of all the particular values for each of the attributes contained on the qi_list
    """
    self.validate_param_lengths(qi_list, values)
    # Joining the conditions (instead of comparing each name with the last
    # one) keeps the ANDs right even when a name is repeated.
    conditions = ' AND '.join("{qi} = ?".format(qi=qi) for qi in qi_list)
    query = "DELETE FROM {table} WHERE {conditions}".format(
        table=table_name, conditions=conditions)
    cursor.execute(query, tuple(values))
def remove_rows(self, table_name, qi_list, rows_to_remove):
    """Given a list of row values to remove, deletes each one of them from the given table.

    All the deletions run on one connection and are committed together when
    they succeed (rolled back if one of them fails).

    :param string table_name: name of the table which is queried
    :param list qi_list: List containing all the qi attributes names to query
    :param list rows_to_remove: Contains all the values to remove from the table.
    """
    conn = sqlite3.connect(self.data_location)
    try:
        with conn:
            cursor = conn.cursor()
            for row in rows_to_remove:
                self.remove_row(cursor, table_name, qi_list, row)
    finally:
        # "with conn" only ends the transaction; close explicitly.
        conn.close()
def rename_table(self, old_table_name, new_table_name):
    """Renames a certain table to a new table name.

    Both names are interpolated into the SQL text as-is.

    :param string old_table_name: Current name of the table
    :param string new_table_name: New name for the table
    """
    query = "ALTER TABLE {old_table_name} RENAME TO {new_table_name}".format(old_table_name=old_table_name, new_table_name=new_table_name)
    conn = sqlite3.connect(self.data_location)
    try:
        # Leaving the "with" block commits (or rolls back on error) ...
        with conn:
            conn.cursor().execute(query)
    finally:
        # ... but does not close the connection.
        conn.close()
@staticmethod
def validate_param_lengths(qi_list, values):
    """Validates that the qi_list attributes names coincide in length with the values associated.

    :param list qi_list: Attributes names
    :param list values: Values for the attributes of the qi_list
    :raises ValueError: when both sequences differ in length (ValueError is
        an Exception subclass, so handlers catching Exception still apply)
    """
    if len(qi_list) != len(values):
        raise ValueError("The lenght of the attributes is different from the values")
def execute_many(self, query, values_list):
    """Executes a query once per parameter set, in the usual sqlite executemany fashion.

    The whole batch is committed when it succeeds (rolled back on error).

    :param string query: Query to execute in bulk mode
    :param list values_list: Parameter sets, one per execution of the query
    """
    conn = sqlite3.connect(self.data_location)
    try:
        # Leaving the "with" block commits (or rolls back on error) ...
        with conn:
            conn.cursor().executemany(query, values_list)
    finally:
        # ... but does not close the connection.
        conn.close()
def get_groups_examples(self, table, qi_list):
    """Given a table name and a qi list, retrieves up to 10 groups of rows as a dictionary with the columns info and the data sample.

    The rows are grouped by the attributes in ``qi_list``; every returned
    row starts with the size of its group (column ``amount``) followed by
    the table columns. Column names are title-cased in the result.

    :param string table: Table name to query
    :param list qi_list: Quasi-Identifiable attributes names to group by
    :rtype: dict with the keys ``'columns'`` and ``'data'``
    """
    query = "SELECT count(*) as amount, * from {0} GROUP BY {1} LIMIT 10;".format(table, ','.join(qi_list))
    conn = sqlite3.connect(self.data_location)
    try:
        with conn:
            cursor = conn.cursor()
            cursor.execute(query)
            # First item of every description entry is the column name.
            columns_info = [column[0].title() for column in cursor.description]
            data = cursor.fetchall()
    finally:
        # "with conn" does not close the connection by itself.
        conn.close()
    return {'columns': columns_info, 'data': data}