Pull to refresh
92.71
ISPsystem
IT infrastructure management platforms

Асинхронный обмен данными с удалённым приложением через SSH

Reading time8 min
Views8.7K
Доброго времени суток, друзья и коллеги. Меня всё ещё зовут Дмитрий Смирнов, и я всё ещё, к моему вящему удовольствию, являюсь разработчиком ISPsystem. Некоторое время назад я начал работу над совершенно новым проектом, который меня очень вдохновил, поскольку новое — это в нашем случае отсутствие легаси кода и поддержки старых компиляторов. Здравствуй, Boost, C++17 и все прочие радости современной разработки.

Так случилось, что все мои прошлые проекты были многопоточными, соответственно, у меня было крайне мало опыта с асинхронными решениями. Именно это стало самым приятным для меня в этой разработке, помимо современных мощных инструментов.

Одной из последних сопутствующих задач стала необходимость написать обёртку над библиотекой libssh2 в реалиях асинхронного приложения, использующего Boost.Asio, и способного породить не более двух потоков. Об этом и расскажу.



Примечание: автор предполагает, что читатель знаком с основами асинхронной разработки и boost::asio.

Задача


В общих чертах задача стояла следующим образом: подключиться к удалённому серверу, используя rsa-ключ или логин и пароль; загрузить на удалённую машину некий скрипт и запустить его; вычитывать его ответы и присылать ему команды через то же самое соединение. При этом, разумеется, не блокируя потока (который является половиной всего возможного пула).

Дисклеймер: я знаю, что в Poco реализована работа с SSH, но я не нашёл способа поженить его с Asio, да и написать что-то своё было интереснее :-).

Инициализация


Для инициализации и сворачивания библиотеки я решил воспользоваться обычным синглтоном:

Init()
class LibSSH2 {
public:
  static void Init() {
     static LibSSH2 instance;
  }
private:
  explicit LibSSH2() {
     if (libssh2_init(0) != 0) {
        throw std::runtime_error("libssh2 initialization failed");
     }
  }
  ~LibSSH2() {
     std::cout << "shutdown libssh2" << std::endl;
     libssh2_exit();
  }
};



Есть в этом решении, разумеется, и подводные камни, согласно моей любимой настольной книге «Тысяча и один способ выстрелить себе в ногу в C++». Если кто-то порождает поток, который забудут поджойнить, и главный завершится раньше, вполне могут возникнуть интересные спецэффекты. Но в данном случае я не буду учитывать подобную возможность.

Основные сущности


После разбора примера становится понятно, что для нашей небольшой библиотеки понадобятся три несложные сущности: сокет, сессия и канал. Поскольку неплохо иметь и синхронные инструменты, мы пока оставим Asio в стороне.

Начнём с простого сокета:

Сокет
class Socket {
public:
  explicit Socket() : m_sock(socket(AF_INET, SOCK_STREAM, 0)) {
     if (m_sock == -1) {
        throw std::runtime_error("failed to create socket");
     }
  }

  ~Socket() { close(m_sock); }
private:
  int m_sock = -1;
}


Теперь сессия:

Сессия
class Session {
public:
  explicit Session(const bool enable_compression) : m_session(libssh2_session_init()) {
     if (m_session == nullptr) {
        throw std::runtime_error("failed to create libssh2 session");
     }

     libssh2_session_set_blocking(m_session, 0);
     if (enable_compression) {
        libssh2_session_flag(m_session, LIBSSH2_FLAG_COMPRESS, 1);
     }
  }

  ~Session() {
     const std::string desc = "Shutting down libssh2 session";
     libssh2_session_disconnect(m_session, desc.c_str());
     libssh2_session_free(m_session);
  }

private:
  LIBSSH2_SESSION *m_session;
}


Раз уж мы теперь имеем сокет и сессию, неплохо было бы написать функцию ожидания для сокета в реалиях libssh2:

Ожидание сокета
int WaitSocket() const {
  pollfd fds{};
  fds.fd = sock;
  fds.events = 0;

  if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_INBOUND) != 0) { 
     fds.events |= POLLIN;
  }
  if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_OUTBOUND) != 0) {
     fds.events |= POLLOUT;
  }

  return poll(&fds, 1, 10);
}


Собственно, это практически ничем не отличается от приведённого выше примера, кроме того, что там используется select вместо poll.

Остаётся канал. В libssh2 есть несколько видов каналов: простой, SCP, direct tcp. Нас интересует самый простой, базовый канал:

Канал
class SimpleChannel {
public:
  explicit SimpleChannel(session) {
     while ((m_channel = libssh2_channel_open_session(session) == nullptr &&
           GetSessionLastError() == LIBSSH2_ERROR_EAGAIN) {
        WaitSocket();
     }
     if (m_channel == nullptr) {
        throw std::runtime_error("Critical error while opening simple channel");
     }
  }

  void SendEof() {
     while (libssh2_channel_send_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) {
        WaitSocket();
     }

     while (libssh2_channel_wait_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) {
       WaitSocket();
     }
  }

  ~SimpleChannel() {
     CloseChannel();
  }
private:
  void CloseChannel() {
     int rc;
     while ((rc = libssh2_channel_close(m_channel)) == LIBSSH2_ERROR_EAGAIN) {
        WaitSocket();
     }
     libssh2_channel_free(m_channel);
  }

  LIBSSH2_CHANNEL *m_channel;
};


Теперь, когда все основные инструменты готовы, остаётся установить соединение с хостом и производить необходимые нам манипуляции. Асинхронная запись в канал и синхронная, разумеется, будут очень сильно различаться, а вот процесс установления соединения — нет.

Поэтому напишем базовый класс:

Базовое соединение
class BaseConnectionImpl {
protected:
  explicit BaseConnectionImpl(const SshConnectData &connect_data) ///< Это любая удобная структура, содержащая информацию о подключении
        : m_session(connect_data.enable_compression)
        , m_connect_data(connect_data) {

     LibSSH2::Init();
     ConnectSocket();
     HandShake();
     ProcessKnownHosts();
     Auth();
  }

  ///Следующие три метода понадобятся нам чуть позже
  bool CheckSocket(int type) const {
     pollfd fds{};
     fds.fd = m_sock;
     fds.events = type;

     return poll(&fds, 1, 0) == 1;
  }

  bool WantRead() const {
     return CheckSocket(POLLIN);
  }

  bool WantWrite() const {
     return CheckSocket(POLLOUT);
  }

  /*Я не был уверен, что реализация соединения, которая почти полностью взята из примера
 * будет кому-то интересна.
 */
  void ConnectSocket() {...}

  void HandShake() {...}

  void Auth() {...}

  class Socket m_sock;
  class Session m_session;
  class SimpleChannel;
  SshConnectData m_connect_data;
}; 


Вот теперь мы готовы написать простейший класс для соединения с удалённым хостом и выполнения на нем какой-либо команды:

Синхронное соединение

class Connection::Impl : public BaseConnectionImpl {
public:
  explicit Impl(const SshConnectData &connect_data)
        : BaseConnectionImpl(connect_data) {}

  template <typename Begin>
  void WriteToChannel(LIBSSH2_CHANNEL *channel, Begin ptr, size_t size) {
     do {
        int rc;
        while ((rc = libssh2_channel_write(channel, ptr, size)) == LIBSSH2_ERROR_EAGAIN) {
           WaitSocket();
        }
        if (rc < 0) {
           break;
        }
        size -= rc;
        ptr += rc;
     } while (size != 0);
  }

  void ExecuteCommand(const std::string &command, const std::string &in = "") {
     SimpleChannel channel(*this);

     int return_code = libssh2_channel_exec(channel, command.c_str());
     if (return_code != 0 && return_code != LIBSSH2_ERROR_EAGAIN) {
        throw std::runtime_error("Critical error while executing ssh command");
     }

     if (!in.empty()) {
        WriteToChannel(channel, in.c_str(), in.size());
        channel.SendEof();
     }

     std::string response;
     for (;;) {
        int rc;
        do {
           std::array<char, 4096> buffer{};
           rc = libssh2_channel_read(channel, buffer.data(), buffer.size());
           if (rc > 0) {
              boost::range::copy(boost::adaptors::slice(buffer, 0, rc), std::back_inserter(response));
           } else if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN) {
              throw std::runtime_error("libssh2_channel_read error (" + std::to_string(rc) + ")");
           }
        } while (rc > 0);

        if (rc == LIBSSH2_ERROR_EAGAIN) {
           WaitSocket();
        } else {
           break;
        }
     }
   }
};


До сих пор всё, что мы писали, было простым приведением примеров libssh2 к более цивилизованному виду. Но теперь, имея все простые инструменты для синхронной записи данных в канал, мы можем перейти к Asio.

Иметь стандартный сокет — это хорошо, но не слишком практично, если нужно асинхронно дождаться его готовности к чтению\записи, занимаясь в процессе своими делами. Тут на помощь приходит boost::asio::ip::tcp::socket, имеющий замечательный метод:

async_wait(wait_type, WaitHandler)

Он замечательным образом конструируется из обычного сокета, для которого мы уже заблаговременно установили соединение и boost::asio::io_context — контекста выполнения нашего приложения.

Конструктор асинхронного соединения
class AsyncConnection::Impl : public BaseConnectionImpl,
 public std::enable_shared_from_this<AsyncConnection::Impl> {
public:
Impl(boost::asio::io_context &context, const SshConnectData &connect_data)
     : BaseConnectionImpl(connect_data)
     , m_tcp_socket(context, tcp::v4(), m_sock.GetSocket()) {
  m_tcp_socket.non_blocking(true);
}
};



