Многопоточный сервер Qt. Пул потоков. Паттерн Decorator

В предыдущей статье [1] была рассмотрена работа с сокетами в библиотеке Qt. Наш сервер сетевого чата работал в одном потоке. Задача текущей статьи — описание многопоточного сервера.

Однако, если сервер просто принимает сообщение и передает его всем подключенным клиентам — распараллеливать нечего и потоки не особо нужны. В связи с этим, мы немного усложним задачу — наш сервер будет вычислять передаваемые арифметические выражения (для вычисления выражений используем Qt Script [2]).

Серверу (а точнее, сокету) надо добавить новый функционал, и в этом нам поможет шаблон проектирования «Декоратор». Когда сервер начнет выполнять все необходимые нам функции, приступим к распараллеливанию. В статье рассмотрены 2 варианта:

  • создание отдельного потока на каждое подключение;
  • использование стандартного пула потоков библиотеки Qt.

В предыдущих статьях уже был описан шаблон параллельного программирования «поставщик-потребитель» [3]. Пул потоков является родственным паттерном, но мы не будем писать свою реализацию, а используем готовую.

Шаблон проектирования Decorator

Декоратор — структурный шаблон проектирования, который является гибкой альтернативой наследованию. Чаще всего декоратор применяется в случаях, когда объекту требуется добавить функциональность во время выполнения программы, но может применяться в других случаях.

рис. 1 Диаграмма классов шаблона Декоратор

рис. 1 Диаграмма классов шаблона Декоратор

Некоторые компоненты системы реализуют определенный интерфейс , но однажды возникает необходимость добавить компонентам новую функциональность. Например, компонентами могут являться графические объекты, которые могут перемещаться, возвращать свои координаты и размер (набор таких общих операций образует интерфейс). В качестве новой функциональности может быть изменение цвета, масштабирование или привязка к сетке.

Решить проблему можно наследованием, породив подклассы с новой функциональностью от каждого компонента, однако, это не будет гибким решением. Шаблон проектирования декоратор позволяет:

  • добавлять новую функциональность уже существующим объектам прямо во время выполнения. Гамма описывает пример с полосами прокрутки, которые могут появляться лишь при определенных условиях[4];
  • смешивать функциональность. Например, графический элемент может одновременно изменять цвет, иметь привязку к сетке и предоставлять полосу прокрутки — если бы мы решали эту (повседневную и простую, в общем-то) задачу лишь наследованием — иерархия классов получилась бы громадной («комбинаторный взрыв»).

Как показано на рисунке 1, и компонент, и декоратор реализуют один и тот же интерфейс, т.е. декоратор является таким же компонентом, как и все остальные. Тем не менее, Гамма указывает, что в некоторых случаях, декоратор может сильно отличаться от других компонентов и это надо учитываь [4]. Класс Decorator хранит ссылку на декорируемый компонент, а его наследники каким-либо образом переадресуют этому компоненту запросы, но кроме этого, выполняют еще какие-либо действия (рисуют рамку или выводят ползунки прокрутки, например).

Шаблон проектирования «Декоратор». Пример

В нашей задаче многопоточного сервера, декорироваться будет сокет. За основу взят код однопоточного сервера чата [1].

рис. 2 до имплементации шаблона Decorator

рис. 2 до имплементации шаблона Decorator

На рисунке 2 видно, что сервер работает с указателем на ISocketAdapter, по которому находится экземпляр ServerSocketAdapter, который умеет принимать и отправлять текстовые сообщения. Нам требуется научить сервер новым трюкам, при этом, мы не хотим сильно изменять уже написанный и отлаженный код. Очевидно, надо породить нового наследника ISocketAdapter, который будет делать тоже, что и ServerSocketAdapter и еще чуть-чуть.

рис. 3 decorator pattern implementation

рис. 3 decorator pattern implementation

На рисунке 3, помимо декоратора, вычисляющего поступившие в сокет арифметические выражения изображен декоратор, обеспечивающий прием и передачу зашифрованных сообщений. Не трудно представить ситуацию, когда некоторый пользователь решит в рантайме перейти на защищенную передачу сообщений. Декоратор делает реализацию такого перехода очень простой.

