반응형
- 클라이언트 접속이 많은 경우, 부하 조정. 쓰레드 풀을 만들어 성능 관리. 동시 실행 쓰레드 수를 제한하여 시스템 과부하 방지.
- 쓰레드 생성 오버헤드 제거. 동시 접속자수 제한으로 서버 안정성 확보. 쓰레드 풀 크기 조정을 하여 성능 조정.
- ^C로 종료.
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <WinSock2.h>
#include <WS2tcpip.h>
#include <windows.h>
#include <string>
#include <sstream>
#include <atomic>
#include <csignal>
#include <chrono>
#pragma comment(lib, "ws2_32.lib")
#define SERVER_PORT 30000 // TCP port the listening socket is bound to in main()
#define BUFFER_SIZE 1024 // Size in bytes of the per-client receive buffer in HandleClient
#define MAX_THREADS 5 // Maximum number of worker threads; set to the number of CPU cores
#define MAX_QUEUE_SIZE 500 // Maximum queue size; set to the number of clients allowed to wait
#define MAX_CONNECTIONS (MAX_QUEUE_SIZE+MAX_THREADS+10) // Maximum number of connections
#define SOCKET_TIMEOUT 3000 // Timeout (milliseconds) applied to the listening socket's send/receive options in main()
#define IDLE_TIMEOUT 10000 // Client idle timeout (milliseconds) applied to each accepted socket
// Debug / error / info logging switches and macros.
//
// Each flag enables the macro of the same name at runtime. The macro argument
// is a stream expression (e.g. "text " << value) and is only evaluated when
// the corresponding flag is set.
//
// The bodies are wrapped in do { ... } while (0) so that every macro expands
// to exactly one statement: `if (x) ERROR_LOG("..."); else ...` then parses as
// intended instead of failing to compile or binding the `else` to the macro's
// internal `if`. The internal stream uses a reserved-looking name so that a
// caller variable called `ss` can still appear in the message expression.
static bool g_debug_mode = false;
static bool g_error_mode = false;
static bool g_info_mode = true;
#define DEBUG_LOG(msg) \
    do { \
        if (g_debug_mode) { \
            std::stringstream log_stream_; \
            log_stream_ << "[DEBUG] " << msg; \
            std::cout << log_stream_.str() << std::endl; \
        } \
    } while (0)
#define ERROR_LOG(msg) \
    do { \
        if (g_error_mode) { \
            std::stringstream log_stream_; \
            log_stream_ << "[ERROR] " << msg; \
            std::cerr << log_stream_.str() << std::endl; \
        } \
    } while (0)
#define INFO_LOG(msg) \
    do { \
        if (g_info_mode) { \
            std::stringstream log_stream_; \
            log_stream_ << "[INFO] " << msg; \
            std::cout << log_stream_.str() << std::endl; \
        } \
    } while (0)
