net/clientnb.cpp | 3 - net/loolnb.cpp | 125 +++++++++++++++++++------------------------------------ net/socket.hpp | 55 ++++++++++++++++++------ 3 files changed, 88 insertions(+), 95 deletions(-)
New commits: commit ded9607faf8e3e1fa122b200353a3465c94347ae Author: Michael Meeks <[email protected]> Date: Tue Feb 14 23:45:24 2017 +0000 Implement basic buffering. The socket now buffers input, and output, updates its poll record too. We pass a simple message from client to server and back using lamers HTTP. Sub-classed ClientSocket to provide a simple message handler. not very convinced by templatization here, but made it consistent. more ideal to have some virtual socket pieces. diff --git a/net/clientnb.cpp b/net/clientnb.cpp index e4a7fc3..ec7c578 100644 --- a/net/clientnb.cpp +++ b/net/clientnb.cpp @@ -82,7 +82,8 @@ public: std::cerr << "try to get response\n"; std::istream& responseStream = session->receiveResponse(response); - std::cerr << "Got response '" << responseStream << "'\n"; + std::string result(std::istreambuf_iterator<char>(responseStream), {}); + std::cerr << "Got response '" << result << "'\n"; } catch (const Poco::Exception &e) { diff --git a/net/loolnb.cpp b/net/loolnb.cpp index 952c44a..d457471 100644 --- a/net/loolnb.cpp +++ b/net/loolnb.cpp @@ -24,6 +24,34 @@ constexpr int PortNumber = 9191; +class SimpleResponseClient : public ClientSocket +{ +public: + SimpleResponseClient(const int fd) : + ClientSocket(fd) + { + } + virtual void handleIncomingMessage() override + { + std::cerr << "message had size " << _inBuffer.size() << "\n"; + std::ostringstream oss; + oss << "HTTP/1.1 200 OK\r\n" + << "Date: Once, Upon a time GMT\r\n" // Mon, 27 Jul 2009 12:28:53 GMT + << "Server: madeup string (Linux)\r\n" + << "Content-Length: " << _inBuffer.size() << "\r\n" + << "Content-Type: text/plain\r\n" + << "Connection: Closed\r\n" + << "\r\n" + ; + std::string str = oss.str(); + _outBuffer.insert(_outBuffer.end(), str.begin(), str.end()); + + // append the content we got: + _outBuffer.insert(_outBuffer.end(), _inBuffer.begin(), _inBuffer.end()); + _inBuffer.clear(); + } +}; + /// Handles non-blocking socket event polling. /// Only polls on N-Sockets and invokes callback and /// doesn't manage buffers or client data. @@ -102,7 +130,7 @@ public: /// Insert a new socket to be polled. /// Sockets are removed only when the handler return false. - void insertNewSocket(const std::shared_ptr<ClientSocket>& newSocket) + void insertNewSocket(const std::shared_ptr<T>& newSocket) { std::lock_guard<std::mutex> lock(_mutex); @@ -138,8 +166,7 @@ private: for (size_t i = 0; i < size; ++i) { _pollFds[i].fd = _pollSockets[i]->getFD(); - //TODO: Get from the socket. - _pollFds[i].events = POLLIN | POLLOUT; + _pollFds[i].events = _pollSockets[i]->getPollEvents(); _pollFds[i].revents = 0; } @@ -153,10 +180,10 @@ private: /// main-loop wakeup pipe int _wakeup[2]; /// The sockets we're controlling - std::vector<std::shared_ptr<ClientSocket>> _pollSockets; + std::vector<std::shared_ptr<T>> _pollSockets; /// Protects _newSockets std::mutex _mutex; - std::vector<std::shared_ptr<ClientSocket>> _newSockets; + std::vector<std::shared_ptr<T>> _newSockets; /// The fds to poll. std::vector<pollfd> _pollFds; }; @@ -197,39 +224,7 @@ private: Poco::Net::SocketAddress addr("127.0.0.1", PortNumber); -void client(const int timeoutMs) -{ - const auto client = std::make_shared<ClientSocket>(); - if (!client->connect(addr, timeoutMs) && errno != EINPROGRESS) - { - const std::string msg = "Failed to call connect. (errno: "; - throw std::runtime_error(msg + std::strerror(errno) + ")"); - } - - std::cout << "Connected " << client->getFD() << std::endl; - - client->send("1", 1); - int sent = 1; - while (sent > 0 && client->pollRead(5000)) - { - char buf[1024]; - const int recv = client->recv(buf, sizeof(buf)); - if (recv <= 0) - { - perror("recv"); - break; - } - else - { - const std::string msg = std::string(buf, recv); - const int num = stoi(msg); - const std::string new_msg = std::to_string(num + 1); - sent = client->send(new_msg.data(), new_msg.size()); - } - } -} - -void server(SocketPoll<ClientSocket>& poller) +void server(SocketPoll<SimpleResponseClient>& poller) { // Start server. auto server = std::make_shared<ServerSocket>(); @@ -250,7 +245,7 @@ void server(SocketPoll<ClientSocket>& poller) { if (server->pollRead(30000)) { - std::shared_ptr<ClientSocket> clientSocket = server->accept(); + std::shared_ptr<SimpleResponseClient> clientSocket = server->accept<SimpleResponseClient>(); if (!clientSocket) { const std::string msg = "Failed to accept. (errno: "; @@ -264,44 +259,21 @@ void server(SocketPoll<ClientSocket>& poller) } /// Poll client sockets and do IO. -void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop) +void pollAndComm(SocketPoll<SimpleResponseClient>& poller, std::atomic<bool>& stop) { while (!stop) { - poller.poll(5000, [](const std::shared_ptr<ClientSocket>& socket, const int events) + poller.poll(5000, [](const std::shared_ptr<SimpleResponseClient>& socket, const int events) { if (events & POLLIN) - { - char buf[1024]; - const int recv = socket->recv(buf, sizeof(buf)); - if (recv <= 0) - { - perror("recv"); - return false; - } + socket->readIncomingData(); - if (events & POLLOUT) - { - const std::string msg = std::string(buf, recv); - const int num = stoi(msg); - if ((num % (1<<16)) == 1) - { - std::cout << "Client #" << socket->getFD() << ": " << msg << std::endl; - } - const std::string new_msg = std::to_string(num + 1); - const int sent = socket->send(new_msg.data(), new_msg.size()); - if (sent != static_cast<int>(new_msg.size())) - { - perror("send"); - return false; - } - } - else - { - // Normally we'd buffer the response, but for now... - std::cerr << "Client #" << socket->getFD() - << ": ERROR - socket not ready for write." << std::endl; - } + if (events & POLLOUT) + socket->writeOutgoingData(); + + if (events & (POLLHUP | POLLERR | POLLNVAL)) + { + // FIXME - close and remove the socket ... } return true; @@ -309,17 +281,10 @@ void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop) } } -int main(int argc, const char**) +int main(int, const char**) { - if (argc > 1) - { - // We are now the client application. - client(0); - return 0; - } - // Used to poll client sockets. - SocketPoll<ClientSocket> poller; + SocketPoll<SimpleResponseClient> poller; // Start the client polling thread. Thread threadPoll([&poller](std::atomic<bool>& stop) diff --git a/net/socket.hpp b/net/socket.hpp index 83202b4..55ca6b7 100644 --- a/net/socket.hpp +++ b/net/socket.hpp @@ -223,25 +223,51 @@ public: // Now check if we connected, not, or not yet. return (getError() == 0 || errno == EINPROGRESS); } + protected: + std::vector< unsigned char > _inBuffer; + std::vector< unsigned char > _outBuffer; + public: + void readIncomingData() + { + ssize_t len; + unsigned char buf[4096]; + do { + len = ::read(getFD(), buf, sizeof(buf)); + } while (len < 0 && errno == EINTR); + if (len > 0) + { + assert (len < ssize_t(sizeof(buf))); + _inBuffer.insert(_inBuffer.end(), &buf[0], &buf[len]); + handleIncomingMessage(); + } + // else poll will handle errors. + } - /// Send data to our peer. - /// Returns the number of bytes sent, -1 on error. - int send(const void* buf, const size_t len) + void writeOutgoingData() { - // Don't SIGPIPE when the other end closes. - const int rc = ::send(getFD(), buf, len, MSG_NOSIGNAL); - return rc; + assert (_outBuffer.size() > 0); + ssize_t len; + do { + len = ::write(getFD(), &_outBuffer[0], _outBuffer.size()); + } while (len < 0 && errno == EINTR); + if (len > 0) + { + _outBuffer.erase(_outBuffer.begin(), + _outBuffer.begin() + len); + } + // else poll will handle errors } - /// Receive data from our peer. - /// Returns the number of bytes received, -1 on error, - /// and 0 when the peer has performed an orderly shutdown. - int recv(void* buf, const size_t len) + int getPollEvents() { - const int rc = ::recv(getFD(), buf, len, 0); - return rc; + int pollFor = POLLIN | POLLPRI; + if (_outBuffer.size() > 0) + pollFor |= POLLOUT; + return pollFor; } + virtual void handleIncomingMessage() = 0; + protected: ClientSocket(const int fd) : Socket(fd) @@ -284,12 +310,13 @@ public: /// Accepts an incoming connection (Servers only). /// Does not retry on error. /// Returns a valid Socket shared_ptr on success only. - std::shared_ptr<ClientSocket> accept() + template <typename T> + std::shared_ptr<T> accept() { // Accept a connection (if any) and set it to non-blocking. // We don't care about the client's address, so ignored. const int rc = ::accept4(getFD(), nullptr, nullptr, SOCK_NONBLOCK); - return std::shared_ptr<ClientSocket>(rc != -1 ? new ClientSocket(rc) : nullptr); + return std::shared_ptr<T>(rc != -1 ? new T(rc) : nullptr); } }; _______________________________________________ Libreoffice-commits mailing list [email protected] https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits
