Client-server framework for a multiplayer game (part 1)

Now that I’ve got a working game engine it’s time to add some interesting features to it. One that I’ve always desired to implement is online multiplayer, the ability to play with another player (possibly more) over the net. As it turned out, it’s not so trivial, and there are a lot of different concepts involved in making a network application (network protocols, game objects synchronization and replication, multithreading ecc…). I’ve been put off so many times but now I decided to tackle this intricated but very fun aspect of game development. In this first (in a series, hopefully) post I present a very simple client-server framework, that should be the base of the netcode for my game engine. I used Winsock as the socket API, but the API is very similar (I dare to say, identical, since it’s largely based on it) to  the BSD sockets API, so porting the code to Linux should be a breeze. By the way I suggest reading the great Beej’s guide to network programming if you want to get started with sockets programming (which I don’t cover here, as well as I don’t cover client-server architecture concepts), and this series of videos on making a multiplayer MMORPG (by the way, Javidx9 is a great great teacher and its videos are a wonderful resource for game developers, I warmly suggest you follow him), even if it uses the ASIO library, but the underlying concepts are nonetheless the same (though with less socket hair pulling and less concurrency headaches).

Disclaimer: I know my implementation is quite naive, and maybe it’s not lightning fast, and that there’re lots of ways to improve the design out there, but this is my first attempt at a network framework so please bear with me.

A client-server framework

Here’s a diagram that shows the nitty-gritty details of how the framework works. It might seem quite intricate at first, but don’t worry, I’m gonna go over the inner workings of the framework and show you all the code:

Messsages are sent from clients to the server and from the server to the clients (i know, that’s quite an insight isn’t it? more to come). Networking is a highly asynchronous process, messages are sent and received who knows when, connection fails unexpectedly, so when I hear “asynchronous” I understand “threads”. Messages are stored in (threadsafe) queues, and sockets do the hard work of sending and receiving data over the connection in their own thread. Queues act as buffers between connection endpoints so the application can merrily run while messages are sent and received. As shown in the diagram, the framework is made up of the following classes:

  • A message is represented by the Message class. It’s a class template (as every other class in the framework) and the template type parameter is an enum (or enum class) that specifies the different types of messages that will be defined by the custom application.  A message has a header, which contains the type of the message (an enumerator from the enumeration defined by the application) and the size of the payload. The content of the message itself is stored inside a vector of bytes. The Message class defines operations to serialize and deserialize data into and from it (overloading the insertion/output and extraction/input operators), and some overloads for dealing with strings in particular. The header file also defines an OwnedMessage class template, that derives from Message and is simply a standard message which contains the sender, in the form of a pointer to a connection object (used internally by the server to know who sent a particular message, since messages from all clients are stored into the same queue):
#ifndef MESSAGE_H
#define MESSAGE_H

#include "Vector.h"
#include <string>

template <typename T>
class Connection;

template <typename T>
class Message
{
	friend class Connection<T>;
public:
	Message(T type) : mHeader{ type, 0U } {}

	T GetType() const { return mHeader.mType; }

	template <typename D>
	Message<T> &operator<<(D const &data)
	{
		uint32_t oldSize = mHeader.mSize;
		mHeader.mSize += sizeof data;

		mBody.Resize(mHeader.mSize);
		std::memcpy(mBody.Data() + oldSize, &data, sizeof data);

		return *this;
	}

	template <typename D>
	Message<T> &operator>>(D &data) 
	{
		size_t index = mBody.Size() - sizeof data;
		memcpy(&data, mBody.Data() + index, sizeof data);

		mBody.Resize(index);
		mHeader.mSize = mBody.Size();

		return *this;
	}

	Message<T> &operator<<(const std::string &string)
	{
		uint32_t oldSize = mHeader.mSize;

		mHeader.mSize += string.size() + sizeof(uint32_t);

		mBody.Resize(mHeader.mSize);

		for (unsigned int i = oldSize, j = 0U; i < mHeader.mSize - sizeof(uint32_t); i++, j++)
			mBody.Data()[i] = string[j];
		
		mBody.Data()[mHeader.mSize - sizeof(uint32_t)] = string.size();

		return *this;
	}

	Message<T> &operator<<(const char *cString)
	{
		uint32_t oldSize = mHeader.mSize;

		mHeader.mSize += std::strlen(cString) + sizeof(uint32_t);

		mBody.Resize(mHeader.mSize);

		for (unsigned int i = oldSize, j = 0U; i < mHeader.mSize - 4; i++, j++)
			mBody.Data()[i] = cString[j];

		mBody.Data()[mHeader.mSize - 4] = std::strlen(cString);

		return *this;
	}

	Message<T> &operator>>(std::string &data)
	{
		uint32_t length = mBody.Data()[mHeader.mSize - 4];

		for (unsigned int i = mHeader.mSize - (4 + length); i < mHeader.mSize - 4; i++)
			data.push_back(mBody.Data()[i]);

		mHeader.mSize -= length + 4;

		mBody.Resize(mHeader.mSize);

		return *this;
	}

private:
	Message() = default;  // only Connection<T> can call default ctor

	struct Header
	{
		T mType = T();  // T mType{};       // type of message (enum)
		uint32_t mSize = 0U;                // size of message's body
	} mHeader;

	Vector<uint8_t> mBody;                      // message data
};   

template <typename T>
class OwnedMessage : public Message<T>
{
private:
	using ConnectionPtr = std::shared_ptr<Connection<T>>;
public:
	OwnedMessage(ConnectionPtr sender, const Message<T> &message) : mSender(sender), Message<T>(message) {}

	ConnectionPtr GetSender() const { return mSender; }
private:
	ConnectionPtr mSender;
};

#endif  // MESSAGE_H
  • The Connection class represents a connection endpoint (the other side of the connection, a connected client if the owner of the connection object is the server, or the server if the connection object is owned by a client). A connection object contains,among other things, a socket, a queue of outgoing messages and a reference to a queue of incoming messages, owned by the owner of the connection and passed to the connection object during construction. When a connection is created, a thread is started that manages the sending and receiving of messages: the thread task (the Connection<T>::Run() member function) checks if there is a message to be sent in the queue, dequeues the message and sends it over the socket connection; it then checks if the socket has available data (the socket is in non-blocking mode), receives the message and enqueues it into the incoming message queue. As the diagram shows, the server mantains a list of connections (all clients currently connected), while a client has only one connection (the server):
#ifndef CONNECTION_H
#define CONNECTION_H

#include <memory>
#include <thread>
#include <condition_variable>
#include "ThreadsafeQueue.h"
#include "Message.h"
#include "debug.h"

template <typename T>
class Connection : public std::enable_shared_from_this<Connection<T>>
{
public:
	enum class Owner { CLIENT, SERVER };
private:
	using std::enable_shared_from_this<Connection>::shared_from_this;
public:
	Connection(Owner owner, uint32_t id, const std::string host, uint16_t port, SOCKET socket, ThreadsafeQueue<OwnedMessage<T>> &inMessageQueue, std::condition_variable &condVar);
	~Connection() { Close(); }

	void Send(const Message<T> &message);  
	void Close();

	std::atomic<bool> mIsOpen;

	std::string const &GetHost() const { return mHost; }
	uint16_t GetPort() const { return mPort; }
	uint32_t GetId() const { return mId; }
private:
	std::string mHost;  // other side's endpoint host
	uint16_t mPort;     // other side's endpoint port

	const Owner mOwner;
	uint32_t mId;

	SOCKET mSocket;  

	std::condition_variable &mCondVar;

	ThreadsafeQueue<Message<T>> mOutMessageQueue;
	ThreadsafeQueue<OwnedMessage<T>> &mInMessageQueue;

	std::thread mRunThread;
	void Run();	
};

template <typename T>
Connection<T>::Connection(Owner owner, uint32_t id, const std::string host, uint16_t port, SOCKET socket, ThreadsafeQueue<OwnedMessage<T>> &inMessageQueue, std::condition_variable &condVar)
	: mOwner(owner), mId(id), mHost(host), mPort(port), mSocket(socket), mInMessageQueue(inMessageQueue), mCondVar(condVar)
{
	unsigned long socketMode = 1U;
	if (ioctlsocket(socket, FIONBIO, &socketMode) != 0)   // set non blocking socket
		Error("error setting socket i/o mode");

	mIsOpen = true;
	mRunThread = std::thread(&Connection<T>::Run, this);
}

template <typename T>
void Connection<T>::Send(const Message<T> &message)
{
	mOutMessageQueue.EnQueue(message);
}

template <typename T>
void Connection<T>::Close()
{
	if (mIsOpen)
		mIsOpen = false;

	if (mRunThread.joinable())
		mRunThread.join();

	closesocket(mSocket);
}

template <typename T>
void Connection<T>::Run()
{
	while (mIsOpen)
	{
		if (!mOutMessageQueue.Empty())   // while there are outgoing messages in queue send 
		{
			Message<T> outMessage = mOutMessageQueue.Front();
			mOutMessageQueue.DeQueue();

			int sendFlags = 0;
			int totalBytesSent = 0;

			while (totalBytesSent < sizeof(Message<T>::Header))
			{
				int bytesSent = send(mSocket, reinterpret_cast<char*>(&outMessage.mHeader) + totalBytesSent, sizeof(Message<T>::Header) - totalBytesSent, sendFlags);
				totalBytesSent += bytesSent;

				if (bytesSent == SOCKET_ERROR)
					Error("error sending message");
			}

			totalBytesSent = 0;
			while (totalBytesSent < outMessage.mBody.Size())
			{
				int bytesSent = send(mSocket, reinterpret_cast<char*>(outMessage.mBody.Data()) + totalBytesSent, outMessage.mBody.Size() - totalBytesSent, sendFlags);
				totalBytesSent += bytesSent;

				if (bytesSent == SOCKET_ERROR)
					Error("error sending message");
			}
		}

		thread_local static bool skip = false;  // used in non blocking mode

		skip = false;

		Message<T> inMessage;

		int recvFlags = 0;
		int totalBytesReceived = 0;

		while (totalBytesReceived < sizeof(Message<T>::Header))  // receive message header
		{
			int bytesReceived = recv(mSocket, reinterpret_cast<char*>(&inMessage.mHeader) + totalBytesReceived, sizeof(Message<T>::Header) - totalBytesReceived, recvFlags);

			if (bytesReceived == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK)
			{
				skip = true;
				break;
			}

			if (bytesReceived == SOCKET_ERROR || bytesReceived == 0)  // other side closed connection
			{
				mIsOpen = false;
				skip = true;
				mCondVar.notify_one();

				break;
			}

			totalBytesReceived += bytesReceived;
		}

		if (skip)
			continue;

		inMessage.mBody.Resize(inMessage.mHeader.mSize);

		totalBytesReceived = 0;
		while (totalBytesReceived < inMessage.mBody.Size())  // receive message body
		{
			int bytesReceived = recv(mSocket, reinterpret_cast<char*>(inMessage.mBody.Data()) + totalBytesReceived, inMessage.mBody.Size() - totalBytesReceived, recvFlags);

			if (bytesReceived == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK)
				continue;

			if (bytesReceived == SOCKET_ERROR || bytesReceived == 0)  // other side closed connection
			{
				mIsOpen = false;
				skip = true;
				mCondVar.notify_one();

				break;
			}

			totalBytesReceived += bytesReceived;
		}

		if (skip)
			continue;

		if (mOwner == Owner::SERVER)
			mInMessageQueue.EnQueue(OwnedMessage<T>(shared_from_this(), inMessage));  // put received message into incoming queue	
		else
			mInMessageQueue.EnQueue(OwnedMessage<T>(nullptr, inMessage));
	}
}

#endif  // CONNECTION_H
  • The Server class is the base abstract class to be derived from by a custom server application (which will provide the implementation for all the pure virtual member functions callbacks). It contains a message queue of incoming messages (a queue of OwnedMessages) and a list of connection objects. It has a constructor that lets the user specify on which port the server is going to run. It has a Start() member function that starts the server and puts it in listening mode, ready to accept connections from clients. Whenever a client connects, the server creates a connection object and stores it in the list of connections (the server continuosly listens for connections in a dedicated thread that is started in the Start() member function). The ProcessMessage() member function gets a message from the incoming queue and calls the OnMessage() member function (overridden by the custom application), that will perform the necessary operations based on the type of message. The Available() method checks if the queue has messages and it should be called before trying to get a message from the queue. The Send() and SendAll() member functions are used respectively to send messages to a specific client or to broadcast a message to all clients:
#ifndef SERVER_H
#define SERVER_H

#include <WinSock2.h>
#include <WS2tcpip.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <string>
#include "Vector.h"
#include "Connection.h"
#include "ThreadsafeQueue.h"
#include "Message.h"
#include "debug.h"

template <typename T>
class Server
{
protected:
	using ConnectionPtr = std::shared_ptr<Connection<T>>;  // type alias for a shared pointer to a connection object
public:
	Server(uint16_t port);
	~Server();

	void Start();
	void Stop();
	void Send(ConnectionPtr connection, const Message<T> &message) const;
	void Send(uint32_t connectionId, const Message<T> &message) const;
	void SendAll(const Message<T> &message, ConnectionPtr ignore = nullptr) const;

	bool Available() const { return !mInMessageQueue.Empty(); }
	void ProcessMessage();
protected:
	virtual void OnStart() = 0;
	virtual void OnListen() = 0;
	virtual bool OnClientConnect(ConnectionPtr connection) = 0;
	virtual void OnClientAccepted(ConnectionPtr connection) = 0;
	virtual void OnClientDisconnect(ConnectionPtr connection) = 0;
	virtual void OnMessage(ConnectionPtr sender, Message<T> &message) = 0;

	std::string mHost;
	uint16_t mPort;
private:
	mutable std::mutex mMutex;        // guards the list of connections (listen thread / removal thread)
	std::condition_variable mCondVar;

	Vector<ConnectionPtr> mConnections;
	ThreadsafeQueue<OwnedMessage<T>> mInMessageQueue;
	
	SOCKET mListenSocket;

	std::thread mListenThread;
	void Listen();

	std::thread mRemoveConnectionsThread;
	void RemoveConnections();

	static const uint8_t sMaxNumConnections = 10;
	bool mIsRunning;
};

template <typename T>
Server<T>::Server(uint16_t port) :mListenSocket(INVALID_SOCKET), mIsRunning(false)
{
	WSAData wsa;
	WSAStartup(MAKEWORD(2, 2), &wsa);

	addrinfo hints, *address;

	memset(&hints, 0, sizeof hints);
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_protocol = IPPROTO_TCP;
	//hints.ai_flags = AI_PASSIVE;

	if (getaddrinfo(nullptr, std::to_string(port).c_str(), &hints, &address) != 0)
		Error("cannot get server address");

	char stringBuf[INET6_ADDRSTRLEN];
	if (address->ai_family == AF_INET)
		inet_ntop(AF_INET, &reinterpret_cast<sockaddr_in *>(address->ai_addr)->sin_addr, stringBuf, sizeof stringBuf);
	else  // address->ai_family == AF_INET6
		inet_ntop(AF_INET6, &reinterpret_cast<sockaddr_in6 *>(address->ai_addr)->sin6_addr, stringBuf, sizeof stringBuf);

	mHost = stringBuf;

	mPort = ntohs(address->ai_family == AF_INET ? reinterpret_cast<sockaddr_in *>(address->ai_addr)->sin_port : reinterpret_cast<sockaddr_in6 *>(address->ai_addr)->sin6_port);

	if ((mListenSocket = socket(address->ai_family, address->ai_socktype, address->ai_protocol)) == INVALID_SOCKET)
		Error("cannot create server socket");

	if (bind(mListenSocket, address->ai_addr, address->ai_addrlen) != 0)  
		Error("cannot bind server socket");
}

