Now lets test on windows

pull/3441/head
LowLevelLore 2 months ago
parent 287333ee00
commit de0d0dc3f4
No known key found for this signature in database
GPG Key ID: 1E21A3708E5E6689

@ -59,11 +59,11 @@ public:
SOCKET fd() const { return socket_; }
// try to connect or throw on failure
void connect(const std::string &host, int port) {
void connect(const std::string &host, int port, int timeout_ms) {
if (is_connected()) {
close();
}
struct addrinfo hints {};
struct addrinfo hints{};
ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_UNSPEC; // To work with IPv4, IPv6, and so on
@ -90,12 +90,49 @@ public:
WSACleanup();
continue;
}
if (::connect(socket_, rp->ai_addr, (int)rp->ai_addrlen) == 0) {
break;
// set non-blocking if timeout specified
u_long mode = (timeout_ms > 0) ? 1UL : 0UL;
::ioctlsocket(socket_, FIONBIO, &mode);
rv = ::connect(socket_, rp->ai_addr, (int)rp->ai_addrlen);
if (rv == SOCKET_ERROR) {
last_error = WSAGetLastError();
if (timeout_ms > 0 && last_error == WSAEWOULDBLOCK) {
fd_set wfds;
FD_ZERO(&wfds);
FD_SET(socket_, &wfds);
timeval tv;
tv.tv_sec = timeout_ms / 1000;
tv.tv_usec = (timeout_ms % 1000) * 1000;
rv = ::select(0, nullptr, &wfds, nullptr, &tv);
if (rv > 0) {
int so_error = 0;
int len = sizeof(so_error);
::getsockopt(socket_, SOL_SOCKET, SO_ERROR, (char *)&so_error, &len);
if (so_error == 0)
rv = 0;
else {
last_error = so_error;
rv = SOCKET_ERROR;
}
} else {
last_error = ::WSAGetLastError();
close();
last_error = (rv == 0 ? WSAETIMEDOUT : WSAGetLastError());
rv = SOCKET_ERROR;
}
}
}
// restore blocking mode
mode = 0UL;
::ioctlsocket(socket_, FIONBIO, &mode);
if (rv == 0) {
last_error = 0;
break;
}
::closesocket(socket_);
socket_ = INVALID_SOCKET;
}
::freeaddrinfo(addrinfo_result);
if (socket_ == INVALID_SOCKET) {
@ -103,6 +140,10 @@ public:
throw_winsock_error_("connect failed", last_error);
}
DWORD tv = static_cast<DWORD>(timeout_ms);
::setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof(tv));
::setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, (const char *)&tv, sizeof(tv));
// set TCP_NODELAY
int enable_flag = 1;
::setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char *>(&enable_flag),

@ -39,16 +39,84 @@ public:
~tcp_client() { close(); }
int connect_socket_with_timeout(int sockfd,
const struct sockaddr *addr,
socklen_t addrlen,
const timeval &tv) {
// Blocking connect if timeout is zero
if (tv.tv_sec == 0 && tv.tv_usec == 0) {
int rv = ::connect(sockfd, addr, addrlen);
if (rv < 0 && errno == EISCONN) {
// already connected, treat as success
return 0;
}
return rv;
}
// Non-blocking path
int orig_flags = ::fcntl(sockfd, F_GETFL, 0);
if (orig_flags < 0) {
return -1;
}
if (::fcntl(sockfd, F_SETFL, orig_flags | O_NONBLOCK) < 0) {
return -1;
}
int rv = ::connect(sockfd, addr, addrlen);
if (rv == 0 || (rv < 0 && errno == EISCONN)) {
// immediate connect or already connected
::fcntl(sockfd, F_SETFL, orig_flags);
return 0;
}
if (errno != EINPROGRESS) {
::fcntl(sockfd, F_SETFL, orig_flags);
return -1;
}
// wait for writability
fd_set wfds;
FD_ZERO(&wfds);
FD_SET(sockfd, &wfds);
struct timeval tv_copy = tv;
rv = ::select(sockfd + 1, nullptr, &wfds, nullptr, &tv_copy);
if (rv <= 0) {
// timeout or error
::fcntl(sockfd, F_SETFL, orig_flags);
if (rv == 0) errno = ETIMEDOUT;
return -1;
}
// check socket error
int so_error = 0;
socklen_t len = sizeof(so_error);
if (::getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &so_error, &len) < 0) {
::fcntl(sockfd, F_SETFL, orig_flags);
return -1;
}
::fcntl(sockfd, F_SETFL, orig_flags);
if (so_error != 0 && so_error != EISCONN) {
errno = so_error;
return -1;
}
return 0;
}
// try to connect or throw on failure
void connect(const std::string &host, int port) {
void connect(const std::string &host, int port, int timeout_ms = 0) {
close();
struct addrinfo hints {};
struct addrinfo hints{};
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC; // To work with IPv4, IPv6, and so on
hints.ai_socktype = SOCK_STREAM; // TCP
hints.ai_flags = AI_NUMERICSERV; // port passed as as numeric value
hints.ai_protocol = 0;
struct timeval tv;
tv.tv_sec = timeout_ms / 1000;
tv.tv_usec = (timeout_ms % 1000) * 1000;
auto port_str = std::to_string(port);
struct addrinfo *addrinfo_result;
auto rv = ::getaddrinfo(host.c_str(), port_str.c_str(), &hints, &addrinfo_result);
@ -69,8 +137,9 @@ public:
last_errno = errno;
continue;
}
rv = ::connect(socket_, rp->ai_addr, rp->ai_addrlen);
if (rv == 0) {
::fcntl(socket_, F_SETFD, FD_CLOEXEC);
if (connect_socket_with_timeout(socket_, rp->ai_addr, rp->ai_addrlen, tv) == 0) {
last_errno = 0;
break;
}
last_errno = errno;
@ -82,6 +151,10 @@ public:
throw_spdlog_ex("::connect failed", last_errno);
}
// Set timeouts for send and recv
::setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof(tv));
::setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, (const char *)&tv, sizeof(tv));
// set TCP_NODELAY
int enable_flag = 1;
::setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char *>(&enable_flag),

@ -31,6 +31,7 @@ namespace sinks {
struct tcp_sink_config {
std::string server_host;
int server_port;
int timeout_ms = 0;
bool lazy_connect = false; // if true connect on first log call instead of on construction
tcp_sink_config(std::string host, int port)
@ -44,10 +45,22 @@ public:
// connect to tcp host/port or throw if failed
// host can be hostname or ip address
explicit tcp_sink(const std::string &host,
int port,
int timeout_ms = 0,
bool lazy_connect = false)
: config_{host, port} {
config_.timeout_ms = timeout_ms;
config_.lazy_connect = lazy_connect;
if (!config_.lazy_connect) {
client_.connect(config_.server_host, config_.server_port, config_.timeout_ms);
}
}
explicit tcp_sink(tcp_sink_config sink_config)
: config_{std::move(sink_config)} {
if (!config_.lazy_connect) {
this->client_.connect(config_.server_host, config_.server_port);
client_.connect(config_.server_host, config_.server_port, config_.timeout_ms);
}
}
@ -58,7 +71,7 @@ protected:
spdlog::memory_buf_t formatted;
spdlog::sinks::base_sink<Mutex>::formatter_->format(msg, formatted);
if (!client_.is_connected()) {
client_.connect(config_.server_host, config_.server_port);
client_.connect(config_.server_host, config_.server_port, config_.timeout_ms);
}
client_.send(formatted.data(), formatted.size());
}

Loading…
Cancel
Save