Listagem 11 - Página 423: Classe DBAgenda

##############################################################################
# Parte do livro Introdução à Programação com Python
# Autor: Nilo Ney Coutinho Menezes
# Editora Novatec (c) 2010-2024
# Quarta Edição - Março/2024 - ISBN 978-85-7522-886-9
#
# Site: https://python.nilo.pro.br/
#
# Arquivo: capítulo 11/11.1541 - Sem Título.py
# Página: 423
# Título: Classe DBAgenda
##############################################################################
BANCO = """
create table tipos(id integer primary key autoincrement,
                   descrição text);
create table nomes(id integer primary key autoincrement,
                   nome text);
create table telefones(id integer primary key autoincrement,
                   id_nome integer,
                   número text,
                   id_tipo integer);
insert into tipos(descrição) values ("Celular");
insert into tipos(descrição) values ("Fixo");
insert into tipos(descrição) values ("Fax");
insert into tipos(descrição) values ("Casa");
insert into tipos(descrição) values ("Trabalho");
"""


class DBAgenda:
    def __init__(self, banco):
        self.tiposTelefone = DBTiposTelefone()
        self.banco = banco
        novo = not os.path.isfile(banco)
        self.conexão = sqlite3.connect(banco)
        self.conexão.row_factory = sqlite3.Row
        if novo:
            self.cria_banco()
        self.carregaTipos()

    def carregaTipos(self):
        for tipo in self.conexão.execute("select * from tipos"):
            id_ = tipo["id"]
            descrição = tipo["descrição"]
            self.tiposTelefone.adiciona(DBTipoTelefone(id_, descrição))

    def cria_banco(self):
        self.conexão.executescript(BANCO)

    def pesquisaNome(self, nome):
        if not isinstance(nome, DBNome):
            raise TypeError("nome deve ser do tipo DBNome")
        achado = self.conexão.execute(
            """select count(*)
                                         from nomes where nome = ?""",
            (nome.nome,),
        ).fetchone()
        if achado[0] > 0:
            return self.carrega_por_nome(nome)
        else:
            return None

    def carrega_por_id(self, id):
        consulta = self.conexão.execute("select * from nomes where id = ?", (id,))
        return self.carrega(consulta.fetchone())

    def carrega_por_nome(self, nome):
        consulta = self.conexão.execute(
            "select * from nomes where nome = ?", (nome.nome,)
        )
        return self.carrega(consulta.fetchone())

    def carrega(self, consulta):
        if consulta is None:
            return None
        novo = DBDadoAgenda(DBNome(consulta["nome"], consulta["id"]))
        for telefone in self.conexão.execute(
            "select * from telefones where id_nome = ?", (novo.nome.id,)
        ):
            ntel = DBTelefone(
                telefone["número"], None, telefone["id"], telefone["id_nome"]
            )
            for tipo in self.tiposTelefone:
                if tipo.id == telefone["id_tipo"]:
                    ntel.tipo = tipo
                    break
            novo.telefones.adiciona(ntel)
        return novo

    def lista(self):
        consulta = self.conexão.execute("select * from nomes order by nome")
        for registro in consulta:
            yield self.carrega(registro)

    def novo(self, registro):
        try:
            cur = self.conexão.cursor()
            cur.execute("insert into nomes(nome) values (?)", (str(registro.nome),))
            registro.nome.id = cur.lastrowid
            for telefone in registro.telefones:
                cur.execute(
                    """insert into telefones(número,
                               id_tipo, id_nome) values (?,?,?)""",
                    (telefone.número, telefone.tipo.id, registro.nome.id),
                )
                telefone.id = cur.lastrowid
            self.conexão.commit()
        except Exception:
            self.conexão.rollback()
            raise
        finally:
            cur.close()

    def atualiza(self, registro):
        try:
            cur = self.conexão.cursor()
            cur.execute(
                "update nomes set nome=? where id = ?",
                (str(registro.nome), registro.nome.id),
            )
            for telefone in registro.telefones:
                if telefone.id is None:
                    cur.execute(
                        """insert into telefones(número,
                                   id_tipo, id_nome)
                                   values (?,?,?)""",
                        (telefone.número, telefone.tipo.id, registro.nome.id),
                    )
                    telefone.id = cur.lastrowid
                else:
                    cur.execute(
                        """update telefones set número=?,
                                          id_tipo=?, id_nome=?
                                          where id = ?""",
                        (
                            telefone.número,
                            telefone.tipo.id,
                            registro.nome.id,
                            telefone.id,
                        ),
                    )
            for apagado in registro.telefones.apagados:
                cur.execute("delete from telefones where id = ?", (apagado,))
            self.conexão.commit()
            registro.telefones.limpa()
        except Exception:
            self.conexão.rollback()
            raise
        finally:
            cur.close()

    def apaga(self, registro):
        try:
            cur = self.conexão.cursor()
            cur.execute("delete from telefones where id_nome = ?", (registro.nome.id,))
            cur.execute("delete from nomes where id = ?", (registro.nome.id,))
            self.conexão.commit()
        except Exception:
            self.conexão.rollback()
            raise
        finally:
            cur.close()
Clique aqui para baixar o arquivo