template <typename T>
Server<T>::~Server()
{
	mIsRunning = false;

	if (mRemoveConnectionsThread.joinable())
		mRemoveConnectionsThread.join();

	if (mListenThread.joinable())
		mListenThread.join();  // TODO: accept non-blocking
		 
	closesocket(mListenSocket);

	WSACleanup();
}

template <typename T>
void Server<T>::Start()
{
	if (mIsRunning)  // TODO: error check
		return;

	mIsRunning = true;

	OnStart();

	mListenThread = std::thread(&Server::Listen, this);                        // thread that listens for and accepts new connections
	mRemoveConnectionsThread = std::thread(&Server::RemoveConnections, this);  // thread that waits for clients to disconnect
}
template <typename T>
void Server<T>::Stop()
{
	mIsRunning = false;

	if (mListenThread.joinable())  // TODO: non-blocking listening socket 
		mListenThread.join();

	for (std::shared_ptr<Connection<T>> &connection : mConnections)  // lock mutex
		connection->Close();

	if (mRemoveConnectionsThread.joinable())
		mRemoveConnectionsThread.join();
}

template <typename T>
void Server<T>::Listen()
{
	if (listen(mListenSocket, sMaxNumConnections) != 0)
		Error("listen error");

	OnListen();

	static uint32_t connectionId = 1000U;

	while (mIsRunning)
	{
		sockaddr_storage clientAddress;
		int clientAddressLength = sizeof(sockaddr_storage);
		SOCKET clientSocket = accept(mListenSocket, reinterpret_cast<sockaddr*>(&clientAddress), &clientAddressLength);    // accept connections (blocking)

		if (clientSocket == INVALID_SOCKET)
			Error("cannot create client socket");

		char clientHost[INET6_ADDRSTRLEN];
		if (clientAddress.ss_family == AF_INET)
			inet_ntop(AF_INET, &reinterpret_cast<sockaddr_in *>(&clientAddress)->sin_addr, clientHost, sizeof clientHost);
		else  // clientAddress.ss_family == AF_INET6
			inet_ntop(AF_INET6, &reinterpret_cast<sockaddr_in6 *>(&clientAddress)->sin6_addr, clientHost, sizeof clientHost);

		uint16_t clientPort = clientAddress.ss_family == AF_INET ? ntohs(reinterpret_cast<sockaddr_in *>(&clientAddress)->sin_port) : ntohs(reinterpret_cast<sockaddr_in6 *>(&clientAddress)->sin6_port);

		std::lock_guard<std::mutex> guard(mMutex);

		ConnectionPtr newConnection(new Connection<T>(Connection<T>::Owner::SERVER, connectionId++, clientHost, clientPort, clientSocket, mInMessageQueue, mCondVar));

		if (OnClientConnect(newConnection))            // callback called on new connections (TODO: refusal doesn't work, connect and then disconnect?)
		{
			mConnections.InsertLast(newConnection);    // if connection is accepted 
			OnClientAccepted(mConnections.Last());
		}
	}
}

template <typename T>
void Server<T>::Send(ConnectionPtr connection, const Message<T> &message) const
{
	if (connection->mIsOpen)
		connection->Send(message);
}

template <typename T>
void Server<T>::Send(uint32_t connectionId, const Message<T> &message) const
{
	std::lock_guard<std::mutex> guard(mMutex);

	for (const ConnectionPtr &connection : mConnections)
	{
		if (connection->GetId() == connectionId)
			connection->Send(message);
	}
}

template <typename T>
void Server<T>::SendAll(const Message<T> &message, ConnectionPtr ignore) const
{
	std::lock_guard<std::mutex> guard(mMutex);

	for (const ConnectionPtr &connection : mConnections)
	{
		if (connection != ignore && connection->mIsOpen)
			connection->Send(message);
	}
}

template <typename T>
void Server<T>::ProcessMessage()
{
	OwnedMessage<T> message(mInMessageQueue.Front());
	mInMessageQueue.DeQueue();

	OnMessage(message.GetSender(), message);
}

