Упрощаем работу с MySQL в Python: создаем класс для управления базой данных

GetCoder.ru
Изображение статьи

Если вы используете MySQL и Python, то наверняка сталкивались с необходимостью писать однотипные запросы для подключения к базе, создания таблиц, вставки, обновления и удаления данных. В этой статье я покажу, как это сделать на примере простого класса dbMySQL.

Работа с базами данных — важная часть многих приложений. Если вы используете MySQL и Python, то наверняка сталкивались с необходимостью писать однотипные запросы для подключения к базе, создания таблиц, вставки, обновления и удаления данных. В этой статье я покажу, как это сделать на примере простого класса dbMySQL.

Что может наш класс?

  • Подключаться к базе данных MySQL;
  • Создавать таблицы;
  • Вставлять данные;
  • Выбирать данные;
  • Обновлять данные;
  • Удалять данные;
  • Закрывать соединение с базой;

Подключение к базе данных

При создании объекта класса dbMySQL мы передаем параметры для подключения: хост, имя пользователя, пароль и название базы данных. Если подключение не удается, программа выведет соответствующую ошибку.

  • copy
db = dbMySQL("localhost", "user_name", "password", "dbname")

Создание таблиц

Метод create_table позволяет создавать таблицы. Нужно указать имя таблицы, её структуру и тип движка (например, MyISAM).

  • copy
db.create_table("Staff", "id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255), position VARCHAR(30)", "MyISAM")

Вставка данных

Метод insert добавляет новую запись в таблицу. Можно указать, в какие столбцы вставлять данные.

  • copy
variable = {"name": "Ivan", "position": "Cook"} db.insert(variable, "Staff", "name, position", ":name, :position")

Выбор данных

Метод select позволяет выбирать данные из таблицы. Можно указать условия (WHERE) и сортировку (ORDER BY).

  • copy
variable = {"name": "Ivan"} result = db.select(variable, "*", "Staff", "name = :name") print(result)

Обновление данных

Метод update изменяет существующие записи. Нужно указать, какие поля обновлять и по какому условию.

  • copy
variable = {"name": "Ivan", "position": "Writer"} db.update(variable, "Staff", "position = :position", "name = :name")

Удаление данных

Метод delete удаляет записи из таблицы по указанному условию.

  • copy
variable = {"name": "Ivan"} db.delete(variable, "Staff", "name = :name")

Закрытие соединения

Метод disconnect закрывает соединение с базой данных. Это важно для освобождения ресурсов.

  • copy
db.disconnect()

Код

Вот как можно использовать наш класс для работы с базой данных:

  • copy
