Source code for egegrouper.sqlite3_model

# EGEGrouper - Software for grouping electrogastroenterography examinations.

# Copyright (C) 2017-2018 Aleksandr Popov

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import sqlite3
import numpy as np
from collections import OrderedDict
import os

from .base_model import BaseModel
from . import sme
from . import sme_sqlite3
from . import sme_json

[docs]class Model(BaseModel): """Model implementation for SQLite3 SME database."""
[docs] def __init__(self): """Constructor. Create fields and set initial state of model. """ super().__init__() self.conn = None self.c = None self.set_state(storage_opened=False, file_name = None)
[docs] def create_storage(self, file_name): """Create new database. Parameters ---------- file_name : str File name. """ if self.state()['storage_opened']: self.close_storage() abs_file_name = os.path.expanduser(file_name) if os.path.isfile(abs_file_name): os.remove(abs_file_name) self.conn = sqlite3.connect(abs_file_name) self.c = self.conn.cursor() self.c.executescript(create_sme_db_script()) self.conn.commit() self.set_state(storage_opened=True, file_name = abs_file_name)
[docs] def open_storage(self, file_name): """Open database. Parameters ---------- file_name : str File name. """ if self.state()['storage_opened']: self.close_storage() abs_file_name = os.path.expanduser(file_name) self.conn = sqlite3.connect(abs_file_name) self.c = self.conn.cursor() self.c.execute("pragma foreign_keys=on") self.set_state(storage_opened=True, file_name=abs_file_name)
[docs] def close_storage(self): """Close current database.""" if self.state()['storage_opened']: self.conn.close() self.set_state(storage_opened=False, file_name=None)
[docs] def storage_exists(self, file_name): """Check if the file exists. Parameters ---------- file_name : str Name of database file. Returns ------- : bool True if exists, False otherwise. """ abs_file_name = os.path.expanduser(file_name) return os.path.isfile(abs_file_name)
@BaseModel.do_if_storage_opened @BaseModel.do_if_storage_opened def storage_info(self): """Return tabular information about database. Returns ------- ext_data : list of tuple Table with information about database. ext_headers : tuple Headers. """ self.c.execute(""" SELECT * FROM egeg_group; """) data = self.c.fetchall() headers = tuple(map(lambda x: x[0], self.c.description)) num_in_groups = [] ext_data = [] for d in data: self.c.execute(""" SELECT COUNT(E.exam_id) FROM examination as E, group_element as GE WHERE GE.exam_id = E.exam_id AND GE.group_id = ? """, [d[0], ]) n = self.c.fetchall()[0][0] ext_data.append(d + (n,)) ext_headers = headers + ("number",) self.c.execute(""" SELECT COUNT(exam_id) FROM examination WHERE exam_id NOT IN (SELECT exam_id FROM group_element) """) ungrouped_num = self.c.fetchall()[0][0] last_string = ['' for h in ext_headers] last_string[0] = '0' last_string[-2] = 'Ungrouped' last_string[-1] = ungrouped_num ext_data.append(tuple(last_string)) return ext_data, ext_headers @BaseModel.do_if_storage_opened def group_info(self, group_id): """Return short information about examinations in selected group. Parameters ---------- group_id : str Group ID Returns ------- data : list of tuple Examination descriptions. headers : tuple Headers. """ if group_id == '0': self.c.execute(""" SELECT exam_id, name, diagnosis, age, gender FROM examination WHERE exam_id NOT IN (SELECT exam_id FROM group_element) """) else: self.c.execute(""" SELECT COUNT(*) FROM egeg_group WHERE group_id = ? """, [group_id]) if self.c.fetchall()[0][0] == 0: return None, None self.c.execute(""" SELECT E.exam_id, E.name, E.diagnosis, E.age, E.gender FROM examination AS E, group_element AS GE WHERE GE.exam_id = E.exam_id AND GE.group_id = ? """, [group_id, ]) data = self.c.fetchall() headers = tuple(map(lambda x: x[0], self.c.description)) return data, headers @BaseModel.do_if_storage_opened def insert_exam(self, exam): """Insert examination into current database. Parameters ---------- exam : sme.Examination Examination object """ sme_sqlite3.put_exam(self.conn, exam) @BaseModel.do_if_storage_opened def delete_exam(self, exam_id): """Remove examination from database. Parameters ---------- exam_id : str Examination ID. """ self.c.execute(""" DELETE FROM Examination WHERE exam_id = ? """,(exam_id, )) self.conn.commit() @BaseModel.do_if_storage_opened def insert_group(self, name, description): """Add new group of examinations. Parameters ---------- name : str Name of new group. description : str Description for new group. """ self.c.execute(""" INSERT INTO egeg_group (name, description) VALUES (?, ?) """, [name, description, ]) self.conn.commit() @BaseModel.do_if_storage_opened def delete_group(self, group_id): """Delete group of examinations from database. Parameters ---------- group_id : str Group ID. """ self.c.execute(""" DELETE FROM egeg_group WHERE group_id = ? """, [group_id, ]) self.conn.commit() @BaseModel.do_if_storage_opened def group_exam(self, exam_id, group_ids, placed_in): """Add and delete examination to and from groups. Parameters ---------- exam_id : str Examination ID. group_ids : list of str Group IDs. placed_in : list of bool True for examinations to be placed in groups. Length of group_ids must be equal to length of placed_in. """ for (group_id, p) in zip(group_ids, placed_in): if p: try: self.c.execute(""" INSERT OR IGNORE INTO group_element VALUES (?, ?) """, [exam_id, group_id, ]) except sqlite3.IntegrityError: pass else: try: self.c.execute(""" DELETE FROM group_element WHERE exam_id = ? AND group_id = ? """, [exam_id, group_id, ]) except sqlite3.IntegrityError: pass self.conn.commit() @BaseModel.do_if_storage_opened def where_exam(self, exam_id): """Return description of groups where examination in or not in. Parameters ---------- exam_id : str Examination ID. Returns ------- group_records : list of tuple All group records. headers : list of str Names of group attributes. placed_in : list of bool True if exam in group, False otherwise. """ including_exam_groups_ids = [ r[0] for r in self.c.execute(""" SELECT G.group_id FROM egeg_group as G, group_element WHERE G.group_id = group_element.group_id AND group_element.exam_id = ? """, [exam_id]) ] self.c.execute("SELECT * FROM egeg_group") group_records = self.c.fetchall() headers = tuple(map(lambda x: x[0], self.c.description)) placed_in = [gr[0] in including_exam_groups_ids for gr in group_records] return group_records, headers, placed_in @BaseModel.do_if_storage_opened def group_record(self, group_id): """Return attribute names and values of selected group. Parameters ---------- group_id : str Group ID. Returns ------- : OrderedDict Attributes names and values for selected group. """ attr = OrderedDict() self.c.execute(""" SELECT name, description FROM egeg_group WHERE group_id = ? """, (group_id, )) attr['name'], attr['description'] = self.c.fetchone() return attr @BaseModel.do_if_storage_opened def update_group_record(self, group_id, attr): """Update group record in database. Parameters ---------- group_id : str Group ID. attr : OrderedDict Attributes names and values. """ self.c.execute(""" UPDATE egeg_group SET name = ?, description = ? WHERE group_id = ? """, (attr['name'], attr['description'], group_id, )) self.conn.commit() @BaseModel.do_if_storage_opened def exam(self, exam_id, meta_only=False): """Return examination from database. Parameters ---------- exam_id : str Examination ID. meta_only : bool Get only meta data if True. Returns ------- sme.Examination Examination object. """ return sme_sqlite3.get_exam(self.conn, exam_id, meta_only) def __exam_ids(self, group_id): """ Return examination ids in group. """ if group_id == '*': self.c.execute("SELECT exam_id FROM examination") elif group_id == '0': self.c.execute(""" SELECT exam_id FROM examination WHERE exam_id NOT IN (SELECT exam_id FROM group_element) """) else: self.c.execute(""" SELECT E.exam_id FROM examination AS E, group_element AS GE WHERE GE.exam_id = E.exam_id AND GE.group_id = ? """, [group_id, ]) ans = self.c.fetchall() if len(ans) == 0: return [] res = [row[0] for row in ans] return res def __exams_of_group(self, group_id, meta_only=False): """ Return examination in group. """ exam_ids = self.__exam_ids(group_id) res = [self.exam(exam_id, meta_only) for exam_id in exam_ids] return res def __exams_of_groups(self, group_ids, meta_only=False): """ Return examinations in several groups. """ ids_lists = [self.__exam_ids(g_id) for g_id in group_ids] exam_ids_set = set(ids_lists[0]) for ids_list in ids_lists[1:]: exam_ids_set = exam_ids_set | set(ids_list) exam_ids = list(exam_ids_set) es = [self.exam(exam_id) for exam_id in exam_ids] return es @BaseModel.do_if_storage_opened def exams(self, group_id, meta_only=False): """ Return exams from selected group or groups. Parameters ---------- group_id : str or list of str Group ID. If list, the union of sets of examinations from groups returned. meta_only: bool If True, only meta data returned. Returns ------- exams: list of sme.Examination Examinations list. """ if type(group_id) == type("1"): es = self.__exams_of_group(group_id, meta_only) return es es = self.__exams_of_groups(group_id, meta_only) return es
def create_sme_db_script(): query = """ pragma foreign_keys=1; create table signal( signal_id integer, data blob, dt real, edited integer, meas_id integer references measurement(meas_id) on delete cascade, primary key(signal_id) ); create table measurement( meas_id integer, time text, exam_id integer references examination(exam_id) on delete cascade, primary key(meas_id) ); create table examination( exam_id integer, name text, diagnosis text, age integer, gender text, primary key(exam_id) ); create table egeg_group( group_id integer, name text, description text, primary key(group_id) ); create table group_element( exam_id integer references examination(exam_id) on delete cascade, group_id integer references egeg_group(group_id) on delete cascade, primary key(exam_id, group_id) ); """ return query