Теперь нам надо начать выполнение какой-либо команды на удалённом хосте и, по мере поступления данных от нее, отдавать их в некоторый коллбэк.

void AsyncRun(const std::string &command, CallbackType &&callback) {
  m_read_callback = std::move(callback);

  auto ec = libssh2_channel_exec(*m_channel, command.c_str());
  TryRead();
}

Таким образом, запустив команду, мы передаем управление методу TryRead().

void TryRead() {
  if (m_read_in_progress) {
     return;
  }
  m_tcp_socket.async_wait(tcp::socket::wait_read, [this, self = shared_from_this()](auto ec) {
     if (WantRead()) {
        ReadHandler(ec);
     }
     if (m_complete) {
        return;
     }
     TryRead();
  });
}

Первым делом мы проверяем, не запущен ли уже процесс чтения каким-то предыдущим вызовом. Если нет, то начинаем ожидать готовности сокета для чтения. В качестве хендлера ожидания используется обычная лямбда с захватом shared_from_this().

Обратите внимание на вызов WantRead(). Async_wait, как оказалось, тоже имеет свои изъяны, и может просто вернуться по таймауту. Чтобы в этом случае не производить лишних действий, я решил проверять сокет через poll без таймаута — действительно ли сокет хочет читать сейчас. Если нет, то мы просто снова запускаем TryRead() и ждём. В противном случае мы сразу приступаем к чтению и передаче данных в коллбэк.

void ReadHandler(const boost::system::error_code &error) {
  if (error != boost::system::errc::success) {
     return;
  }
  m_read_in_progress = true;
  int ec = LIBSSH2_ERROR_EAGAIN;
  std::array<char, 4096> buffer {};
  while ((ec = libssh2_channel_read(*m_channel, buffer.data(), buffer.size())) > 0) {
     std::string tmp;
     boost::range::copy(boost::adaptors::slice(buffer, 0, ec), std::back_inserter(tmp));
     if (m_read_callback != nullptr) {
        m_read_callback(tmp);
     }
  }
   m_read_in_progress = false;
}

Таким образом, запускается бесконечный асинхронный цикл чтения от запущенного приложения. Следующим шагом для нас станет отправление инструкций приложению:

void AsyncWrite(const std::string &data, WriteCallbackType &&callback) {
  m_input += data;
  m_write_callback = std::move(callback);
  TryWrite();
}

Переданные в асинхронную запись данные и коллбэк мы сохраним внутри соединения. И запустим очередной цикл, только на этот раз записи:

Цикл записи
void TryWrite() {
  if (m_input.empty() || m_write_in_progress) {
     return;
  }
  m_tcp_socket.async_wait(tcp::socket::wait_write, [this, self = shared_from_this()](auto ec) {
     if (WantWrite()) {
        WriteHandler(ec);
     } 
     if (m_complete) {
        return;
     }

     TryWrite();
  });
}

void WriteHandler(const boost::system::error_code &error) {
  if (error != boost::system::errc::success) {
     return;
  }

  m_write_in_progress = true;

  int ec = LIBSSH2_ERROR_EAGAIN;
  while (!m_input.empty()) {
     auto ptr = m_input.c_str();
     auto read_size = m_input.size();

     while ((ec = libssh2_channel_write(*m_channel, ptr, read_size)) > 0) {
        read_size -= ec;
        ptr += ec;
     }

     AssertResult(ec);
     m_input.erase(0, m_input.size() - read_size);
     if (ec == LIBSSH2_ERROR_EAGAIN) {
        break;
     }
  }
  if (m_input.empty() && m_write_callback != nullptr) {
     m_write_callback();
  }
  m_write_in_progress = false;
}


Таким образом, мы будем записывать в канал данные до тех пор, пока они все не будут успешно переданы. Затем вернём управление вызывающей стороне, чтобы можно было передать новую порцию данных. Так можно не только слать инструкции какому-то приложению на хосте, но и, например, загружать небольшими порциями файлы любого размера, при этом не блокируя тред, что немаловажно.

С помощью этой библиотеки мне удалось успешно запускать на удалённом сервере скрипт, отслеживающий изменения файловой системы, одновременно вычитывая его вывод и посылая различные команды. В целом: очень ценный опыт адаптации си-стайл библиотеки для современного C++ проекта, использующего Boost.

Буду рад прочитать советы более опытных пользователей Boost.Asio, чтобы познать больше и улучшить свое решение :-).
Tags:
Hubs:
+18
Comments8

Articles

Information

Website
www.ispsystem.com
Registered
Founded
Employees
101–200 employees
Location
Россия