class SocketDecorator : public ISocketAdapter {
  Q_OBJECT
public:
  explicit SocketDecorator(ISocketAdapter *sock, QObject *parent = 0);
  virtual ~SocketDecorator();
  virtual void sendString(const QString& str) = 0;
    //!< отправка сообщения через сокет
protected slots:
  void on_disconnected();
    //!< соединение разорвано (пришел сигнал от m_pSock)
  virtual void on_send_request(QString) = 0;
    //!< слот для отправки данных через сокет
  virtual void on_message(QString) = 0;
    //!< слот обработки сообщений, поступивших в сокет
protected:
  ISocketAdapter *m_pSock;
};

// ...

SocketDecorator::SocketDecorator(ISocketAdapter *sock, QObject *parent) :
  ISocketAdapter(parent), m_pSock(sock) {
  connect(m_pSock, SIGNAL(disconnected()), SLOT(on_disconnected()));
  connect(m_pSock, SIGNAL(message(QString)), SLOT(on_message(QString)));
}

SocketDecorator::~SocketDecorator() {
}

void SocketDecorator::on_disconnected() {
  emit disconnected();
}

Видно, что базовый класс декоратора остается абстрактным, наследует интерфейс сокета и, одновременно, агрегирует сокет и соединяет свои слоты с его сигналами.

 class CalcSockDecorator : public SocketDecorator {
  Q_OBJECT
public:
  explicit CalcSockDecorator(ISocketAdapter *sock, QObject *parent = 0);
  virtual void sendString(const QString& str);
protected slots:
  virtual void on_send_request(QString);
  virtual void on_message(QString);
};

// ...

CalcSockDecorator::CalcSockDecorator(ISocketAdapter *sock, QObject *parent) :
  SocketDecorator(sock, parent) {
}

void CalcSockDecorator::on_message(QString msg) {
  QScriptEngine engine;

  QString result = msg + " = " + engine.evaluate(msg).toString();
  if (engine.hasUncaughtException())
    m_pSock->sendString("bad request");
  else
    emit message(result);
}

void CalcSockDecorator::sendString(const QString &str) {
  m_pSock->sendString(str);
}

void CalcSockDecorator::on_send_request(QString msg) {
  m_pSock->sendString(msg);
}

CalcSockDecorator по-особенному обрабатывает поступающие сообщения (слот on_message), все остальные к запросы к себе он транслирует агрегируемому объекту. При поступлении сообщения, он пытается выполнить его как выражение ECMAScript, в случае возникновения исключительной ситуации возвращает строку с ошибкой, иначе — результат вычисления. Именно слот on_message выполняет самую сложную работу сервере, которую мы будем пытаться выполнять в отдельном потоке в остальной части статьи.

void Server::on_newConnection() {
  QTextStream(stdout) << "new connection" << endl;

  CalcSockDecorator *decoratedSock = new CalcSockDecorator
      (new ServerSocketAdapter(m_ptcpServer->nextPendingConnection(), this), this);

  decoratedSock->sendString("connect");

  connect(decoratedSock, SIGNAL(disconnected()), SLOT(on_disconnected()));
  connect(decoratedSock, SIGNAL(message(QString)), SLOT(on_message(QString)));
  connect(this, SIGNAL(message(QString)), decoratedSock, SLOT(on_send_request(QString)));
}

Коренным образом изменилось только создания сокета — создается экземпляр CalcSockDecorator вместо ServerSocketAdapter. Кроме того, на листинг 3 виден подводный камень — с использованием механизмов библиотеки Qt нам придется следить за тем, чтобы клиентский код был связан с сигналами и слотами декораторов самого верхнего уровня. Иными словами, если пользователь решит передавать сообщения зашифрованными — сервер должен будет:

  1. создать соответствующий декоратор сокета, передав старый сокет в качестве аргумента;
  2. отсоединить весь клиентский код от старого сокета;
  3. соединить клиентский код с декорированным сокетом.

Все это может быть очень трудно реализовываться и, кажется, возможны случаи, в которых из за сигналов и слотов использовать паттерн декоратор будет почти не возможно. Это надо учитывать при проектировании.