# -*- coding: utf-8 -*- import re import mysql.connector from mysql.connector import errorcode class dbMySQL(): def __init__(self, server, user, passwd, dbname): try: self.__cnx = mysql.connector.connect(user=user, password=passwd, host=server, database=dbname) self.__cursor = self.__cnx.cursor(dictionary=True) except mysql.connector.Error as err: if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: print("Something is wrong with your user name or password") elif err.errno == errorcode.ER_BAD_DB_ERROR: print("Database does not exist") else: print(err) def __strReplace(self, keyMass=[], dataMass=[], result = ''): if isinstance(keyMass, str): keyMass = [keyMass] if isinstance(dataMass, str): dataMass = [dataMass] vKey = len(dataMass) - 1 for i in range(0, len(keyMass)): if i > vKey: if vKey == 0: dataMass.append(dataMass[vKey]) else: dataMass.append('') result = result.replace(keyMass[i], str(dataMass[i])) return result def create_table(self, nameTable, structure, encoding): try: sql = 'CREATE TABLE '+nameTable+' ('+structure+') ENGINE='+encoding self.__cursor.execute(sql) self.__cnx.commit() result = True except mysql.connector.Error as e: result = 'Error: create_table ('+str(e)+')' return result def insert(self, variable, nameTable, rows = None, values = None): try: sql = 'INSERT INTO '+nameTable sql += ' ('+rows+')' if rows != None else '' sql += ' VALUES ('+values+')' variable = {} if variable == None else variable arrElem = re.findall("(\:(\w+)*)", sql) if len(arrElem) > 0: arrayElem = [[],[]] for elem in arrElem: arrayElem[0].append(elem[0]) arrayElem[1].append('%('+str(elem[1])+')s') sql = self.__strReplace(arrayElem[0], arrayElem[1], sql) self.__cursor.execute(sql, variable) result = self.__cursor.lastrowid self.__cnx.commit() except Exception as e: result = 'Error: insert ('+str(e)+')' return result def select(self, variable, what, nameTable, where = None, order = None): try: sql = 'SELECT '+what+' FROM '+nameTable sql += ' WHERE '+where if where != None else '' sql += ' ORDER BY '+order if order != None else '' variable = {} if variable == None else variable arrElem = re.findall("(\:(\w+)*)", sql) if len(arrElem) > 0: arrayElem = [[],[]] for elem in arrElem: arrayElem[0].append(elem[0]) arrayElem[1].append('%('+str(elem[1])+')s') sql = self.__strReplace(arrayElem[0], arrayElem[1], sql) self.__cursor.execute(sql, variable) result = self.__cursor.fetchall() except Exception as e: result = 'Error: select ('+str(e)+')' return result def update(self, variable, nameTable, params, where = None): try: sql = 'UPDATE '+nameTable+' SET '+params sql += ' WHERE '+where if where != None else '' variable = {} if variable == None else variable arrElem = re.findall("(\:(\w+)*)", sql) if len(arrElem) > 0: arrayElem = [[],[]] for elem in arrElem: arrayElem[0].append(elem[0]) arrayElem[1].append('%('+str(elem[1])+')s') sql = self.__strReplace(arrayElem[0], arrayElem[1], sql) self.__cursor.execute(sql, variable) self.__cnx.commit() result = True except Exception as e: result = 'Error: update ('+str(e)+')' return result def delete(self, variable, nameTable, where): try: sql = 'DELETE FROM '+nameTable+' WHERE '+where variable = {} if variable == None else variable arrElem = re.findall("(\:(\w+)*)", sql) if len(arrElem) > 0: arrayElem = [[],[]] for elem in arrElem: arrayElem[0].append(elem[0]) arrayElem[1].append('%('+str(elem[1])+')s') sql = self.__strReplace(arrayElem[0], arrayElem[1], sql) self.__cursor.execute(sql, variable) self.__cnx.commit() result = True except Exception as e: result = 'Error: delete ('+str(e)+')' return result def disconnect(self): try: self.__cnx.close() result = True except Exception as e: result = 'Error: disconnect ('+str(e)+')' return result def __del__(self): try: self.__cnx.close() result = True except Exception as e: result = 'Error: disconnect ('+str(e)+')' # Пример запросов def main(): # Параметры подключения host = "localhost" user = "user_name" password = "password" dbname = "dbname" # Подключение db = dbMySQL(host, user, password, dbname) # CREATE TABLE answer = db.create_table("Staff", "id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255), position VARCHAR(30)", "MyISAM") print(answer) # INSERT variable = {"name": "Ivan", "position": "Cook"} answer = db.insert(variable, "Staff", "name, position", ":name, :position") print(answer) # SELECT variable = {"name": "Ivan"} answer = db.select(variable, "*", "Staff", "name = :name") print(answer) # UPDATE variable = {"name": "Ivan", "position": "Writer"} answer = db.update(variable, "Staff", "position = :position", "name = :name") print(answer) # DELETE variable = {"name": "Ivan"} answer = db.delete(variable, "Staff", "name = :name") print(answer) # Закрыть соединение answer = db.disconnect() if __name__ == "__main__": main()
Исходный код

Заключение