template <typename T>
void Server<T>::RemoveConnections()
{
	while (mIsRunning)
	{
		std::unique_lock<std::mutex> guard(mMutex);
		mCondVar.wait(guard);   // this thread waits for a connection to notify that a client has disconnected

		typename Vector<ConnectionPtr>::Iterator it = mConnections.Begin();   // poll which client disconnected from server and remove connection
		while (it != mConnections.End())
			if (!(*it)->mIsOpen)
			{
				OnClientDisconnect(*it);
				it = mConnections.Remove(it);  // calls Connection's destructor which closes the connection
			}
			else
				++it;
	}
}

#endif  // SERVER_H
  • The Client class is the base abstract class to be derived from by a custom client application (which, like the concrete server class, will implement the pure virtual event handlers). Like the Server class, it contains a queue of incoming messages but unlike the server it has a single connection object. A client connects to a server with the Connect() method, and can disconnect from the server using the Disconnect() member function. Similarly to the Server class, a client can send messages with Send(), and process messages with ProcessMessage(), after checking if the queue is not empty with a call to Available():
#ifndef CLIENT_H
#define CLIENT_H

#include <WinSock2.h>
#include <WS2tcpip.h>
#include <string>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include "ThreadsafeQueue.h"
#include "Message.h"
#include "Connection.h"
#include "debug.h"

template <typename T>
class Client
{
public:
	Client();
	~Client();

	void Connect(const std::string &host, uint16_t port);  
	void Disconnect();
	void Send(const Message<T> &message);

	bool Available() const { return !mInMessageQueue.Empty(); }
	void ProcessMessage();

	bool IsConnected() const { std::lock_guard<std::mutex> guard(mMutex);  return mConnection != nullptr; }
protected:
	virtual void OnConnect(const std::string host, uint16_t port) = 0;
	virtual void OnDisconnect() = 0;
	virtual void OnConnectionLost() = 0;
	virtual void OnMessage(Message<T> &message) = 0;

	uint32_t mId;
private:
	mutable std::mutex mMutex;
	std::condition_variable mCondVar;

	std::unique_ptr<Connection<T>> mConnection;     
	ThreadsafeQueue<OwnedMessage<T>> mInMessageQueue;

	std::thread mCheckConnectionLostThread;
	void CheckConnectionLostThread()   
	{
		std::unique_lock<std::mutex> lock(mMutex);
		mCondVar.wait(lock, [&] { return mConnection == nullptr ? true : !mConnection->mIsOpen.load(); }); 

		if (mConnection)   // server closed connection (notify from connection) otherwise connection is null and client closed connection (notify from Disconnect or from destructor)
		{
			mConnection.reset();   // destroys the connection (calls Connection<T>::Close()) and sets pointer to null
			OnConnectionLost();
		}
	}
};

template <typename T>
Client<T>::Client()
{
	WSAData wsa;
	WSAStartup(MAKEWORD(2, 2), &wsa);
}

template <typename T>
Client<T>::~Client() 
{	
	Disconnect();
        WSACleanup();
}