Исходный код примера сервера с декорированным сокетом можно скачать.

Многопоточный сервер. Поток на каждого клиента

Очевидно, если мы хотим работать с каждым клиентом в отдельном потоке, то поток должен создаваться в момент подключения. Изменения коснутся лишь метода сервера on_newConnection.

void Server::on_newConnection() {
  QTextStream(stdout) << "new connection" << endl;

  CalcSockDecorator *decoratedSock = new CalcSockDecorator
      (new ServerSocketAdapter(m_ptcpServer->nextPendingConnection(), this), this);

  QThread *sockThread = new QThread(this);
  decoratedSock->moveToThread(sockThread);
  sockThread->start();

  decoratedSock->sendString("connect");

  connect(decoratedSock, SIGNAL(disconnected()), SLOT(on_disconnected()));
  connect(decoratedSock, SIGNAL(message(QString)), SLOT(on_message(QString)));
  connect(this, SIGNAL(message(QString)), decoratedSock, SLOT(on_send_request(QString)));
}

В строках 7-9 описано создание потока и перемещение в него объекта-обработчика сокета. Ничего нового в этом нет и, очевидно, это не лучшее решение. Поток Qt — это поток операционной системы, который, в свою очередь, является достаточно крупным объектом. Если к нашему чату подключится тысяча пользователей, серверу будет очень не легко, даже если пользователи почти не будут общаться. В связи с этим, количество потоков на сервере стараются ограничить.

Можно, например, завести на сервере определенное количество потоков (пул потоков) и направлять новых клиентов к наименее загруженным потокам. Примерно такой вариант мы уже использовали при написании парсера проектов с биржи фриланса [3], но в библиотеке Qt есть готовый пул потоков, который используется чуть-чуть иначе (ниже описано подробнее).

Важно отметить, что в некоторых языках (Erlang, например) поддерживаются легковесные потоки. Для этих языков решение с отдельным потоком на каждого клиента было бы не только самым простым, но правильным и естественным.

Исходный код многопоточного сервера тоже можно скачать.

Пул потоков Qt

Пул потоков Qt (QThreadPool) управляет набором потоков (QThread). Количество потоков задается методом maxThreadCount(), по умолчанию их ровно столько, сколько ядер имеется у вашего процессора. Когда мы писали свой пул, потоки там существовали вечно, но в QThreadPool поток удаляется если в течении определенного времени в него не поступают задачи, задать такой таймаут можно методом setExpiryTimeout(). При поступлении задачи поток вновь будет создан.

Мы могли бы создать на сервере экземпляр QThreadPool и добавлять задачи в него, однако, каждая программа, написанная с использованием Qt уже имеет запущенный пул потоков (глобальный пул программы). Обратиться к глобальному пулу можно с помощью статической функции QThreadPool::globalInstance(). С этим пулом мы и будем работать.

Задача, добавляемая в пул должна наследовать класс QRunnable — это абстрактный класс с чисто виртуальным методом run(). Метод run() должен содержать код, который мы хотим вынести в отдельный поток. После завершения работы, объект может быть автоматически удален, если установлен соответствующий флажок (по умолчанию установлен). Установить или снять такой флажок можно методом QRunnable::setAutoDelete().

Чтобы передать результаты выполнения задачи во внешний мир можно использовать механизм сигналов и слотов, но QRunnable не является наследником QObject, поэтому наша задача для пула будет использовать множественное наследование. Передавать задаче данные, которые должны быть обработаны, удобно во время конструирования объекта.

В QThreadPool мы будем добавлять задачи, связанные с вычислением выражения, полученного через сокет.

class Calc : public QObject, public QRunnable {
  Q_OBJECT
public:
  explicit Calc(QString expr, QObject *parent = 0);
  void run();
signals:
 void result(QString);
protected:
 QString m_expr;
};

// ...

Calc::Calc(QString expr, QObject *parent)
  : QObject(parent), m_expr(expr) {
}