Создание такого класса — отличный способ упростить работу с базой данных в Python. Он делает код чище, безопаснее и удобнее для повторного использования. Если вы часто работаете с MySQL, попробуйте адаптировать этот класс под свои задачи или расширить его функциональность. Удачи в программировании!

  • 03.03.2025
  • 40
  • 0

Упрощаем работу с MySQL в Python: создаем класс для управления базой данных

Работа с базами данных — важная часть многих приложений. Если вы используете MySQL и Python, то наверняка сталкивались с необходимостью писать однотипные запросы для подключения к базе, создания таблиц, вставки, обновления и удаления данных. В этой статье я покажу, как это сделать на примере простого класса dbMySQL.

Что может наш класс?

  • Подключаться к базе данных MySQL;
  • Создавать таблицы;
  • Вставлять данные;
  • Выбирать данные;
  • Обновлять данные;
  • Удалять данные;
  • Закрывать соединение с базой;

Подключение к базе данных

При создании объекта класса dbMySQL мы передаем параметры для подключения: хост, имя пользователя, пароль и название базы данных. Если подключение не удается, программа выведет соответствующую ошибку.

  • copy
db = dbMySQL("localhost", "user_name", "password", "dbname")

Создание таблиц

Метод create_table позволяет создавать таблицы. Нужно указать имя таблицы, её структуру и тип движка (например, MyISAM).

  • copy
db.create_table("Staff", "id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255), position VARCHAR(30)", "MyISAM")

Вставка данных

Метод insert добавляет новую запись в таблицу. Можно указать, в какие столбцы вставлять данные.

  • copy
variable = {"name": "Ivan", "position": "Cook"} db.insert(variable, "Staff", "name, position", ":name, :position")

Выбор данных

Метод select позволяет выбирать данные из таблицы. Можно указать условия (WHERE) и сортировку (ORDER BY).

  • copy
variable = {"name": "Ivan"} result = db.select(variable, "*", "Staff", "name = :name") print(result)

Обновление данных

Метод update изменяет существующие записи. Нужно указать, какие поля обновлять и по какому условию.

  • copy
variable = {"name": "Ivan", "position": "Writer"} db.update(variable, "Staff", "position = :position", "name = :name")

Удаление данных

Метод delete удаляет записи из таблицы по указанному условию.

  • copy
variable = {"name": "Ivan"} db.delete(variable, "Staff", "name = :name")

Закрытие соединения

Метод disconnect закрывает соединение с базой данных. Это важно для освобождения ресурсов.

  • copy
db.disconnect()

Код

Вот как можно использовать наш класс для работы с базой данных:

  • copy
