На главную Наши проекты:
Журнал   ·   Discuz!ML   ·   Wiki   ·   DRKB   ·   Помощь проекту
ПРАВИЛА FAQ Помощь Участники Календарь Избранное RSS
msm.ru
Модераторы: Qraizer, Hsilgos
  
> RAII и базы данных , (попытка номер два)
    Всем привет!

    Некоторое количество лет назад, в разделе "Холиваров" подобный вопрос мною уже поднимался. Но тогда меня прогнали "сцаными тряпками" и сказали, что я недостоин таких знаний. Шютка. Но теперь, уже по прошествии N-лет, хотелось бы этот вопрос обсудить ещё раз. Но уже без личностных оценок и холиваров. Уточню что значит "без холиваров". Это значит - или внятные аргументы, или "ти кто такой, давай до свидания!".

    Напомню контекст вопроса

    RAII предполагает захват ресурсов при инициализации + освобождение при "финализации". Очень удобный механизм. Но он очень удобен только в том случае, когда "захваченные ресурсы" не могут быть "потеряны" до финализации. Простой пример - коннект к БД, который может быть потерян вследствие внешних обстоятельств, допустим - сетевых ошибок.

    Меня этот вопрос как-то слегка задел. И вот настала пора использования ИИ в качестве консультанта. И я вернулся и задал вопрос вполне коммерческому "ChatGPT 04-mini-high":

    Цитата
    Напиши мне на С++ код поддержки пула соединений с БД, с поддержкой транзакций и вложенных транзакций с точками откатов (как у PostgreSQL). Нужно использовать RAII, каждое соединение должно создавать отдельную нить. Для многопоточности нужно использовать возможности С++17 и выше (если выше - должны быть комментарии). В методах выполнение SQL-запроса - запрос должен производиться либо самостоятельно, либо в рамках основной или вложенной транзакции. Нужно предусмотреть разрывы соединений с БД, соответственно, понимать что разрыв влечет за собой полный откат всей транзакции, в том числе и вложенных. И нужно организовать логирование и реконнект. Нужен работающий пример, а не только набор классов. Ответ нужно реализовать - текст и объяснения в виде raw markdown, а С++ код - виде кода.

    И я получил ответ. И объяснения, и сам код. Все это будет ниже под "катом". Кому не лениво - зацените и поучаствуйте. Особенно в плане, что ИИ сделал не так. Для того, кому тут "слишком много букф" - постарайтесь просто развидеть этот пост.

    Объяснялка

    Ниже приведён полный пример на C++ (с использованием возможностей C++17 и выше), реализующий пул соединений к PostgreSQL с поддержкой:

    Скрытый текст
    Описание решения

    Ниже приведена объяснялка к полному примеру на C++ (с использованием возможностей C++17 и выше), реализующий пул соединений к PostgreSQL с поддержкой:
    • транзакций и вложенных транзакций (savepoint-ов), реализованных через RAII;
    • отдельного потока (std::thread) для каждого соединения, обрабатывающего SQL-запросы из внутренней очереди;
    • автоматического восстановления соединения после разрыва (с полным откатом всех незавершённых транзакций);
    • логирования ключевых событий (ошибок соединения, начала/коммита/отката транзакций и т. д.);
    • пула соединений (ConnectionPool), позволяющего нескольким потокам приложения получать и возвращать соединения.

    Пример состоит из следующих компонентов:
    1. Класс PGConnection, обёртка над PGconn* (libpq), которая:
      • При создании устанавливает соединение с БД через PQconnectdb.
      • Запускает рабочий поток, который:
        1. Ожидает задач из внутренней очереди tasks.
        2. Для каждой задачи выполняет SQL-запрос через PQexec.
        3. Если соединение разорвано (PQstatus != CONNECTION_OK), автоматически вызывает rollbackAll(), пытается восстановить соединение в цикле и возобновляет обработку.
    2. Предоставляет метод execute(const std::string &sql), возвращающий PGresult* или бросающий исключение.
    3. Реализует методы управления основной транзакцией (beginTransaction(), commitTransaction(), rollbackTransaction()) и вложенными транзакциями через savepoint (beginSavepoint(), commitSavepoint(), rollbackToSavepoint()).
    4. При разрыве соединения методом disconnect() или обнаружении закрытого соединения внутри execute() откатывает все активные транзакции и восстанавливает соединение.
  • RAII-классы:
    • Transaction: в конструкторе вызывает beginTransaction(), в методе commit() выполняет commitTransaction(), а в деструкторе при отсутствии вызова commit() автоматически вызывает rollbackTransaction().
    • Savepoint: в конструкторе создаёт savepoint (beginSavepoint()), метод release() выполняет commitSavepoint(), а в деструкторе при отсутствии release() автоматически выполняет rollbackToSavepoint().
  • Классы ConnectionPool и ConnectionHandle:
    • ConnectionPool хранит вектор pool из N объектов PGConnection и очередь свободных индексов. Метод acquire() блокируется до тех пор, пока не появится свободное соединение, и возвращает std::shared_ptr<ConnectionHandle>.
    • ConnectionHandle оборачивает указатель на PGConnection и при уничтожении автоматически возвращает индекс соединения обратно в пул.
  • Пример использования в main(), демонстрирующий:
    • Создание пула из 4 соединений.
    • Запуск двух потоков, каждый из которых:
      1. Получает соединение из пула.
      2. Создаёт таблицу test_table, если ещё нет.
      3. Начинает основную транзакцию (Transaction tx(conn)).
      4. Выполняет INSERT.
      5. Создаёт вложенный savepoint (Savepoint sp(conn)), выполняет второй INSERT и допускает автоматический откат вложенного изменения.
      6. Выполняет ещё INSERT и коммитит основную транзакцию.
      7. Читает все строки из таблицы и выводит их в лог.
      8. Опционально (закомментировано) можно вызвать conn.disconnect(), чтобы продемонстрировать автоматический откат и реконнект.

  • Код написан так, чтобы демонстрировать:
    • Использование std::mutex и std::condition_variable для синхронизации очереди задач.
    • std::thread для каждого соединения.
    • std::promise/std::future для передачи результатов выполнения SQL-запросов из рабочего потока в вызывающий.
    • Возможности C++17: std::optional, structured bindings (при необходимости), std::shared_ptr/std::enable_shared_from_this.

    Цитата
    Сборка примера:
    ExpandedWrap disabled
      g++ -std=c++17 -pthread -lpq -o db_pool_example main.cpp

    Убедитесь, что установлена библиотека libpq (PostgreSQL C API). Для запуска нужен настроенный PostgreSQL-сервер с подходящей базой данных и пользователем.



    Код

    Скрытый текст
    ExpandedWrap disabled
      #include <iostream>
      #include <string>
      #include <vector>
      #include <queue>
      #include <thread>
      #include <mutex>
      #include <condition_variable>
      #include <functional>
      #include <future>
      #include <memory>
      #include <chrono>
      #include <atomic>
      #include <sstream>
      #include <libpq-fe.h>
       
      // Функция логирования с отметкой времени
      static void log(const std::string &level, const std::string &msg) {
          auto now = std::chrono::system_clock::now();
          auto now_c = std::chrono::system_clock::to_time_t(now);
          std::tm tm {};
      #if defined(_WIN32)
          localtime_s(&tm, &now_c);
      #else
          localtime_r(&now_c, &tm);
      #endif
          char buf[32];
          std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &tm);
          std::cerr << "[" << buf << "] [" << level << "] " << msg << std::endl;
      }
       
      struct Task {
          std::function<void()> func;
      };
       
      class PGConnection {
      public:
          PGConnection(const std::string &conninfo)
              : conninfo_(conninfo),
                conn_(nullptr),
                stopFlag_(false),
                transactionDepth(0),
                nextSavepointId(1)
          {
              connect();
              workerThread_ = std::thread(&PGConnection::workerLoop, this);
          }
       
          ~PGConnection() {
              {
                  std::lock_guard<std::mutex> lock(tasksMutex_);
                  stopFlag_ = true;
                  tasksCond_.notify_one();
              }
              if (workerThread_.joinable()) {
                  workerThread_.join();
              }
              disconnectPG();
          }
       
          PGresult* execute(const std::string &sql) {
              auto promPtr = std::make_shared<std::promise<PGresult*>>();
              std::future<PGresult*> fut = promPtr->get_future();
       
              {
                  std::lock_guard<std::mutex> lock(tasksMutex_);
                  tasks_.push(Task{[this, sql, promPtr]() {
                      if (PQstatus(conn_) != CONNECTION_OK) {
                          log("ERROR", "Соединение потеряно, пытаюсь восстановить...");
                          rollbackAll();
                          reconnectLoop();
                      }
                      PGresult *res = PQexec(conn_, sql.c_str());
                      if (PQresultStatus(res) != PGRES_COMMAND_OK &&
                          PQresultStatus(res) != PGRES_TUPLES_OK) {
                          std::string err = PQerrorMessage(conn_);
                          PQclear(res);
                          promPtr->set_exception(
                              std::make_exception_ptr(std::runtime_error("SQL error: " + err)));
                      } else {
                          promPtr->set_value(res);
                      }
                  }});
                  tasksCond_.notify_one();
              }
       
              return fut.get();
          }
       
          void beginTransaction() {
              executeSimple("BEGIN;");
              transactionDepth = 1;
              savepoints.clear();
              log("INFO", "BEGIN TRANSACTION");
          }
       
          void commitTransaction() {
              if (transactionDepth > 0) {
                  executeSimple("COMMIT;");
                  log("INFO", "COMMIT TRANSACTION");
                  transactionDepth = 0;
                  savepoints.clear();
              }
          }
       
          void rollbackTransaction() {
              if (transactionDepth > 0) {
                  executeSimple("ROLLBACK;");
                  log("INFO", "ROLLBACK TRANSACTION");
                  transactionDepth = 0;
                  savepoints.clear();
              }
          }
       
          void beginSavepoint() {
              if (transactionDepth == 0) {
                  throw std::runtime_error("Нет открытой транзакции для создания savepoint");
              }
              std::string spName = "sp_" + std::to_string(nextSavepointId++);
              executeSimple("SAVEPOINT " + spName + ";");
              savepoints.push_back(spName);
              ++transactionDepth;
              log("INFO", "SAVEPOINT " + spName);
          }
       
          void commitSavepoint() {
              if (transactionDepth <= 1 || savepoints.empty()) {
                  throw std::runtime_error("Нет вложенного savepoint для коммита");
              }
              std::string spName = savepoints.back();
              executeSimple("RELEASE SAVEPOINT " + spName + ";");
              savepoints.pop_back();
              --transactionDepth;
              log("INFO", "RELEASE SAVEPOINT " + spName);
          }
       
          void rollbackToSavepoint() {
              if (transactionDepth <= 1 || savepoints.empty()) {
                  throw std::runtime_error("Нет вложенного savepoint для отката");
              }
              std::string spName = savepoints.back();
              executeSimple("ROLLBACK TO SAVEPOINT " + spName + "; RELEASE SAVEPOINT " + spName + ";");
              savepoints.pop_back();
              --transactionDepth;
              log("INFO", "ROLLBACK TO SAVEPOINT " + spName);
          }
       
          void disconnect() {
              std::lock_guard<std::mutex> lock(tasksMutex_);
              tasks_.push(Task{[this]() {
                  log("INFO", "Принудительный disconnect()");
                  rollbackAll();
                  disconnectPG();
              }});
              tasksCond_.notify_one();
          }
       
      private:
          std::string conninfo_;
          PGconn *conn_;
          std::thread workerThread_;
          std::queue<Task> tasks_;
          std::mutex tasksMutex_;
          std::condition_variable tasksCond_;
          bool stopFlag_;
       
          int transactionDepth;
          int nextSavepointId;
          std::vector<std::string> savepoints;
       
          void connect() {
              conn_ = PQconnectdb(conninfo_.c_str());
              if (PQstatus(conn_) != CONNECTION_OK) {
                  std::string err = PQerrorMessage(conn_);
                  PQfinish(conn_);
                  conn_ = nullptr;
                  throw std::runtime_error("Не удалось подключиться к БД: " + err);
              }
              log("INFO", "Подключение к БД установлено");
          }
       
          void disconnectPG() {
              if (conn_) {
                  PQfinish(conn_);
                  conn_ = nullptr;
                  log("INFO", "Соединение к БД закрыто");
              }
          }
       
          void reconnectLoop() {
              disconnectPG();
              while (true) {
                  try {
                      log("INFO", "Попытка реконнекта к БД...");
                      connect();
                      log("INFO", "Реконнект к БД успешен");
                      break;
                  } catch (const std::exception &ex) {
                      log("ERROR", std::string("Ошибка при реконнекте: ") + ex.what());
                      std::this_thread::sleep_for(std::chrono::seconds(1));
                  }
              }
          }
       
          void rollbackAll() {
              if (transactionDepth > 0) {
                  executeSimple("ROLLBACK;");
                  log("INFO", "ROLLBACK ALL TRANSACTIONS (включая savepoints)");
                  transactionDepth = 0;
                  savepoints.clear();
              }
          }
       
          void executeSimple(const std::string &sql) {
              PGresult *res = execute(sql);
              PQclear(res);
          }
       
          void workerLoop() {
              while (true) {
                  Task task;
                  {
                      std::unique_lock<std::mutex> lock(tasksMutex_);
                      tasksCond_.wait(lock, [this]() { return stopFlag_ || !tasks_.empty(); });
                      if (stopFlag_ && tasks_.empty()) {
                          break;
                      }
                      task = std::move(tasks_.front());
                      tasks_.pop();
                  }
                  try {
                      task.func();
                  } catch (const std::exception &ex) {
                      log("ERROR", std::string("Исключение в workerLoop: ") + ex.what());
                  }
              }
          }
      };
       
      class Transaction {
      public:
          explicit Transaction(PGConnection &conn)
              : conn_(conn), active_(true)
          {
              conn_.beginTransaction();
          }
       
          Transaction(const Transaction &) = delete;
          Transaction &operator=(const Transaction &) = delete;
       
          Transaction(Transaction &&other) noexcept
              : conn_(other.conn_), active_(other.active_)
          {
              other.active_ = false;
          }
       
          ~Transaction() {
              if (active_) {
                  try {
                      conn_.rollbackTransaction();
                  } catch (...) {}
              }
          }
       
          void commit() {
              if (active_) {
                  conn_.commitTransaction();
                  active_ = false;
              }
          }
       
      private:
          PGConnection &conn_;
          bool active_;
      };
       
      class Savepoint {
      public:
          explicit Savepoint(PGConnection &conn)
              : conn_(conn), released_(false)
          {
              conn_.beginSavepoint();
          }
       
          Savepoint(const Savepoint &) = delete;
          Savepoint &operator=(const Savepoint &) = delete;
       
          Savepoint(Savepoint &&other) noexcept
              : conn_(other.conn_), released_(other.released_)
          {
              other.released_ = true;
          }
       
          ~Savepoint() {
              if (!released_) {
                  try {
                      conn_.rollbackToSavepoint();
                  } catch (...) {}
              }
          }
       
          void release() {
              if (!released_) {
                  conn_.commitSavepoint();
                  released_ = true;
              }
          }
       
      private:
          PGConnection &conn_;
          bool released_;
      };
       
      class ConnectionPool;
      class ConnectionHandle {
      public:
          ConnectionHandle(std::shared_ptr<ConnectionPool> pool, int idx,
                           std::shared_ptr<PGConnection> conn)
              : parentPool_(pool), index_(idx), connection_(conn)
          {}
       
          ~ConnectionHandle();
       
          PGConnection &get() {
              return *connection_;
          }
       
      private:
          std::shared_ptr<ConnectionPool> parentPool_;
          int index_;
          std::shared_ptr<PGConnection> connection_;
      };
       
      class ConnectionPool : public std::enable_shared_from_this<ConnectionPool> {
      public:
          ConnectionPool(const std::string &conninfo, size_t poolSize)
              : conninfo_(conninfo)
          {
              for (size_t i = 0; i < poolSize; ++i) {
                  auto conn = std::make_shared<PGConnection>(conninfo_);
                  pool_.push_back(conn);
                  freeIndices_.push(static_cast<int>(i));
              }
          }
       
          ~ConnectionPool() = default;
       
          std::shared_ptr<ConnectionHandle> acquire() {
              std::unique_lock<std::mutex> lock(poolMutex_);
              poolCond_.wait(lock, [this]() { return !freeIndices_.empty(); });
              int idx = freeIndices_.front();
              freeIndices_.pop();
              return std::make_shared<ConnectionHandle>(shared_from_this(), idx, pool_[idx]);
          }
       
      private:
          friend class ConnectionHandle;
       
          void release(int index) {
              {
                  std::lock_guard<std::mutex> lock(poolMutex_);
                  freeIndices_.push(index);
              }
              poolCond_.notify_one();
          }
       
          std::string conninfo_;
          std::vector<std::shared_ptr<PGConnection>> pool_;
          std::queue<int> freeIndices_;
          std::mutex poolMutex_;
          std::condition_variable poolCond_;
      };
       
      ConnectionHandle::~ConnectionHandle() {
          if (parentPool_) {
              parentPool_->release(index_);
          }
      }
       
      int main() {
          try {
              const std::string conninfo = "host=localhost port=5432 dbname=testdb user=testuser password=testpass";
              auto pool = std::make_shared<ConnectionPool>(conninfo, 4);
       
              std::vector<std::thread> threads;
              for (int t = 0; t < 2; ++t) {
                  threads.emplace_back([pool, t]() {
                      try {
                          auto handle = pool->acquire();
                          PGConnection &conn = handle->get();
       
                          {
                              std::string createTableSQL =
                                  "CREATE TABLE IF NOT EXISTS test_table ("
                                  "id SERIAL PRIMARY KEY, "
                                  "thread_id INT, "
                                  "msg TEXT"
                                  ");";
                              PGresult *res = conn.execute(createTableSQL);
                              PQclear(res);
                              log("INFO", "Thread " + std::to_string(t) + ": Таблица создана / или уже есть");
                          }
       
                          {
                              Transaction tx(conn);
       
                              {
                                  std::ostringstream oss;
                                  oss << "INSERT INTO test_table (thread_id, msg) VALUES ("
                                      << t << ", 'first insert');";
                                  PGresult *res = conn.execute(oss.str());
                                  PQclear(res);
                                  log("INFO", "Thread " + std::to_string(t) + ": INSERT 1 выполнен");
                              }
       
                              {
                                  Savepoint sp(conn);
                                  std::ostringstream oss2;
                                  oss2 << "INSERT INTO test_table (thread_id, msg) VALUES ("
                                       << t << ", 'nested insert');";
                                  PGresult *res2 = conn.execute(oss2.str());
                                  PQclear(res2);
                                  log("INFO", "Thread " + std::to_string(t) + ": INSERT 2 выполнен (nested)");
                              }
       
                              log("INFO", "Thread " + std::to_string(t) + ": После rollback вложенного, продолжаем");
       
                              {
                                  std::ostringstream oss3;
                                  oss3 << "INSERT INTO test_table (thread_id, msg) VALUES ("
                                       << t << ", 'third insert');";
                                  PGresult *res3 = conn.execute(oss3.str());
                                  PQclear(res3);
                                  log("INFO", "Thread " + std::to_string(t) + ": INSERT 3 выполнен");
                              }
       
                              tx.commit();
                              log("INFO", "Thread " + std::to_string(t) + ": Основная транзакция COMMIT");
                          }
       
                          {
                              PGresult *res = conn.execute("SELECT id, thread_id, msg FROM test_table;");
                              int nrows = PQntuples(res);
                              for (int i = 0; i < nrows; ++i) {
                                  char *id = PQgetvalue(res, i, 0);
                                  char *tid = PQgetvalue(res, i, 1);
                                  char *msg = PQgetvalue(res, i, 2);
                                  std::ostringstream oss;
                                  oss << "Thread " << t << ": row #" << id
                                      << ", thread_id=" << tid << ", msg=\"" << msg << "\"";
                                  log("INFO", oss.str());
                              }
                              PQclear(res);
                          }
       
                          // Пример искусственного разрыва соединения:
                          // if (t == 0) {
                          //     log("INFO", "Thread 0: принудительный disconnect() для демонстрации реконнекта");
                          //     conn.disconnect();
                          //     PGresult *resBad = conn.execute("SELECT 1;");
                          //     PQclear(resBad);
                          // }
       
                      } catch (const std::exception &ex) {
                          log("ERROR", std::string("Внутри потока ") + std::to_string(t) + ": " + ex.what());
                      }
                  });
              }
       
              for (auto &th : threads) {
                  if (th.joinable()) {
                      th.join();
                  }
              }
       
              log("INFO", "Все потоки завершены, программа завершается");
          } catch (const std::exception &ex) {
              log("ERROR", std::string("Ошибка в main: ") + ex.what());
              return EXIT_FAILURE;
          }
       
          return EXIT_SUCCESS;
      }


    Послесловие

    Вопрос, ИМХО, действительно интересный. Но не простой. Кода получилось много. Кому действительно интересно - прошу высказываться, найти стрёмные места! Для тех, кто решил потешить свое самолюбие, постебаться, не заглянув у тему и в код - я могу выслать компас наложным платежом с надёжным направлением следования. Таков путь!

    ЗЫ: Вопрос действительно не совсем простой, и предполагаемая реализация такая же. Просто подумайте, надо ли вам это!
      ИИ, чё. Сборная солянка из паттернов в одной куче, слабо сочетаемых друг с другом. Win11 ровно так и написана, видать, что ни апдейт, то трабли. Писать код ИИ более-менее научился, а проектировать учить некому.
        Qraizer, ну понятное дело - код от ИИ. Но хотелось бы более подробных коментов про "проектировать учить некому". Что конкретно плохо, и как это сделать хорошо?
          Цитата Qraizer @
          ИИ, чё. Сборная солянка из паттернов в одной куче, слабо сочетаемых друг с другом. Win11 ровно так и написана, видать, что ни апдейт, то трабли. Писать код ИИ более-менее научился, а проектировать учить некому.

          Говорят, довольно успешно выходит, если сделать разных агентов для проектирования и программирования. Я сам пока не пробовал.

          Пока на своей практике заметил, что лучше давать ИИ простые рутинные задачи, которые можно было доверить условному джуну. И небольшие по объему (т.е. самому разбивать на мелкие). Тогда реально производительность растет.

          Ну и просто copilot в vscode уже много времени экономит.
          Сообщение отредактировано: D_KEY -
            Цитата D_KEY @
            Ну и просто copilot в vscode уже много времени экономит.

            Надо будет глянуть. На мой взгляд - самый достойный кандидат на ИИ-проганье "ChatGPT o4-mini-high" (платная хрень), а после него ИИ от Илона Маска "Grok" (бесплатная хрень, но строго через евро-vpn).
            1 пользователей читают эту тему (1 гостей и 0 скрытых пользователей)
            0 пользователей:


            Рейтинг@Mail.ru
            [ Script execution time: 0,0530 ]   [ 15 queries used ]   [ Generated: 16.06.25, 06:22 GMT ]