
바람의 나라처럼 2차원 평면을 오갈 수 있는 클라이언트들을 지원하는 서버를 만들어 보았다.
깃허브 :
GitHub - yunu95/IOCP_Roam_Server_Test
Contribute to yunu95/IOCP_Roam_Server_Test development by creating an account on GitHub.
github.com
클라이언트 프로그램은 콘솔창으로 제한된 가로 160 x 세로 40의 평면공간을 표시한다. 이 공간의 플레이어 캐릭터들은 각자 한 글자의 특수문자로 표시되며, 클라이언트를 실행하고 있는 플레이어 자기 자신은 적색의 문자로 표시된다. 이 플레이어는 키보드 입력에 따라 평면공간을 상하좌우로 움직인다.

게임 서버의 구조는 위와 같이, 클라이언트의 연결 시도를 받아 소켓과 세션을 생성하는 Accept 스레드를 하나, 게임의 상태를 업데이트하는 스레드를 하나 구동하고, 나머지 대부분의 CPU 사용은 실시간으로 다수의 클라이언트 프로그램과 소통하는 워커 스레드의 구동에 할애한다.
워커 스레드는 클라이언트의 입력 패킷이 들어왔을 때 이를 세션 정보와 대조해 게임 업데이트를 위한 알맞은 메시지를 생성해 메시지 큐에 넣는다. 게임 업데이트는 워커스레드와는 별개로 매 게임 루프가 실행될때마다 워커 스레드가 쌓아놓은 메시지 적재 큐는 게임 업데이트에서 사용하는 메시지 실행 큐와 스왑되고, 메시지 실행 큐로 넘어온 메시지들은 게임의 상태를 갱신하는데에 쓰인다.
void WorkerRoutine() {
DWORD bytes;
ULONG_PTR key;
LPOVERLAPPED lpOverlapped;
while (GetQueuedCompletionStatus(hIocp, &bytes, &key, &lpOverlapped, INFINITE)) {
...
서버의 WorkerThread의 경우, 소켓 통신이나 파일 입출력과 같은 IO작업이 완료된 경우(예 : 패킷 수신/송신) 이벤트가 완료됨에 따라 스레드가 완료통지를 받은 후, 활성화되어 소켓 통신을 처리하는 방식이다. send나 receive와 같은 비동기 방식의 네트워크 송/수신 함수는 OS에 단일 작업을 요청한 후, 해당 단일 작업이 처리될때까지 지루하도록 기다려야 하지만, 비동기적(Async)으로 WSASend, WSAReceive 함수를 호출하는 경우, GetQueuedCompletionStatus 함수에서 다양한 비동기적 이벤트에 대응할 수 있다.

가용한 모든 스레드를 활용해 효율적으로 네트워크 수신에 대응할 수 있는 비동기 루틴을 돌리니, 클라이언트 프로그램을 400 개체까지 늘려도 서버 프로그램의 CPU 점유율이 1%를 넘기지 않는 것을 볼 수 있다.
생각할만한 점...
1. 여러 스레드가 동시에 힙 메모리 할당을 요구할 경우, 상당한 병목이 생긴다고 한다. 동시성 프로그래밍에서 사용되는 메모리풀 기법을 아는 것이 필요하겠다.
2. 모든 플레이어의 정보를 모든 플레이어에게 브로드캐스팅하기에 N^2의 시간복잡도가 발생한다. 필요한 정보만 플레이어에게 보내는 분할 송신 방식이 있을텐데, 알아봐도 좋을것이다.
클라이언트 코드
main.cpp
#include <iostream>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <windows.h>
#include <string>
#include <map>
#include <thread>
#include <mutex>
#include <ctime>
#pragma comment(lib, "ws2_32.lib")
const int ARENA_W = 160;
const int ARENA_H = 40;
struct RemotePlayer {
int id;
char symbol;
float x, y;
};
std::map<int, RemotePlayer> g_world; // 입력받은 플레이어들의 상태
std::mutex g_worldLock; // 플레이어들의 상태를 갱신할때 걸리는 락
SOCKET g_sock; // 서버와의 통신 소켓
int g_myId = -1; // 서버에게 인식되는 클라이언트 자신의 id
bool g_running = true; // 프로그램 동작 여부
bool g_headless = false; // 더미 클라이언트 플래그, true일시 콘솔창 출력없이 프로세스가 진행됨
// 네트워크로부터 수신한 패킷을 해석해 플레이어 객체들의 상태에 반영하는 수신 스레드함수
void NetworkReceiver() {
char buffer[4096];
std::string streamBuffer;
while (g_running) {
int bytesReceived = recv(g_sock, buffer, sizeof(buffer) - 1, 0);
if (bytesReceived <= 0) { g_running = false; break; }
buffer[bytesReceived] = '\0';
streamBuffer += buffer;
size_t pos;
// 패킷이 줄바꿈 기준으로 있는 경우를 충분히 반영한다.
while ((pos = streamBuffer.find('\n')) != std::string::npos) {
std::string line = streamBuffer.substr(0, pos);
streamBuffer.erase(0, pos + 1);
// 자신을 포함한 유저들의 위치 정보 처리
if (line[0] == 'U') {
int id; char sym; float x, y;
if (sscanf_s(line.c_str(), "U %d %c %f %f", &id, &sym, 1, &x, &y) == 4) {
std::lock_guard<std::mutex> lock(g_worldLock);
RemotePlayer& p = g_world[id];
p.id = id; p.symbol = sym; p.x = x; p.y = y;
}
}
// 클라이언트 유저의 자기 자신의 id정보 수신 처리, 이 정보는 자신의 id에 해당하는 문자를 붉게 칠하는 데에 쓰인다.
if (line[0] == 'W') {
int id; char sym; float x, y;
if (sscanf_s(line.c_str(), "W %d %c %f %f", &id, &sym, 1, &x, &y) == 4) {
if (g_myId == -1) g_myId = id;
}
}
// 통신을 끊은 클라이언트를 삭제하는 처리
else if (line[0] == 'D') {
int idToRemove;
if (sscanf_s(line.c_str(), "D %d", &idToRemove) == 1) {
std::lock_guard<std::mutex> lock(g_worldLock);
g_world.erase(idToRemove);
}
}
}
}
}
int main(int argc, char* argv[]) {
// 더미 클라이언트를 같은 시간에 실행하면 시드값이 같게 되어 문제가 생기기에 프로세스 id로 소금을 뿌린다.
srand((unsigned int)time(NULL) ^ _getpid());
// 프로그램 매개변수로 headless가 있으면 봇모드로 구동
for (int i = 0; i < argc; i++) {
if (std::string(argv[i]) == "-headless") {
g_headless = true;
}
}
HWND consoleWindow = GetConsoleWindow();
// 프로그램 매개변수로 headless가 있으면 윈도우를 숨김
if (g_headless) {
ShowWindow(consoleWindow, SW_HIDE);
}
// Windows Sockets Asynchronous Startup. Ws2_32.dll 라이브러리를 초기화함 2,2는 소켓 버전
// WSAStartup은 호출횟수가 레퍼런스 카운팅되며, 호출된만큼 WSACleanup도 불려야 함.
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
g_sock = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in serverAddr;
serverAddr.sin_family = AF_INET;
inet_pton(AF_INET, "127.0.0.1", &serverAddr.sin_addr);
serverAddr.sin_port = htons(9000);
if (connect(g_sock, (sockaddr*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) return 0;
// 수신 스레드 따로
std::thread netThread(NetworkReceiver);
netThread.detach();
// 콘솔창 초기화
HANDLE hConsole = NULL;
wchar_t* screen = nullptr;
WORD* colors = nullptr;
if (!g_headless) {
HANDLE hOut = GetStdHandle(STD_OUTPUT_HANDLE);
SMALL_RECT windowSize = { 0, 0, (short)(ARENA_W - 1), (short)(ARENA_H - 1) };
COORD bufferSize = { (short)ARENA_W, (short)ARENA_H };
SetConsoleScreenBufferSize(hOut, bufferSize);
SetConsoleWindowInfo(hOut, TRUE, &windowSize);
screen = new wchar_t[ARENA_W * ARENA_H];
colors = new WORD[ARENA_W * ARENA_H];
hConsole = CreateConsoleScreenBuffer(GENERIC_READ | GENERIC_WRITE, 0, NULL, CONSOLE_TEXTMODE_BUFFER, NULL);
SetConsoleActiveScreenBuffer(hConsole);
}
DWORD dwBytesWritten = 0;
// 유저 인풋 처리, 패킷 송신, 화면 출력 루프
while (g_running) {
// --- INPUT ---
// headless 모드로 실행되거나 창에서 포커스가 빠지면 무작위로 이동
bool useBotLogic = g_headless || (GetForegroundWindow() != consoleWindow);
if (!useBotLogic) {
if (GetAsyncKeyState('W') & 0x8000) send(g_sock, "W", 1, 0);
if (GetAsyncKeyState('S') & 0x8000) send(g_sock, "S", 1, 0);
if (GetAsyncKeyState('A') & 0x8000) send(g_sock, "A", 1, 0);
if (GetAsyncKeyState('D') & 0x8000) send(g_sock, "D", 1, 0);
if (GetAsyncKeyState(VK_ESCAPE)) g_running = false;
}
else {
static int moveTimer = 0;
static int currentBotDir = 0;
if (moveTimer <= 0) {
currentBotDir = rand() % 4;
moveTimer = (10 + (rand() % 20)) * 0.5f;
}
const char* cmds = "WSAD";
send(g_sock, &cmds[currentBotDir], 1, 0);
moveTimer--;
}
// 창 렌더링
if (!g_headless && hConsole) {
for (int i = 0; i < ARENA_W * ARENA_H; i++) {
screen[i] = L' ';
colors[i] = FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE;
}
for (int x = 0; x < ARENA_W; x++) {
int top = x;
int bot = (ARENA_H - 1) * ARENA_W + x;
screen[top] = screen[bot] = L'#';
colors[top] = colors[bot] = FOREGROUND_INTENSITY;
}
for (int y = 0; y < ARENA_H; y++) {
int left = y * ARENA_W;
int right = y * ARENA_W + (ARENA_W - 1);
screen[left] = screen[right] = L'#';
colors[left] = colors[right] = FOREGROUND_INTENSITY;
}
{
std::lock_guard<std::mutex> lock(g_worldLock);
for (auto const& [id, p] : g_world) {
if (id == g_myId) continue;
int idx = (int)p.y * ARENA_W + (int)p.x;
if (idx >= 0 && idx < ARENA_W * ARENA_H) {
screen[idx] = (wchar_t)p.symbol;
colors[idx] = FOREGROUND_GREEN | FOREGROUND_BLUE;
}
}
if (g_world.count(g_myId)) {
RemotePlayer& me = g_world[g_myId];
int idx = (int)me.y * ARENA_W + (int)me.x;
if (idx >= 0 && idx < ARENA_W * ARENA_H) {
screen[idx] = (wchar_t)me.symbol;
colors[idx] = FOREGROUND_RED | FOREGROUND_INTENSITY;
}
}
}
WriteConsoleOutputCharacterW(hConsole, screen, ARENA_W * ARENA_H, { 0,0 }, &dwBytesWritten);
WriteConsoleOutputAttribute(hConsole, colors, ARENA_W * ARENA_H, { 0,0 }, &dwBytesWritten);
}
Sleep(50);
}
return 0;
}
서버 코드
추상 IOCPServer 클래스
#pragma once
#include <winsock2.h>
#include <ws2tcpip.h>
#include <windows.h>
#include <iostream>
#include <thread>
#include <vector>
#pragma comment(lib, "ws2_32.lib")
#define MAX_PLAYERS 2001
// 세션별로 송/수신 overlapped를 따로 관리
struct Session {
WSAOVERLAPPED recvOverlapped;
WSAOVERLAPPED sendOverlapped; // Dedicated overlapped for bundling
SOCKET socket;
char buffer[1024];
WSABUF wsaBuf;
int playerId;
bool isSending; // The "Gate" for this player
};
class IOCPServer {
public:
virtual ~IOCPServer() { Stop(); }
// 포트에 리스너 소켓을 만들고 클라이언트로부터 수신을 받는 워커스레드, 게임 로직을 실행하는 업데이트 스레드 생성
bool Start(int port) {
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
listenSock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
sockaddr_in addr = { AF_INET, htons(port), INADDR_ANY };
bind(listenSock, (sockaddr*)&addr, sizeof(addr));
listen(listenSock, SOMAXCONN);
isRunning = true;
int threads = std::thread::hardware_concurrency();
for (int i = 0; i < threads; i++) std::thread(&IOCPServer::WorkerRoutine, this).detach();
std::thread(&IOCPServer::UpdateRoutine, this).detach();
return true;
}
// 무한루프를 돌며 Accept받는 구간
void Run() {
while (isRunning) {
sockaddr_in cAddr;
int len = sizeof(cAddr);
SOCKET cSock = WSAAccept(listenSock, (sockaddr*)&cAddr, &len, nullptr, NULL);
if (cSock != INVALID_SOCKET) OnAccept(cSock);
}
}
void Stop() { isRunning = false; closesocket(listenSock); WSACleanup(); }
protected:
// 업데이트 스레드를 위해 존재하는 가상 함수
virtual void OnUpdate() = 0;
// 패킷이 도착했을 때 워커 스레드에서 처리되는 가상함수
virtual void OnRawReceive(int playerId, char* data, int len) = 0;
virtual void OnRawConnect(Session* s) = 0;
virtual void OnRawDisconnect(int playerId) = 0;
HANDLE hIocp;
private:
SOCKET listenSock;
bool isRunning = false;
void UpdateRoutine() {
while (isRunning) {
OnUpdate();
Sleep(16);
}
}
// CPU 개수 * 2 숫자로 돌아가는 워커 스레드들이 돌리는 함수
void WorkerRoutine() {
DWORD bytes;
ULONG_PTR key;
LPOVERLAPPED lpOverlapped;
while (GetQueuedCompletionStatus(hIocp, &bytes, &key, &lpOverlapped, INFINITE)) {
if (!lpOverlapped) continue;
Session* s = (Session*)key;
// send가 끝난거면 다시 소켓 게이트를 열어준다.
if (lpOverlapped == &s->sendOverlapped) {
s->isSending = false;
continue;
}
// RECV Completion
if (bytes == 0) {
OnRawDisconnect(s->playerId);
closesocket(s->socket);
delete s;
continue;
}
OnRawReceive(s->playerId, s->buffer, bytes);
DWORD flags = 0;
ZeroMemory(&s->recvOverlapped, sizeof(WSAOVERLAPPED));
s->wsaBuf.buf = s->buffer;
s->wsaBuf.len = sizeof(s->buffer);
WSARecv(s->socket, &s->wsaBuf, 1, NULL, &flags, &s->recvOverlapped, NULL);
}
}
// accept할 경우 세션을 만들어주고 WSA 통신을 위한 세팅 작업을 진행한다.
void OnAccept(SOCKET cSock) {
BOOL bNoDelay = TRUE;
setsockopt(cSock, IPPROTO_TCP, TCP_NODELAY, (const char*)&bNoDelay, sizeof(bNoDelay));
Session* s = new Session();
s->socket = cSock;
s->isSending = false;
ZeroMemory(&s->recvOverlapped, sizeof(WSAOVERLAPPED));
ZeroMemory(&s->sendOverlapped, sizeof(WSAOVERLAPPED));
OnRawConnect(s);
CreateIoCompletionPort((HANDLE)cSock, hIocp, (ULONG_PTR)s, 0);
DWORD flags = 0;
s->wsaBuf.buf = s->buffer;
s->wsaBuf.len = sizeof(s->buffer);
WSARecv(cSock, &s->wsaBuf, 1, NULL, &flags, &s->recvOverlapped, NULL);
}
};
IOCPServer의 파생, RoamServer 클래스
#pragma once
#include "IOCPServer.h"
#include <map>
#include <vector>
#include <mutex>
#include <set>
#include <algorithm>
#define ARENA_W 160
#define ARENA_H 40
// IOCP 서버의 한 갈래로 클라이언트들을 2차원 평면에서 마구 돌아다니게(Roam) 하는 서버
class RoamServer : public IOCPServer {
private:
struct GameCommand { int pId; char input; bool isDisconnect; };
struct Player { int id; char sym; float x, y; Session* session; };
std::map<int, Player*> players;
std::vector<GameCommand> cmdQueue;
std::mutex qMtx;
int idCounter = 1;
// Static buffers for bundled data (N size, not N^2)
char g_bundleBuffers[MAX_PLAYERS][16384];
protected:
void OnRawConnect(Session* s) override {
s->playerId = idCounter++;
const char* syms = "@#$&%?!*VWXYZKMLNHPabcdefghijklmnopqrstuvwxyz0123456789+={}<>";
players[s->playerId] = new Player{ s->playerId, syms[s->playerId % 61], 20.0f, 7.0f, s };
char wMsg[64];
int len = sprintf_s(wMsg, "W %d %c %.1f %.1f\n", s->playerId, syms[s->playerId % 61], 20.0f, 7.0f);
send(s->socket, wMsg, len, 0);
}
void OnRawReceive(int pId, char* data, int len) override {
std::lock_guard<std::mutex> lock(qMtx);
cmdQueue.push_back({ pId, data[0], false });
}
void OnRawDisconnect(int pId) override {
std::lock_guard<std::mutex> lock(qMtx);
cmdQueue.push_back({ pId, 0, true });
}
// 게임 상태를 업데이트하는 루틴
void OnUpdate() override {
std::vector<GameCommand> frameCmds;
{
std::lock_guard<std::mutex> lock(qMtx);
if (cmdQueue.empty()) return;
frameCmds.swap(cmdQueue);
}
std::reverse(frameCmds.begin(), frameCmds.end());
std::set<int> movedThisFrame;
// Master frame buffer to hold all U updates for the current tick
static char masterFrameBuffer[16384];
int masterLen = 0;
for (auto& cmd : frameCmds) {
if (cmd.isDisconnect) {
if (players.count(cmd.pId)) {
char dMsg[32];
int dLen = sprintf_s(dMsg, "D %d\n", cmd.pId);
delete players[cmd.pId];
players.erase(cmd.pId);
for (auto const& [id, p] : players) send(p->session->socket, dMsg, dLen, 0);
}
continue;
}
if (movedThisFrame.count(cmd.pId) || !players.count(cmd.pId)) continue;
movedThisFrame.insert(cmd.pId);
Player* p = players[cmd.pId];
if (cmd.input == 'W') p->y -= 1.0f;
else if (cmd.input == 'S') p->y += 1.0f;
else if (cmd.input == 'A') p->x -= 1.0f;
else if (cmd.input == 'D') p->x += 1.0f;
p->x = (std::max)(1.0f, (std::min)((float)ARENA_W - 2, p->x));
p->y = (std::max)(1.0f, (std::min)((float)ARENA_H - 2, p->y));
// Append this movement to the bundled string
masterLen += sprintf_s(masterFrameBuffer + masterLen, 16384 - masterLen,
"U %d %c %.1f %.1f\n", p->id, p->sym, p->x, p->y);
}
if (masterLen <= 0) return;
// Broadcast the bundle to everyone
for (auto const& [targetId, targetPlayer] : players) {
Session* s = targetPlayer->session;
// Only send if the previous bundle has been fully transmitted
if (!s->isSending) {
memcpy(g_bundleBuffers[targetId], masterFrameBuffer, masterLen);
WSABUF wsaBuf;
wsaBuf.buf = g_bundleBuffers[targetId];
wsaBuf.len = (ULONG)masterLen;
ZeroMemory(&s->sendOverlapped, sizeof(WSAOVERLAPPED));
s->isSending = true;
if (WSASend(s->socket, &wsaBuf, 1, NULL, 0, &s->sendOverlapped, NULL) == SOCKET_ERROR) {
if (WSAGetLastError() != WSA_IO_PENDING) s->isSending = false;
}
}
}
Sleep(20);
}
};
main.cpp
#include <winsock2.h>
#include <ws2tcpip.h>
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <string>
#include <map>
#include <unordered_set>
#include "RoamServer.h"
#pragma comment(lib, "ws2_32.lib")
RoamServer server;
int main() {
if (server.Start(9000)) server.Run();
return 0;
}
















































































































































































































































