# -*- coding: utf-8 -*- import re import mysql.connector from mysql.connector import errorcode class dbMySQL(): def __init__(self, server, user, passwd, dbname): try: self.__cnx = mysql.connector.connect(user=user, password=passwd, host=server, database=dbname) self.__cursor = self.__cnx.cursor(dictionary=True) except mysql.connector.Error as err: if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: print("Something is wrong with your user name or password") elif err.errno == errorcode.ER_BAD_DB_ERROR: print("Database does not exist") else: print(err) def __strReplace(self, keyMass=[], dataMass=[], result = ''): if isinstance(keyMass, str): keyMass = [keyMass] if isinstance(dataMass, str): dataMass = [dataMass] vKey = len(dataMass) - 1 for i in range(0, len(keyMass)): if i > vKey: if vKey == 0: dataMass.append(dataMass[vKey]) else: dataMass.append('') result = result.replace(keyMass[i], str(dataMass[i])) return result def create_table(self, nameTable, structure, encoding): try: sql = 'CREATE TABLE '+nameTable+' ('+structure+') ENGINE='+encoding self.__cursor.execute(sql) self.__cnx.commit() result = True except mysql.connector.Error as e: result = 'Error: create_table ('+str(e)+')' return result def insert(self, variable, nameTable, rows = None, values = None): try: sql = 'INSERT INTO '+nameTable sql += ' ('+rows+')' if rows != None else '' sql += ' VALUES ('+values+')' variable = {} if variable == None else variable arrElem = re.findall("(\:(\w+)*)", sql) if len(arrElem) > 0: arrayElem = [[],[]] for elem in arrElem: arrayElem[0].append(elem[0]) arrayElem[1].append('%('+str(elem[1])+')s') sql = self.__strReplace(arrayElem[0], arrayElem[1], sql) self.__cursor.execute(sql, variable) result = self.__cursor.lastrowid self.__cnx.commit() except Exception as e: result = 'Error: insert ('+str(e)+')' return result def select(self, variable, what, nameTable, where = None, order = None): try: sql = 'SELECT '+what+' FROM '+nameTable sql += ' WHERE '+where if where != None else '' sql += ' ORDER BY '+order if order != None else '' variable = {} if variable == None else variable arrElem = re.findall("(\:(\w+)*)", sql) if len(arrElem) > 0: arrayElem = [[],[]] for elem in arrElem: arrayElem[0].append(elem[0]) arrayElem[1].append('%('+str(elem[1])+')s') sql = self.__strReplace(arrayElem[0], arrayElem[1], sql) self.__cursor.execute(sql, variable) result = self.__cursor.fetchall() except Exception as e: result = 'Error: select ('+str(e)+')' return result def update(self, variable, nameTable, params, where = None): try: sql = 'UPDATE '+nameTable+' SET '+params sql += ' WHERE '+where if where != None else '' variable = {} if variable == None else variable arrElem = re.findall("(\:(\w+)*)", sql) if len(arrElem) > 0: arrayElem = [[],[]] for elem in arrElem: arrayElem[0].append(elem[0]) arrayElem[1].append('%('+str(elem[1])+')s') sql = self.__strReplace(arrayElem[0], arrayElem[1], sql) self.__cursor.execute(sql, variable) self.__cnx.commit() result = True except Exception as e: result = 'Error: update ('+str(e)+')' return result def delete(self, variable, nameTable, where): try: sql = 'DELETE FROM '+nameTable+' WHERE '+where variable = {} if variable == None else variable arrElem = re.findall("(\:(\w+)*)", sql) if len(arrElem) > 0: arrayElem = [[],[]] for elem in arrElem: arrayElem[0].append(elem[0]) arrayElem[1].append('%('+str(elem[1])+')s') sql = self.__strReplace(arrayElem[0], arrayElem[1], sql) self.__cursor.execute(sql, variable) self.__cnx.commit() result = True except Exception as e: result = 'Error: delete ('+str(e)+')' return result def disconnect(self): try: self.__cnx.close() result = True except Exception as e: result = 'Error: disconnect ('+str(e)+')' return result def __del__(self): try: self.__cnx.close() result = True except Exception as e: result = 'Error: disconnect ('+str(e)+')' # Пример запросов def main(): # Параметры подключения host = "localhost" user = "user_name" password = "password" dbname = "dbname" # Подключение db = dbMySQL(host, user, password, dbname) # CREATE TABLE answer = db.create_table("Staff", "id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255), position VARCHAR(30)", "MyISAM") print(answer) # INSERT variable = {"name": "Ivan", "position": "Cook"} answer = db.insert(variable, "Staff", "name, position", ":name, :position") print(answer) # SELECT variable = {"name": "Ivan"} answer = db.select(variable, "*", "Staff", "name = :name") print(answer) # UPDATE variable = {"name": "Ivan", "position": "Writer"} answer = db.update(variable, "Staff", "position = :position", "name = :name") print(answer) # DELETE variable = {"name": "Ivan"} answer = db.delete(variable, "Staff", "name = :name") print(answer) # Закрыть соединение answer = db.disconnect() if __name__ == "__main__": main()
Исходный код

Заключение

Создание такого класса — отличный способ упростить работу с базой данных в Python. Он делает код чище, безопаснее и удобнее для повторного использования. Если вы часто работаете с MySQL, попробуйте адаптировать этот класс под свои задачи или расширить его функциональность. Удачи в программировании!