GitHub - Tinkoff/stream-client: Lightweight, header-only, Boost-based socket pool library

Stream-client


This is a lightweight, header-only, Boost-based library providing client-side network primitives to easily organize and implement data transmission with remote endpoints.

This library:

Why

If you are writing software in C++ that communicates with other services as a client, you have probably already come across this problem: you need to implement a connectivity layer that provides network transport to all related services. This is exactly what this library aims to solve.

How to use

Sockets

Sockets are implemented on top of boost::asio::basic_socket and add timeout control, so in most cases it is enough to call send()/receive()/write_some()/read_some() with a deadline or timeout. Data passed to I/O operations should be wrapped in boost::asio::buffer. In essence, these clients are timeout-wrapped boost::asio::ip sockets with the same interface.

Client streams classes:

Example

const boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 12345);
const std::chrono::milliseconds connect_timeout(1000);
const std::chrono::milliseconds io_timeout(100);

// this creates a connected socket and may take up to connect_timeout
stream_client::tcp_client client(endpoint, connect_timeout, io_timeout);

// send() will transmit all contents of send_data or throw an error;
// it will also throw boost::asio::error::timed_out after io_timeout.
const std::string send_data = "test data";
client.send(boost::asio::buffer(send_data.data(), send_data.size()));

// receive() will read send_data.size() bytes or throw an error;
// it will also throw boost::asio::error::timed_out after io_timeout.
std::string recv_data(send_data.size(), '\0');
client.receive(boost::asio::buffer(&recv_data[0], send_data.size()));

// there are also non-throwing overloads of these functions, which
// set a boost::system::error_code instead of throwing an exception
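The socket calls above accept either a timeout (a duration) or a deadline (a point in time). The distinction matters when several operations have to share one time budget: a deadline is computed once, and each call gets whatever time is left. The following is a minimal sketch of that conversion, assuming std::chrono::steady_clock as the clock; the helper names deadline_after and time_left are hypothetical and are not part of stream-client.

```cpp
#include <cassert>
#include <chrono>

using clock_type = std::chrono::steady_clock;

// Turn a relative timeout into an absolute deadline, counted from `now`.
inline clock_type::time_point deadline_after(clock_type::time_point now,
                                             std::chrono::milliseconds timeout)
{
    assert(timeout >= std::chrono::milliseconds::zero());
    return now + timeout;
}

// Time budget still available at `now`; zero once the deadline has passed.
inline std::chrono::milliseconds time_left(clock_type::time_point deadline,
                                           clock_type::time_point now)
{
    if (now >= deadline) {
        return std::chrono::milliseconds::zero();
    }
    return std::chrono::duration_cast<std::chrono::milliseconds>(deadline - now);
}
```

With helpers like these, a request made of a send followed by a receive could compute one deadline up front and pass the remaining time to each call, instead of granting every call a full io_timeout.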

DNS resolver

The resolver is built around the same idea: a synchronous DNS resolver with timeouts. It uses boost::asio::ip::basic_resolver to perform the actual resolution within the specified timeouts.

It is a class template with a single parameter, the result protocol. You can also specify which type of IP endpoints you are interested in: IPv4, IPv6, or both. Resolution is performed with the resolve() call, whose signature is similar to that of the socket operations: you can specify a timeout or deadline, or use the default one.

Resolver classes:

Example

const std::chrono::milliseconds resolve_timeout(5000);

stream_client::resolver::tcp_resolver resolver("localhost", "12345", resolve_timeout, stream_client::resolver::ip_family::ipv4);

// resolve() is guaranteed to return at least one endpoint or throw an error;
// it will throw boost::asio::error::timed_out after resolve_timeout.
auto endpoints_it = resolver.resolve();

// the returned iterator is an instance of boost::asio::ip::basic_resolver::iterator

Connector

The connector uses a resolver to perform DNS resolution and keep the list of endpoints up to date. When new_session() is called, it uses those endpoints to create a new socket of the specified protocol. DNS resolution runs in a background thread, which is triggered upon creation and whenever opening a new socket fails.

The target endpoint for a new socket is selected randomly from the DNS results, which makes this a load-balancing connector to a remote host. The connector also allows you to specify separate timeouts for resolution, connection, and I/O operations of new sockets.
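Random selection from the resolved list is what spreads new connections across the hosts behind one name. The following is a minimal sketch of that idea only, not the library's implementation: the endpoint struct and the pick_endpoint function are hypothetical stand-ins (the library itself works with boost::asio endpoints), and the uniform distribution is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <random>
#include <string>
#include <vector>

// Hypothetical stand-in for a resolved endpoint.
struct endpoint {
    std::string address;
    unsigned short port;
};

// Pick one endpoint uniformly at random from the resolved list.
// Precondition: the list is not empty.
template <typename Rng>
const endpoint& pick_endpoint(const std::vector<endpoint>& resolved, Rng& rng)
{
    assert(!resolved.empty());
    std::uniform_int_distribution<std::size_t> index(0, resolved.size() - 1);
    return resolved[index(rng)];
}
```

Called with a std::mt19937 seeded once per connector, each new session would land on any of the resolved addresses with equal probability, so load evens out over many sessions rather than per call.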

Connector classes:

Example

const std::chrono::milliseconds resolve_timeout(5000);
const std::chrono::milliseconds connect_timeout(1000);
const std::chrono::milliseconds io_timeout(100);

// this will return immediately, starting a background thread for name resolution
stream_client::connector::tcp_connector connector("localhost", "12345", resolve_timeout, connect_timeout, io_timeout, stream_client::resolver::ip_family::ipv4);

// this will acquire a new socket or throw;
// it will throw boost::asio::error::timed_out after connect_timeout
std::unique_ptr<stream_client::tcp_client> client = connector.new_session();

// use the client
const std::string send_data = "test data";
std::string recv_data(send_data.size(), '\0');
client->send(boost::asio::buffer(send_data.data(), send_data.size()));
client->receive(boost::asio::buffer(&recv_data[0], send_data.size()));

Connection pool

The connection pool is a container filled with opened sockets. It uses a connector to open new sockets in a background thread, which is triggered once there are vacant places in the pool. You can call get_session() to obtain a socket from the pool and return_session() to give it back.
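The interplay between get_session(), return_session(), and the background refill thread can be illustrated with a small self-contained sketch. This is not the library's code: tiny_pool, its constructor arguments, and the timeout parameter of get_session() are hypothetical, and the sketch assumes the connect callback neither throws nor keeps returning null.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

template <typename Session>
class tiny_pool {
public:
    using session_ptr = std::unique_ptr<Session>;
    using connect_fn = std::function<session_ptr()>;

    tiny_pool(std::size_t capacity, connect_fn connect)
        : capacity_(capacity), connect_(std::move(connect)), worker_([this] { refill_loop(); })
    {
        assert(capacity_ > 0);
    }

    ~tiny_pool()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        changed_.notify_all();
        worker_.join();
    }

    // Wait up to `timeout` for an idle session; returns nullptr on timeout.
    session_ptr get_session(std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!changed_.wait_for(lock, timeout, [this] { return !idle_.empty(); })) {
            return nullptr;
        }
        session_ptr session = std::move(idle_.front());
        idle_.pop_front();
        lock.unlock();
        changed_.notify_all(); // a place became vacant: wake the refill thread
        return session;
    }

    // Put a session back; it is dropped if the pool is already full.
    void return_session(session_ptr session)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (idle_.size() < capacity_) {
                idle_.push_back(std::move(session));
            }
        }
        changed_.notify_all();
    }

private:
    void refill_loop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (true) {
            changed_.wait(lock, [this] { return stopping_ || idle_.size() < capacity_; });
            if (stopping_) {
                return;
            }
            lock.unlock();
            session_ptr fresh = connect_(); // possibly slow, so done without the lock
            lock.lock();
            if (fresh && idle_.size() < capacity_) {
                idle_.push_back(std::move(fresh));
                changed_.notify_all();
            }
        }
    }

    const std::size_t capacity_;
    connect_fn connect_;
    std::mutex mutex_;
    std::condition_variable changed_;
    std::deque<session_ptr> idle_;
    bool stopping_ = false;
    std::thread worker_; // declared last so it starts after the other members exist
};
```

The sketch opens sessions one at a time and never inspects idle sessions, which matches the spirit of the limitations listed further down; how the real pool schedules reconnects is governed by its refill strategy.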

There are two strategies to refill the pool:

Both of them are defined in terms of the stream_client::connector::pool_strategy interface, so you are free to implement a new one.

Limitations:

  1. Sockets that are already in the pool are not checked or maintained in any way. Because such checks are complex to implement for all supported protocols, the pool does not guarantee that every socket in it is still open at an arbitrary point in time.
  2. Nothing specific is done with sockets when they are returned via return_session(). Therefore, if they have or will have pending data to read, that data stays there until it is read.

Considering this, the best way to use a connection pool is as follows:

  1. Create it, specifying all timeouts as you need.
  2. Once created, use get_session() to obtain an opened socket.
  3. Perform the needed I/O operations on the socket.
  4. If steps 2-3 succeed, return the socket with return_session(); otherwise, repeat from step 2.

It is important to discard sockets on failure and not reuse them, or request-response management will get nasty.
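The steps above can be sketched as a small retry helper. This is an illustration under stated assumptions, not part of the library: with_session is a hypothetical name, fake_session and fake_pool are stand-ins so the sketch compiles on its own, and the pool is assumed to expose get_session() returning a smart pointer and return_session() taking it back, as in the examples in this document.

```cpp
#include <cassert>
#include <exception>
#include <memory>
#include <utility>

// Hypothetical stand-ins so the sketch compiles without the library.
struct fake_session {
    int id;
};

struct fake_pool {
    int created = 0;
    int returned = 0;

    std::unique_ptr<fake_session> get_session()
    {
        return std::unique_ptr<fake_session>(new fake_session{created++});
    }

    void return_session(std::unique_ptr<fake_session>) { ++returned; }
};

// Run `io` on a session taken from `pool`. On success the session goes back
// to the pool; if anything throws, the session is dropped and a fresh one is
// requested. Returns true if `io` succeeded within `max_attempts` tries.
template <typename Pool, typename Io>
bool with_session(Pool& pool, Io io, int max_attempts)
{
    assert(max_attempts > 0);
    for (int attempt = 0; attempt < max_attempts; ++attempt) {
        try {
            auto session = pool.get_session();       // step 2
            io(*session);                            // step 3
            pool.return_session(std::move(session)); // step 4, success path
            return true;
        } catch (const std::exception&) {
            // The failed session was destroyed during stack unwinding and is
            // never returned to the pool; the loop asks for a new one.
        }
    }
    return false;
}
```

Because the session is a local object inside the try block, a failed socket cannot leak back into the pool by accident. Only the success path hands it over explicitly.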

Connection pools:

There are also aliases for the same pools that use the conservative reconnection strategy:

Example

const std::chrono::milliseconds resolve_timeout(5000);
const std::chrono::milliseconds connect_timeout(1000);
const std::chrono::milliseconds io_timeout(100);

auto connector_pool = std::make_shared<stream_client::connector::tcp_pool>("localhost", "12345", resolve_timeout, connect_timeout, io_timeout, stream_client::resolver::ip_family::ipv4);
const size_t threads_num = 10;
std::vector<std::thread> threads;
const std::string send_data = "test data";
std::mutex stdout_mutex;

for (size_t i = 0; i < threads_num; ++i) {
    threads.emplace_back([&send_data, &stdout_mutex, connector_pool]() {
        std::string recv_data(send_data.size(), '\0');

        auto client = connector_pool->get_session();
        // both of these calls will throw an exception on error or timeout
        client->send(boost::asio::buffer(send_data.data(), send_data.size()));
        client->receive(boost::asio::buffer(&recv_data[0], send_data.size()));
        // if there was no exception, return the socket to the pool
        connector_pool->return_session(std::move(client));

        const std::lock_guard<std::mutex> lk(stdout_mutex);
        std::cout << std::this_thread::get_id() << ": " << recv_data << std::endl;
    });
}

for (auto& t : threads) {
    t.join();
}

Logging

The library uses a basic logger interface implemented internally. You can change the log level with stream_client::set_log_level() and read the current one with stream_client::get_log_level().

These levels are supported:

enum class log_level : int {
    trace = 0,
    debug,
    info,
    warning,
    error,
};

By default, the library prints messages to stdout with decent formatting. To override this behavior, you can set your own logger via:

void stream_client::set_logger(std::shared_ptr<stream_client::log_interface> logger);

void stream_client::set_logger(stream_client::log_level level, stream_client::log_func_type log_func);

This allows you to either replace the logger instance or use a callback with the proper signature. These types are defined as:

class log_interface
{
public:
    virtual void set_level(log_level level) noexcept = 0;
    virtual log_level get_level() const noexcept = 0;
    virtual void message(log_level level, const std::string& location, const std::string& message) const = 0;
};

using log_func_type = std::function<void(log_level level, const std::string& location, const std::string& message)>;

For more information please look inside logger.hpp.
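As an illustration of the interface above, here is a sketch of a custom logger that keeps messages in memory, which can be handy in tests. To stay self-contained, it copies the quoted declarations into a local namespace named sketch instead of including the library headers. The memory_logger class, the virtual destructor, and the choice to filter by level inside message() are assumptions made for this sketch and are not taken from logger.hpp.

```cpp
#include <cassert>
#include <string>
#include <vector>

namespace sketch {

// Local copies of the declarations quoted above, so that this compiles
// without the library.
enum class log_level : int { trace = 0, debug, info, warning, error };

class log_interface
{
public:
    virtual ~log_interface() = default; // added for the sketch
    virtual void set_level(log_level level) noexcept = 0;
    virtual log_level get_level() const noexcept = 0;
    virtual void message(log_level level, const std::string& location, const std::string& message) const = 0;
};

// Hypothetical logger that stores formatted lines instead of printing them.
// Not thread-safe: a logger used by background threads would need a lock.
class memory_logger : public log_interface
{
public:
    explicit memory_logger(log_level level) : level_(level) {}

    void set_level(log_level level) noexcept override { level_ = level; }
    log_level get_level() const noexcept override { return level_; }

    void message(log_level level, const std::string& location, const std::string& message) const override
    {
        if (level < level_) {
            return; // assumption: the logger itself drops messages below its level
        }
        lines_.push_back(location + ": " + message);
    }

    const std::vector<std::string>& lines() const noexcept { return lines_; }

private:
    log_level level_;
    mutable std::vector<std::string> lines_; // mutable because message() is const
};

} // namespace sketch
```

In a real program, the class would derive from stream_client::log_interface and an instance would be passed to stream_client::set_logger() as a std::shared_ptr.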

How to build

This library is intended to be reasonably multi-platform; however, it has been tested and mainly used on Ubuntu and macOS.
Out-of-source builds are preferred.

Ubuntu dependencies

sudo apt update
sudo apt install build-essential cmake libboost-dev libboost-system-dev libssl-dev

macOS dependencies

brew install cmake pkg-config icu4c openssl boost

To build:

cmake -H. -Bbuild
cmake --build ./build

To install (sudo may be required):

cmake -H. -Bbuild -DSTREAMCLIENT_BUILD_TESTING=OFF -DSTREAMCLIENT_BUILD_DOCS=OFF -DSTREAMCLIENT_BUILD_EXAMPLES=OFF
cmake --build ./build --target install

Or test:

cmake -H. -Bbuild -DSTREAMCLIENT_BUILD_TESTING=ON -DSTREAMCLIENT_BUILD_DOCS=OFF -DSTREAMCLIENT_BUILD_EXAMPLES=OFF
cmake --build ./build
cmake -E chdir ./build ctest --output-on-failure

All these commands assume you are in the stream-client root folder.

Cmake options

License

Developed at Tinkoff.ru in 2020.
Distributed under the Apache License 2.0; see the LICENSE file. You may also obtain this license at https://www.apache.org/licenses/LICENSE-2.0.

Contacts

Author - i.s.vovk@tinkoff.ru
Current maintainers - i.s.vovk@tinkoff.ru, n.suboch@tinkoff.ru