template <typename T>
void Client<T>::Connect(std::string const &host, uint16_t port)
{
	addrinfo hints, *serverAddress;

	memset(&hints, 0, sizeof hints);
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_protocol = IPPROTO_TCP;
	//hints.ai_flags = AI_PASSIVE;

	if (getaddrinfo(host.c_str(), std::to_string(port).c_str(), &hints, &serverAddress) != 0)
		Error("cannot get connection address");

	SOCKET connectionSocket;
	if ((connectionSocket = socket(serverAddress->ai_family, serverAddress->ai_socktype, serverAddress->ai_protocol)) == INVALID_SOCKET)
		Error("cannot create connection socket");

	if (connect(connectionSocket, serverAddress->ai_addr, serverAddress->ai_addrlen) != 0)
		Error("connection error");

	char serverHost[INET6_ADDRSTRLEN];
	if (serverAddress->ai_family == AF_INET)
		inet_ntop(AF_INET, &reinterpret_cast<sockaddr_in*>(serverAddress->ai_addr)->sin_addr, serverHost, sizeof serverHost);
	else  // serverAddress->ai_family == AF_INET6
		inet_ntop(AF_INET6, &reinterpret_cast<sockaddr_in6*>(serverAddress->ai_addr)->sin6_addr, serverHost, sizeof serverHost);

	uint16_t serverPort = serverAddress->ai_family == AF_INET ? ntohs(reinterpret_cast<sockaddr_in*>(serverAddress->ai_addr)->sin_port) : ntohs(reinterpret_cast<sockaddr_in6*>(serverAddress->ai_addr)->sin6_port);

	mConnection = std::make_unique<Connection<T>>(Connection<T>::Owner::CLIENT, 0U, serverHost, serverPort, connectionSocket, mInMessageQueue, mCondVar);
	mCheckConnectionLostThread = std::thread(&Client::CheckConnectionLostThread, this);    // started after connection is created (notify always after wait)
	OnConnect(mConnection->GetHost(), mConnection->GetPort());
}

template <typename T>
void Client<T>::Disconnect()
{
	if (!mConnection)
		return;

	{
		std::lock_guard<std::mutex> lock(mMutex);
		mConnection.reset();      // destroys the connection (calls Connection<T>::Close()) and sets pointer to null
	}

	mCondVar.notify_one();       // notify the thread waiting for server side 

	if (mCheckConnectionLostThread.joinable())
		mCheckConnectionLostThread.join();
}

template <typename T>
void Client<T>::Send(const Message<T> &message)
{
	if (mConnection)
		mConnection->Send(message);
}

template <typename T>
void Client<T>::ProcessMessage()
{
	OwnedMessage<T> message = mInMessageQueue.Front();
	mInMessageQueue.DeQueue();

	OnMessage(message);
}

#endif

Both the Client and Server classes internally use a thread to check wheter a connection is closed from the other side (a client disconnecting from server, or a server crashing, for example). This thread waits on a condition variable and is notified by a connection object if the other side disconnects or a network error occurs.

The FIFO queue used to store incoming and outgoing messages is a (quite naive) implementation of a thread-safe queue (since it is manipulated from various threads). It’s not the most efficient implementation (it just synchronizes all operations on a mutex and returns elements by value), but it gets the work done.

Testing the framework: a simple client-server application

A custom application provides the enumeration that defines all the types of messages, and derives from the Server and Client abstract classes to provide implementation for the pure virtual member functions/event handlers in the base classes. The main thread of the example application here just loops infinitely processing messages:

// server application

#include "Server.h"

enum class MyMessages : uint8_t
{
	SERVER_ACCEPT, SERVER_REFUSE, TEXT_MSG,
};

class MyServer : public Server<MyMessages>
{
public:
	MyServer(uint16_t port) : Server(port) {}

	void OnStart() override
	{
		PRINTLN("server running");
	}

	void OnListen() override
	{
		PRINT("server listening @ ");
		PRINT(mHost);
		PRINT(" on port ");
		PRINTLN(std::to_string(mPort));
	}

	bool OnClientConnect(ConnectionPtr connection) override
	{   

		Message<MyMessages> message(MyMessages::SERVER_ACCEPT);
		message << connection->GetId();

		Send(connection, message);

		return true;
	}

	void OnClientAccepted(ConnectionPtr client) override
	{
		PRINT("accepted connection from ");
		PRINT(client->GetHost());
		PRINT(" on port ");
		PRINTLN(client->GetPort());
	}

	void OnClientDisconnect(ConnectionPtr connection)
	{
		PRINT("removed connection: "); 
		PRINTLN(connection->GetId());
	}