void Calc::run() {
  QScriptEngine engine;
  QString res = m_expr + " = " + engine.evaluate(m_expr).toString();
  if (engine.hasUncaughtException())
    emit result("");
  else
    emit result(res);
}

Теперь при получении сообщения, объект CalcSockDecorator создает объект Calc и добавляет его в пул вызовом QThreadPool::start(), после чего, ожидает сигнала с результатами от этого объекта.

void CalcSockDecorator::on_message(QString msg) {
  Calc *calc = new Calc(msg, this);
  connect(calc, SIGNAL(result(QString)), SLOT(on_result(QString)));

  QThreadPool::globalInstance()->start(calc);
}

void CalcSockDecorator::on_result(QString res) {
  if (res == "")
    m_pSock->sendString("bad request");
  else
    emit message(res);
}

Вот таким не хитрым образом, почти без изменения уже написанного кода можно распараллелить программу с использованием пула потоков библиотеки Qt.

Скачать исходный код такого замечательного сервера тоже можно.

В одной из следующих статей я наверное опишу работу с QtConcurrent — это довольно специфическая штука, но тоже используется для распараллеливания (пока что у нас не было примеров, где ее можно красиво использовать).

Полезная литература по теме

  1. Работа с сетью в Qt. Сокеты. Паттерн Adapter \\ https://pro-prof.com/archives/1372
  2. Cистема плагинов Qt, построение графиков и Qt Script \\ https://pro-prof.com/archives/1316
  3. Получение данных с сайта. Шаблон Producer/Consumer \\ https://pro-prof.com/archives/1034
  4. Э. Гамма Приемы объектно-ориентированного проектирования. Паттерны проектирования / Э. Гамма, Р. Хелм, Р. Джонсон, Д. Влиссидес. – СПб.: Питер, 2009. – 366 с.
  5. Литература по проектированию.

3 thoughts on “Многопоточный сервер Qt. Пул потоков. Паттерн Decorator

  1. Роман

    Ваш пример выдает ошибку:

    ASEERT failure in QCoreApplication::sendEvent: «Cannot send events to objects owned by a different thread. Current thread xxxxxx. Receiver » (of type ‘CalcSocketDecorator’) was created in thread yyyyy, file kernel\qcoreapplication.cpp, line 521

    Qt 5.3.1 mvsc2012 compiler, Qt собрано из исходников.

    Соответственно, если этот assert проигнорировать, то bad request

    Reply
    1. admin Post author

      При каких обстоятельствах происходит ошибка? (не известно в какой строке?).
      Вместо Current thread так и выводится xxxx или Вы сами так заменили? — Вообще в коде что-нибудь меняли?

      Что говорит трассировка стека? (окошко Сall Stack в Qt Creator).

      Попробуй изменить код так:

      CalcSockDecorator *decoratedSock = new CalcSockDecorator
            (new ServerSocketAdapter(m_ptcpServer->nextPendingConnection(), this), 0);
      

      Я не знаю в чем именно ошибка, поэтому гадаю. Мои объекты обмениваются сигналами, которые по умолчанию соединяются через AutoConnection и корректно обрабатываются многопоточно. Откуда там взялись события я не знаю. Мне кажется такая ошибка могла бы возникнуть если в отдельный поток оказался бы перенесен виджет, так как любое GUI в Qt должно работать в главном потоке. В моем коде такой ошибки точно нет — у меня вообще нет GUI вроде бы.

      Вроде бы при разрушении родительского объекта все дочерние должны получить событие разрушения, поэтому я предположил что замена this на 0 может помочь — событие не придет (из другого потока).

      Проверить не могу, у меня на линуксе все работает гладко. Я не смог повторить ошибку, очень прошу помочь протестить и исправить статью :).

      Reply
      1. Роман

        Конечно это я заменил потоки на xxx и yyy, чтобы показать, что они разные. Менял имена классов (как мне нравится). Полностью набрал код в msvc 2012. Да, виджета нет, сервер консольный.

        Попробую ваши советы, отпишусь. Стек вызовов предоставлю. Мне интересно стало запустить ваш пример. Заодно могу позже проверить и под линуксом. Надо только актуализировать на нем версию qt до 5.3.2

        Reply

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

*

code