// Global variables
static std::atomic<size_t> g_total_requests{0}; // Incremented once per received request in HandleClient; read by MonitorRequests
static size_t g_last_printed_count{0}; // Not referenced anywhere else in the visible code
static std::chrono::steady_clock::time_point g_last_print_time; // Set in main() before the monitor thread starts; not read in the visible code
static SOCKET g_serverSocket = INVALID_SOCKET; // Listening socket, published by main() so the shutdown paths can close it
class ThreadPool {
public:
std::atomic<size_t> current_connections{0}; // 현재 총 연결 수
std::atomic<size_t> active_threads{0}; // 현재 작업 중인 스레드 수
size_t GetQueueSize() {
std::lock_guard<std::mutex> lock(queue_mutex);
return tasks.size(); // 대기 큐에 있는 클라이언트 수
}
private:
std::vector<std::thread> workers;
std::queue<std::pair<SOCKET, sockaddr_in>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
std::atomic<bool> stop{false};
void HandleClient(SOCKET clientSocket, sockaddr_in clientAddr) {
active_threads++; // 작업 시작
try {
// 소켓 타임아웃 설정
struct timeval timeout;
timeout.tv_sec = IDLE_TIMEOUT / 1000;
timeout.tv_usec = (IDLE_TIMEOUT % 1000) * 1000;
if (setsockopt(clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0) {
ERROR_LOG("Failed to set receive timeout");
}
if (setsockopt(clientSocket, SOL_SOCKET, SO_SNDTIMEO, (char*)&timeout, sizeof(timeout)) < 0) {
ERROR_LOG("Failed to set send timeout");
}
char buffer[BUFFER_SIZE];
char clientIP[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &clientAddr.sin_addr, clientIP, INET_ADDRSTRLEN);
DEBUG_LOG("Client connected - IP: " << clientIP
<< ", Port: " << ntohs(clientAddr.sin_port));
int bytesReceived = recv(clientSocket, buffer, BUFFER_SIZE, 0);
if (bytesReceived == SOCKET_ERROR) {
if (WSAGetLastError() == WSAETIMEDOUT) {
ERROR_LOG("Client " << clientIP << " timed out");
} else {
ERROR_LOG("Receive error from " << clientIP);
}
} else if (bytesReceived > 0) {
buffer[bytesReceived] = '\0';
DEBUG_LOG("Received from " << clientIP << ": " << buffer);
// 요청 처리 카운터 증가
g_total_requests.fetch_add(1, std::memory_order_relaxed);
std::string response;
if (strcmp(buffer, "sysinfo") == 0) {
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
response = "Processor count: " + std::to_string(sysInfo.dwNumberOfProcessors);
}
else if (strcmp(buffer, "netinfo") == 0) {
char hostname[256];
gethostname(hostname, sizeof(hostname));
response = "Hostname: " + std::string(hostname);
}
else if (strcmp(buffer, "diskinfo") == 0) {
DWORD sectorsPerCluster, bytesPerSector, freeClusters, totalClusters;
GetDiskFreeSpace(NULL, §orsPerCluster, &bytesPerSector,
&freeClusters, &totalClusters);
response = "Disk total space: " +
std::to_string((double)totalClusters * sectorsPerCluster *
bytesPerSector / (1024 * 1024 * 1024)) + " GB";
}
else {
response = "Unknown command";
}
send(clientSocket, response.c_str(), response.length(), 0);
}
DEBUG_LOG("Client disconnected - IP: " << clientIP);
} catch (const std::exception& e) {
ERROR_LOG("Exception in HandleClient: " << e.what());
}
closesocket(clientSocket);
current_connections--;
active_threads--; // 작업 완료
}
public:
~ThreadPool() {
Shutdown();
}
void Shutdown() {
DEBUG_LOG("ThreadPool shutdown initiated");
stop = true;
condition.notify_all();
// 스레드 종료 대기 시작 시간 기록
auto start = std::chrono::steady_clock::now();
bool timeout = false;
// worker 스레드들의 종료를 기다림
for (std::thread& worker : workers) {
if (worker.joinable()) {
try {
// 현재 경과 시간 체크
auto now = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start).count();
if (elapsed >= 3) { // 3초 초과
timeout = true;
break;
}
// 남은 시간만큼만 대기
worker.join();
} catch (const std::exception& e) {
ERROR_LOG("Thread join error: " << e.what());
}
}
}
if (timeout) {
ERROR_LOG("Thread shutdown timeout - forcing termination");
// 남은 스레드들을 강제 분리
for (std::thread& worker : workers) {
if (worker.joinable()) {
worker.detach();
}
}
}
// 남은 작업들의 소켓을 정리
std::unique_lock<std::mutex> lock(queue_mutex);
while (!tasks.empty()) {
auto& task = tasks.front();
closesocket(task.first);
tasks.pop();
}
current_connections = 0;
DEBUG_LOG("ThreadPool shutdown completed");
}
// worker 스레드 로직 수정
ThreadPool() {
for (int i = 0; i < MAX_THREADS; ++i) {
workers.emplace_back([this] {
while (true) {
std::pair<SOCKET, sockaddr_in> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] {
return stop || !tasks.empty();
});
if (stop && tasks.empty()) {
DEBUG_LOG("Worker thread exiting");
return;
}
if (!tasks.empty()) {
task = std::move(tasks.front());
tasks.pop();
}
}
if (task.first != INVALID_SOCKET) {
HandleClient(task.first, task.second);
}
}
});
}
}
bool EnqueueClient(SOCKET clientSocket, sockaddr_in clientAddr) {
if (current_connections >= MAX_CONNECTIONS) {
ERROR_LOG("Maximum connections reached");
closesocket(clientSocket);
return false;
}
current_connections++;
std::unique_lock<std::mutex> lock(queue_mutex);
if (tasks.size() >= MAX_QUEUE_SIZE) {
char clientIP[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &clientAddr.sin_addr, clientIP, INET_ADDRSTRLEN);
ERROR_LOG("Queue is full. Connection rejected from " << clientIP
<< ":" << ntohs(clientAddr.sin_port));
closesocket(clientSocket);
current_connections--; // 연결 거부 시 카운터 감소
return false;
}
tasks.push({clientSocket, clientAddr});
lock.unlock();
condition.notify_one();
return true;
}
};
// Forward declaration of the ThreadPool class
class ThreadPool;
// Non-owning pointer to the pool created in main(); read by MonitorRequests and SignalHandler.
// NOTE(review): this is a plain pointer accessed from several threads without synchronization — confirm that is acceptable.
static ThreadPool* g_threadPool = nullptr;
// Cleared by the shutdown paths to make the accept loop in main() exit.
static std::atomic<bool> g_running{true};
// 모니터링 함수
void MonitorRequests() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(5));
if (g_threadPool != nullptr) {
size_t current_count = g_total_requests.load();
size_t current_conns = g_threadPool->current_connections.load();
size_t active_threads = g_threadPool->active_threads.load();
size_t waiting_clients = g_threadPool->GetQueueSize();
INFO_LOG("Server Status:"
"\n Total requests processed: " << current_count <<
"\n Current connections: " << current_conns <<
"\n Active threads: " << active_threads << "/" << MAX_THREADS <<
"\n Waiting clients: " << waiting_clients);
}
}
}
// Handler installed for SIGINT / SIGTERM.
//
// Only the first invocation does any work. It clears g_running, shuts the
// thread pool down, closes the listening socket, releases Winsock and then
// terminates the process with exit code 0.
// The pool is shut down before the listening socket is closed; keep that order.
void SignalHandler(int signal) {
    static std::atomic<bool> shutting_down{false};
    const bool already_in_progress = shutting_down.exchange(true);
    if (already_in_progress) {
        return;
    }

    INFO_LOG("Signal " << signal << " received. Shutting down...");
    g_running = false;

    try {
        if (g_threadPool != nullptr) {
            g_threadPool->Shutdown();
            // The pool object itself is not deleted here; only the global
            // pointer is cleared.
            g_threadPool = nullptr;
        }
    } catch (const std::exception& e) {
        ERROR_LOG("Exception in SignalHandler: " << e.what());
    }

    const bool listener_open = (g_serverSocket != INVALID_SOCKET);
    if (listener_open) {
        closesocket(g_serverSocket);
        g_serverSocket = INVALID_SOCKET;
    }

    WSACleanup();
    std::exit(0);
}
// Stops the server from accepting new connections: clears g_running and
// closes the listening socket.
//
// The global handle is reset to INVALID_SOCKET after closing, matching
// SignalHandler, so that a later shutdown path does not call closesocket()
// on the same (possibly reused) handle a second time.
void GracefulShutdown() {
    INFO_LOG("Initiating graceful shutdown");
    g_running = false;
    // Refuse new connections
    if (g_serverSocket != INVALID_SOCKET) {
        shutdown(g_serverSocket, SD_BOTH);
        closesocket(g_serverSocket);
        g_serverSocket = INVALID_SOCKET;
    }
}
int main() {
signal(SIGINT, SignalHandler);
signal(SIGTERM, SignalHandler);
// 디버그 모드 설정 (환경변수나 커맨드 라인 인자로도 설정 가능)
g_debug_mode = false; // 또는 true
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
ERROR_LOG("WSAStartup failed");
return 1;
}
SOCKET serverSocket = socket(AF_INET, SOCK_STREAM, 0);
g_serverSocket = serverSocket; // 전역 변수에 저장
int reuseAddr = 1; // 소켓 재사용 허용
setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR,
(const char*)&reuseAddr, sizeof(reuseAddr));
struct linger lin;
lin.l_onoff = 1;
lin.l_linger = 0; // 즉시 종료, TIME_WAIT 상태 방지
setsockopt(serverSocket, SOL_SOCKET, SO_LINGER,
(const char*)&lin, sizeof(lin));
sockaddr_in serverAddr;
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(SERVER_PORT);
serverAddr.sin_addr.s_addr = INADDR_ANY;
if (bind(serverSocket, (sockaddr*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {
ERROR_LOG("Bind failed");
closesocket(serverSocket);
WSACleanup();
return 1;
}
if (listen(serverSocket, SOMAXCONN) == SOCKET_ERROR) {
ERROR_LOG("Listen failed");
closesocket(serverSocket);
WSACleanup();
return 1;
}
// 타임아웃 설정
struct timeval timeout;
timeout.tv_sec = SOCKET_TIMEOUT / 1000; // 초 단위
timeout.tv_usec = (SOCKET_TIMEOUT % 1000) * 1000; // 마이크로초 단위
setsockopt(serverSocket, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout));
setsockopt(serverSocket, SOL_SOCKET, SO_SNDTIMEO, (char*)&timeout, sizeof(timeout));
// 모니터링 스레드 시작
g_last_print_time = std::chrono::steady_clock::now();
std::thread monitor_thread(MonitorRequests);
monitor_thread.detach(); // 메인 스레드와 분리
INFO_LOG("Server started on port " << SERVER_PORT);
ThreadPool pool;
g_threadPool = &pool; // 전역 변수에 저장
while (g_running) {
sockaddr_in clientAddr;
int clientAddrSize = sizeof(clientAddr);
SOCKET clientSocket = accept(serverSocket, (sockaddr*)&clientAddr, &clientAddrSize);
if (clientSocket == INVALID_SOCKET) {
if (!g_running) break; // 정상적인 종료 상황
ERROR_LOG("Accept failed");
continue;
}
if (!pool.EnqueueClient(clientSocket, clientAddr)) {
char clientIP[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &clientAddr.sin_addr, clientIP, INET_ADDRSTRLEN);
ERROR_LOG("Connection rejected from " << clientIP
<< ":" << ntohs(clientAddr.sin_port));
}
}
INFO_LOG("Server end.");
// 정상 종료 처리
closesocket(serverSocket);
WSACleanup();
return 0;
}
'Develop > C&CPP' 카테고리의 다른 글
[service] 윈도우 서비스 프로그램 (0) | 2025.01.05 |
---|---|
[Server] work, cache, lock, 동시접근문제, 성능문제 (0) | 2025.01.05 |
[알고리즘] 정렬 알고리즘 비교/성능측정 (1) | 2019.12.16 |
[알고리즘] 정렬5: quick sort (0) | 2019.12.15 |
[알고리즘] 정렬4: merge sort (0) | 2019.12.15 |