	void OnMessage(ConnectionPtr sender, Message<MyMessages> &message)
	{
		switch (message.GetType())
		{
			case MyMessages::TEXT_MSG:
			{
				uint32_t senderId;
				message >> senderId;

				uint32_t recipientId;
				message >> recipientId;

				std::string s;
				message >> s;
				
				message << s;
				message << recipientId;
				message << senderId;

				PRINT("[");
				PRINT(std::to_string(senderId));
				PRINT("] : ");
				PRINTLN(s);

				if (recipientId == 0U)
					break;
				else if (recipientId == (uint32_t)-1)
					SendAll(message);
				else
					Send(recipientId, message);
			}
				break;
		}
	}
};

int main(int argc, char **argv)
{
	MyServer server(60005);
	server.Start();

	while (true)
	{
		if (server.Available())
			server.ProcessMessage();
	}

	return 0;
}

The server application defines the message enumeration and implements the callbacks in the base class. The OnMessage() override handles different types of messages inside a switch statement. The client side of the application is similar, it just spawns a new thread that accepts input from the console, in order to send text from client to server and between clients:

// client side application

#include "Client.h"
#include <thread>
#include <iostream>

enum class MyMessages: uint8_t
{
	SERVER_ACCEPT, SERVER_REFUSE, TEXT_MSG,
};

class MyClient : public Client<MyMessages>
{
public:
	~MyClient()
	{
		if (mConsoleThread.joinable())
			mConsoleThread.join();
	}

	void OnConnect(const std::string host, uint16_t port) override
	{
		PRINT("connected to server running @ ");
		PRINT(host);
		PRINT(" on port ");
		PRINTLN(port);
	}

	void OnDisconnect() override
	{

	}

	void OnConnectionLost() override
	{
		PRINTLN("lost connection with server");
	}

	void OnMessage(Message<MyMessages> &message) override
	{
		switch (message.GetType())
		{
			case MyMessages::SERVER_ACCEPT:
			{
				uint32_t id;
				message >> mId;

				Message<MyMessages> cmsg(MyMessages::TEXT_MSG);
				cmsg << "hello from client ";
				cmsg << 0U;
				cmsg << mId;

				Send(cmsg);

				mConsoleThread = std::thread(&MyClient::ConsoleThread, this);
			}
			break;

			case MyMessages::SERVER_REFUSE:
			{
				std::string reason;
				message >> reason;

				PRINT("server refused connection: ");  PRINTLN(reason);
				
				Disconnect();
			}
			break;

			case MyMessages::TEXT_MSG:
			{
				uint32_t senderId;
				message >> senderId;

				uint32_t recipientId;
				message >> recipientId;

				std::string s;
				message >> s;

				PRINT("["); PRINT(std::to_string(senderId)); PRINT("] : "); PRINTLN(s);
			}
			break;
		}
	}
private:
	std::thread mConsoleThread;

	void ConsoleThread()
	{
		while (IsConnected())
		{
			std::string s;

			std::getline(std::cin, s);

			if (!std::cin)
				continue;

			if (s == "exit")
			{
				Disconnect();
				break;
			}

			Message<MyMessages> cmsg(MyMessages::TEXT_MSG);
			cmsg << s;

			if (s.find("all:", 0, 4) != std::string::npos)
				cmsg << (uint32_t)-1;
			else if (s.find("to:", 0, 3) != std::string::npos)
			{
				size_t index = 3;
				cmsg << (uint32_t)std::stoul(&s[3], &index);
			}
			else
				cmsg << 0U;

			cmsg << mId;

			Send(cmsg);
		}
	}
};

int main(int argc, char **argv)
{
	MyClient client;
	client.Connect("localhost", 60005);

	while (client.IsConnected())
	{
		if (client.Available())
			client.ProcessMessage();
	}

	return 0;
}

Running the server and a couple of clients on the same machine we can send text messages from one client application to another, with the server routing the messages between clients.

This is it for now, a basic implementation of a network framework that I’m going to use in my game engine. There’s a lot to be improved: for example I don’t think I’ll end up using TCP for my game, maybe I’ll go for UDP with a custom-tailored protocol on top, and maybe I’ll get rid of some threads if I can. Synchronizing the state of the game among clients is another quite fun challenge.

Leave a Reply

Your email address will not be published. Required fields are marked *