C++ socket 编程及多线程编程,聊天室实现

注:本文默认读者学过计网、操作系统、数据结构等知识

关于socket

​ 学过计网的应该知道TCP、UDP等协议,如果我们手写这些协议的话,会非常麻烦,因此有socket,socket介于应用层与传输层之间,socket一组接口,为程序员提供了方便,如下图:

socket层次

socket编程的思路如下图,注意,这里的消息传输是单工通信,当我们掌握单工通信之后,再在该基础上,实现全双工通信。

socket编程

上图中,WSAStartup()与closesocket()函数是在Windows下需要使用的库函数,否则将无法进行socket通信的启动与关闭,如果你在linux上进行socket通信,你可以忽略WSAStartup(),closesocket()也可以改为用close()

socket提供的函数

常用 Berkeley Sockets API 一览表

函数名称 函数简单描述 附加说明
socket 创造某种类型的套接字
bind 将一个 socket 绑定一个ip与端口的二元组上
listen 将一个 socket 变为侦听状态
connect 试图建立一个 TCP 连接 一般用于客户端
accept 尝试接收一个连接 一般用于服务端
send 通过一个socket发送数据
recv 通过一个socket收取数据
select 判断一组socket上的读事件
gethostbyname 通过域名获取机器地址
close 关闭一个套接字,回收该 socket 对应的资源 Windows 系统中对应的是 closesocket
shutdown 关闭 socket 收或发通道
setsockopt 设置一个套接字选项
getsockopt 获取一个套接字选项

socket()

函数原型

int socket(int af, int type, int protocol) // 函数调用成功,会返回一个标识这个套接字的文件描述符,失败则返回-1
// 展开
int socket(
  int af, //地址族规范。
  int type,//新套接字的类型规范。
  int protocol//要使用的协议。
);

socket函数对应于普通文件的打开操作。普通文件的打开操作返回一个文件描述字,而socket()用于创建一个socket描述符(socket descriptor),它唯一标识一个socket。这个socket描述字跟文件描述字一样,后续的操作都有用到它,把它作为参数,通过它来进行一些读写操作。

正如可以给fopen的传入不同参数值,以打开不同的文件。创建socket的时候,也可以指定不同的参数创建不同的socket描述符,socket函数的三个参数分别为:

  • af:即协议域,又称为协议族(family)。常用的协议族有,AF_INET、AF_INET6、AF_LOCAL(或称AF_UNIX,Unix域socket)、AF_ROUTE等等。协议族决定了socket的地址类型,在通信中必须采用对应的地址,如AF_INET决定了要用ipv4地址(32位的)与端口号(16位的)的组合、AF_UNIX决定了要用一个绝对路径名作为地址。
  • type:指定socket类型。常用的socket类型有,SOCK_STREAM、SOCK_DGRAM、SOCK_RAW、SOCK_PACKET、SOCK_SEQPACKET等。
  • protocol:故名思意,就是指定协议。常用的协议有,IPPROTO_TCP、IPPTOTO_UDP、IPPROTO_SCTP、IPPROTO_TIPC等,它们分别对应TCP传输协议、UDP传输协议、STCP传输协议、TIPC传输协议。

注意:并不是上面的type和protocol可以随意组合的,如SOCK_STREAM不可以跟IPPROTO_UDP组合。当protocol为0时,会自动选择type类型对应的默认协议。

当我们调用socket创建一个socket时,返回的socket描述字它存在于协议族(address family,AF_XXX)空间中,但没有一个具体的地址。如果想要给它赋值一个地址,就必须调用bind()函数,否则就当调用connect()、listen()时系统会自动随机分配一个端口。

bind()函数

正如上面所说bind()函数把一个地址族中的特定地址赋给socket。例如对应AF_INET、AF_INET6就是把一个ipv4或ipv6地址和端口号组合赋给socket。

int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

函数的三个参数分别为:

  • sockfd:即socket描述字,它是通过socket()函数创建了,唯一标识一个socket。bind()函数就是将给这个描述字绑定一个名字。

  • addr:一个const struct sockaddr *指针,指向要绑定给sockfd的协议地址。这个地址结构根据地址创建socket时的地址协议族的不同而不同。

    如ipv4对应的是:

    struct sockaddr_in {
      sa_family_t    sin_family; /* address family: AF_INET */
      in_port_t      sin_port;   /* port in network byte order */
      struct in_addr sin_addr;   /* internet address */
    };
    
    /* Internet address. */
    struct in_addr {
      uint32_t       s_addr;     /* address in network byte order */
    };
    

    ipv6对应的是:

    struct sockaddr_in6 { 
      sa_family_t     sin6_family;   /* AF_INET6 */ 
      in_port_t       sin6_port;     /* port number */ 
      uint32_t        sin6_flowinfo; /* IPv6 flow information */ 
      struct in6_addr sin6_addr;     /* IPv6 address */ 
      uint32_t        sin6_scope_id; /* Scope ID (new in 2.4) */ 
    };
    
    struct in6_addr { 
      unsigned char   s6_addr[16];   /* IPv6 address */ 
    };
    
  • addrlen:对应的是地址的长度。

通常服务器在启动的时候都会绑定一个众所周知的地址(如ip地址+端口号),用于提供服务,客户就可以通过它来接连服务器;而客户端就不用指定,有系统自动分配一个端口号和自身的ip地址组合。这就是为什么通常服务器端在listen之前会调用bind(),而客户端就不会调用,而是在connect()时由系统随机生成一个。

listen()、connect()

如果作为一个服务器,在调用socket()、bind()之后就会调用listen()来监听这个socket,如果客户端这时调用connect()发出连接请求,服务器端就会接收到这个请求。

int listen(int sockfd, int backlog);
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

listen函数的第一个参数即为要监听的socket描述字,第二个参数为相应socket可以排队的最大连接个数。socket()函数创建的socket默认是一个主动类型的,listen函数将socket变为被动类型的,等待客户的连接请求。

connect函数的第一个参数即为客户端的socket描述字,第二参数为服务器的socket地址,第三个参数为socket地址的长度。客户端通过调用connect函数来建立与TCP服务器的连接。

accept()

TCP服务器端依次调用socket()、bind()、listen()之后,就会监听指定的socket地址了。TCP客户端依次调用socket()、connect()之后就想TCP服务器发送了一个连接请求。TCP服务器监听到这个请求之后,就会调用accept()函数取接收请求,这样连接就建立好了。之后就可以开始网络I/O操作了,即类同于普通文件的读写I/O操作。

int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);

accept函数的第一个参数为服务器的socket描述字,第二个参数为指向struct sockaddr *的指针,用于返回客户端的协议地址,第三个参数为协议地址的长度。如果accpet成功,那么其返回值是由内核自动生成的一个全新的描述字,代表与返回客户的TCP连接。

注意:accept的第一个参数为服务器的socket描述字,是服务器开始调用socket()函数生成的,称为监听socket描述字;而accept函数返回的是已连接的socket描述字。一个服务器通常通常仅仅只创建一个监听socket描述字,它在该服务器的生命周期内一直存在。内核为每个由服务器进程接受的客户连接创建了一个已连接socket描述字,当服务器完成了对某个客户的服务,相应的已连接socket描述字就被关闭。

recv()、send()

至此服务器与客户已经建立好连接了。可以调用网络I/O进行读写操作了,即实现了网咯中不同进程之间的通信!网络I/O操作有下面几组:

  • read()/write()
  • recv()/send()
  • readv()/writev()
  • recvmsg()/sendmsg()
  • recvfrom()/sendto()

如上函数的操作都大同小异,具体细节这里不做展开,这里以recv()/send()为例。

ssize_t recv(int sockfd, void *buf, size_t len, int flags);
ssize_t send(int sockfd, const void *buf, size_t len, int flags);

recv函数是负责从fd中读取内容.当读成功时,read返回实际所读的字节数,如果返回的值是0表示已经读到文件的结束了,小于0表示出现了错误。如果错误为EINTR说明读是由中断引起的,如果是ECONNREST表示网络连接出了问题。

send函数将buf中的nbytes字节内容写入文件描述符fd.成功时返回写的字节数。失败时返回-1,并设置errno变量。 在网络程序中,当我们向套接字文件描述符写时有俩种可能。

  • send的返回值大于0,表示写了部分或者是全部的数据。

  • 返回的值小于0,此时出现了错误。我们要根据错误类型来处理。如果错误为EINTR表示在写的时候出现了中断错误。如果为EPIPE表示网络连接出现了问题(对方已经关闭了连接)。

close()

在服务器与客户端建立连接之后,会进行一些读写操作,完成了读写操作就要关闭相应的socket描述字,好比操作完打开的文件要调用fclose关闭打开的文件。

int close(int fd);

close一个TCP socket的缺省行为时把该socket标记为以关闭,然后立即返回到调用进程。该描述字不能再由调用进程使用,也就是说不能再作为read或write的第一个参数。

注意:close操作只是使相应socket描述字的引用计数-1,只有当引用计数为0的时候,才会触发TCP客户端向服务器发送终止连接请求。

示例

单工通信

默认以windows为环境,编写该程序,流程如图

程序介绍:该程序实现从客户端向服务器发送一次数据传输,并在服务器显示出来

socket编程

// Serve.cpp
#include <iostream>
#include <winsock2.h>
#include <ws2tcpip.h>

#pragma comment(lib, "ws2_32.lib") // windows下socket编程需要引入该库

#define DEFAULT_PORT 5270 // 服务器端口
#define DEFAULT_ADDR "127.0.0.1" // 服务器地址
#define BUFFER_SIZE 1500 // 最大传输长度

using namespace std;

int main(){
    // 初始化winsock库
    WORD wVersionRequested;
    WSADATA wsaData;
    wVersionRequested = MAKEWORD(2, 2);
    int err = WSAStartup(wVersionRequested, &wsaData);
    if (err != 0) {
        printf("初始化winsock失败,错误代码: %d\n", err);
        return 1;
    }

    // 创建socket
    int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (sock == -1) {
        cout << "创建socket失败!" << endl;
        return 1;
    }

    // 配置socket信息
    struct sockaddr_in serv_addr;
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(DEFAULT_PORT);
    serv_addr.sin_addr.s_addr = inet_addr(DEFAULT_ADDR);

    // 绑定ip与端口
    int bind_res = bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
    if (bind_res == -1) {
        cout << "绑定IP和端口号失败!" << endl;
        return 1;
    }

    // 监听 ip:port
    listen(sock, 20);

    // 等待一个链接
    struct sockaddr_in server_addr;
    socklen_t server_addr_size = sizeof(server_addr);
    int client_sock = accept(sock, (struct sockaddr *) &server_addr, &server_addr_size);

    if (client_sock == -1) {
        cout << "接收客户端连接失败。" << endl;
        closesocket(client_sock);
        return 1;
    }

    // 等待一个数据传回
    char buffer[BUFFER_SIZE]{0};
    int bytesRead = recv(client_sock, buffer, sizeof(buffer), 0);
    if (bytesRead <= 0) {
        cout << "与客户端的连接断开或出错。" << endl;
        return 1;
    }

    cout << buffer << endl;

    // 关闭
    closesocket(client_sock);
    closesocket(sock);
    WSACleanup();

    return 0;
}
// Client.cpp
#include <iostream>
#include <winsock2.h>
#include <ws2tcpip.h>

#pragma comment(lib, "ws2_32.lib") // windows下socket编程需要引入该库

#define DEFAULT_PORT 5270 // 服务器端口
#define DEFAULT_ADDR "127.0.0.1" // 服务器地址
#define BUFFER_SIZE 1500 // 最大传输长度

using namespace std;

int main(){
    // 初始化winsock库
    WORD wVersionRequested;
    WSADATA wsaData;
    wVersionRequested = MAKEWORD(2, 2);
    int err = WSAStartup(wVersionRequested, &wsaData);
    if (err != 0) {
        printf("初始化winsock失败,错误代码: %d\n", err);
        return 1;
    }

    // 创建socket
    int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (sock == -1) {
        cout << "创建socket失败!" << endl;
        return 1;
    }

    // 配置socket信息
    struct sockaddr_in client_addr;
    client_addr.sin_family = AF_INET;
    client_addr.sin_port = htons(DEFAULT_PORT);
    client_addr.sin_addr.s_addr = inet_addr(DEFAULT_ADDR);

    // 连接服务器
    int connect_res = connect(sock, (struct sockaddr *) &client_addr, sizeof(client_addr));
    if (connect_res == -1) {
        cout << "连接服务器失败!可能服务器未开启。" << endl;
        return 1;
    }

    // 向服务器发送数据
    char buffer[BUFFER_SIZE]{0};
    cin >> buffer;

    if (send(sock, buffer, strlen(buffer), 0) == -1) {
        cout << "无法向服务器发送消息。" << endl;
        return 0;
    }

    cout << "发送成功!" << endl;

    // 关闭
    closesocket(connect_res);
    closesocket(sock);
    WSACleanup();

    return 0;
}

运行截图

image-20240104235725884

全双工通信-一对一

一服务器,一客户端

介绍全双工之前,先介绍多线程,由于recv和send不是并行的,不能同步发生作用,也就是存在阻塞,因此可以考虑引入多线程的概念(也有其他方法解决该问题,但是多线程更为方便),将发生与接受分离开,使用一个单独的线程进行管理,即可实现全双工通信,只需在单工通信的基础上修改一部分代码即可。流程图如下(这个图不是非常准确,因为在我的程序中send和recv是并行的,只不过懒得重绘,从网络中拿了一个类似的)

socket过程

首先对server.cpp中的如下片段进行修改

// 等待一个数据传回
char buffer[BUFFER_SIZE]{0};
int bytesRead = recv(client_sock, buffer, sizeof(buffer), 0);
if (bytesRead <= 0) {
    cout << "与客户端的连接断开或出错。" << endl;
    return 1;
}

改为

vector<thread> clientThreads;
clientThreads.emplace_back(ServerSend, client_sock); // 注意,这里的client_sock由connect()的返回值得出
clientThreads.emplace_back(ServerRecv, client_sock);

// 再在外部定义函数
void ServerSend(int client_sock) {
    char buffer[BUFFER_SIZE];
    while(true){
        memset(buffer, 0, sizeof(buffer));
        cin.getline(buffer, sizeof(buffer));
        getchar(); // 接受末尾回车符
        if(send(client_sock, buffer, sizeof(buffer), 0) == -1){
            cout << "无法向服务器发送消息。" << endl;
            break;
        }
        cout << "发送成功!" << endl;
    }
}

void ServerRecv(int client_sock) {
    char buffer[BUFFER_SIZE];
    while(true){
        memset(buffer, 0, sizeof(buffer));
        if(recv(client_sock, buffer, sizeof(buffer), 0) == -1){
            cout << "无法向服务器发送消息。" << endl;
            break;
        }
        cout << "Client: " << buffer << endl;
    }
}

Client.cpp也做相同修改,具体修改这里不指出,需要注意的点如下

clientThreads.emplace_back(ServerSend, sock); // 这里的sock由socket()的的返回值得出
clientThreads.emplace_back(ServerRecv, sock);

完整代码如下:

// Server.cpp
#include <iostream>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <thread>
#include <vector>

#pragma comment(lib, "ws2_32.lib") // windows下socket编程需要引入该库

#define DEFAULT_PORT 5270 // 服务器端口
#define DEFAULT_ADDR "127.0.0.1" // 服务器地址
#define BUFFER_SIZE 1500 // 最大传输长度

using namespace std;

vector<thread> clientThreads;

// 再在外部定义函数
void ServerSend(int sock) {
    char buffer[BUFFER_SIZE];
    while(true){
        memset(buffer, 0, sizeof(buffer));
        cin.getline(buffer, sizeof(buffer));
        if(send(sock, buffer, sizeof(buffer), 0) == -1){
            cout << "无法向客户端发送消息。" << endl;
            break;
        }
        cout << "发送成功!" << endl;
    }
}

void ServerRecv(int sock) {
    char buffer[BUFFER_SIZE];
    while(true){
        memset(buffer, 0, sizeof(buffer));
        int bytesRead = recv(sock, buffer, sizeof(buffer), 0);
        if (bytesRead <= 0) {
            cout << "与客户端的连接断开或出错。" << endl;
            break;
        }
        cout << "Client: " << buffer << endl;
    }
}

int main(){
    system("chcp 65001");
    // 初始化winsock库
    WORD wVersionRequested;
    WSADATA wsaData;
    wVersionRequested = MAKEWORD(2, 2);
    int err = WSAStartup(wVersionRequested, &wsaData);
    if (err != 0) {
        printf("初始化winsock失败,错误代码: %d\n", err);
        return 1;
    }

    // 创建socket
    int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (sock == -1) {
        cout << "创建socket失败!" << endl;
        return 1;
    }

    // 配置socket信息
    struct sockaddr_in serv_addr;
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(DEFAULT_PORT);
    serv_addr.sin_addr.s_addr = inet_addr(DEFAULT_ADDR);

    // 绑定ip与端口
    int bind_res = bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
    if (bind_res == -1) {
        cout << "绑定IP和端口号失败!" << endl;
        return 1;
    }

    // 监听 ip:port
    listen(sock, 20);

    // 等待一个链接
    struct sockaddr_in server_addr;
    socklen_t server_addr_size = sizeof(server_addr);
    int client_sock = accept(sock, (struct sockaddr *) &server_addr, &server_addr_size);

    if (client_sock == -1) {
        cout << "接收客户端连接失败。" << endl;
        closesocket(client_sock);
        return 1;
    }

    clientThreads.emplace_back(ServerSend, client_sock);
    clientThreads.emplace_back(ServerRecv, client_sock);

    // 结束进程
    for(auto & clientThread : clientThreads){
        clientThread.join();
    }

    // 关闭
    closesocket(client_sock);
    closesocket(sock);
    WSACleanup();

    system("pause");
    return 0;
}
// Client.cpp
#include <iostream>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <thread>
#include <vector>

#pragma comment(lib, "ws2_32.lib") // windows下socket编程需要引入该库

#define DEFAULT_PORT 5270 // 服务器端口
#define DEFAULT_ADDR "127.0.0.1" // 服务器地址
#define BUFFER_SIZE 1500 // 最大传输长度

using namespace std;

vector<thread> clientThreads;

// 再在外部定义函数
void ClientSend(int sock) {
    char buffer[BUFFER_SIZE];
    while(true){
        memset(buffer, 0, sizeof(buffer));
        cin.getline(buffer, sizeof(buffer));
        if(send(sock, buffer, sizeof(buffer), 0) == -1){
            cout << "无法向服务器发送消息。" << endl;
            break;
        }
        cout << "发送成功!" << endl;
    }
}

void ClientRecv(int sock) {
    char buffer[BUFFER_SIZE];
    while(true){
        memset(buffer, 0, sizeof(buffer));
        int bytesRead = recv(sock, buffer, sizeof(buffer), 0);
        if (bytesRead <= 0) {
            cout << "与服务器的连接断开或出错。" << endl;
            break;
        }

        cout << "Server: " << buffer << endl;
    }
}

int main(){
    system("chcp 65001");
    // 初始化winsock库
    WORD wVersionRequested;
    WSADATA wsaData;
    wVersionRequested = MAKEWORD(2, 2);
    int err = WSAStartup(wVersionRequested, &wsaData);
    if (err != 0) {
        printf("初始化winsock失败,错误代码: %d\n", err);
        return 1;
    }

    // 创建socket
    int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (sock == -1) {
        cout << "创建socket失败!" << endl;
        return 1;
    }

    // 配置socket信息
    struct sockaddr_in client_addr;
    client_addr.sin_family = AF_INET;
    client_addr.sin_port = htons(DEFAULT_PORT);
    client_addr.sin_addr.s_addr = inet_addr(DEFAULT_ADDR);

    // 连接服务器
    int connect_res = connect(sock, (struct sockaddr *) &client_addr, sizeof(client_addr));
    if (connect_res == -1) {
        cout << "连接服务器失败!可能服务器未开启。" << endl;
        return 1;
    }

    clientThreads.emplace_back(ClientSend, sock);
    clientThreads.emplace_back(ClientRecv, sock);

    // 结束进程
    for(auto & clientThread : clientThreads){
        clientThread.join();
    }

    // 关闭
    closesocket(connect_res);
    closesocket(sock);
    WSACleanup();

    system("pause");
    return 0;
}

运行截图

image-20240106004131479

全双工通信-一对多(聊天室)

一服务器,多客户端,说白了也就是一个聊天室

相信你看到这里,已经对socket通信的整个流程比较熟悉了,因此这里不过多的介绍,只简单提供一下思路,具体的代码将会在之后的GitHub链接中给出。

实现该功能也非常简单,我们只要对accept()套上一个循环,这样就可以对每一个connect()请求都响应,同时connect请求也会为每一个客户端分配一个唯一的client_sock,但新的问题又出现了,我们不能为每一个客户端都设置一个单独的send/recv线程,这回极大的消耗服务器资源,因此更好的做法,是使用一个总管函数,用来管理消息的发送(广播消息让每一个客户端都能收到其他客户端的消息)

全双工逻辑.png

为了区分不同的客户端,我们可以使用一个结构体来存储客户端的名称,以及他发送的消息,使用专业的称呼,即TCP报文(UDP的称之为数据报)

// 客户端报文
struct ClientMsg {
    in_addr ipAddress; // 存储客户端的IP地址
    unsigned int time, online; // 存储时间戳,在线人数
    char SourceName[20], Message[1024]; // 存储发送者姓名,发送的消息
};

同时,socket的数据传输是面向字节流的,因此我们不能直接将ClientMsg报文传输,而是在发送前将其编码为字节流,接收后将其解码为结构化的数据。编码和解码的方式也非常简单,直接字符填充即可,如下所示:

// 将结构体序列化为字节流
void decode(char *buffer, ClientMsg &obj) {
    memcpy(buffer, &obj, sizeof(ClientMsg));
}

// 将字节流反序列化为结构体
void encode(char *buffer, ClientMsg &obj) {
    memcpy(&obj, buffer, sizeof(ClientMsg));
}

使用单独的handle处理每个connect()请求,也就是使用单独的线程与每个客户端进行通信

// 单独处理每个加入的客户端
void handleClient(int client_sock, const ClientInfo &cInfo) {
    // 将新连接的客户端套接字加入全局列表
    connectedClients.insert(client_sock);
    cout << "[" << timeToTimeStr(time(NULL)) << " "
         << inet_ntoa(cInfo.ipAddress) << ":" << cInfo.port << " "
         << cInfo.userName << "] 加入了连接。当前在线:"
         << connectedClients.size() << " 人" << endl;

    auto broadcast = [&](int x){
        // 广播新用户 [加入 or 离开] 聊天室通知
        ClientMsg joinLink{cInfo.ipAddress, (unsigned int) time(NULL), (unsigned int) connectedClients.size()};
        memcpy(joinLink.SourceName, cInfo.userName.c_str(), sizeof(joinLink.SourceName));
        string str = string((x == 0) ? "] 加入" : "] 离开") + "了聊天室。当前在线:";
        string strTemp = str + to_string(connectedClients.size() - x) + " 人";
        memcpy(joinLink.Message, strTemp.c_str(), strlen(strTemp.c_str()));
        broadcastMessage(joinLink, client_sock);
    };

    broadcast(0);

    char buffer[BUFFER_SIZE];

    while (true) {
        memset(buffer, 0, sizeof(buffer));
        int bytesRead = recv(client_sock, buffer, sizeof(buffer), 0);
        // 连接断开
        if (bytesRead <= 0) {
            cout << "[" << timeToTimeStr(time(NULL)) << " "
                 << inet_ntoa(cInfo.ipAddress) << ":" << cInfo.port << " "
                 << cInfo.userName << "] 断开了连接。当前在线:"
                 << connectedClients.size() - 1 << " 人" << endl;

            broadcast(1);
            break;
        }

        ClientMsg cmsg;
        encode(buffer, cmsg);
        memset(cmsg.SourceName, 0, sizeof(cmsg.SourceName));

        cout << "[" << timeToTimeStr(cmsg.time) << " "
             << inet_ntoa(cInfo.ipAddress) << ":" << cInfo.port << " "
             << cInfo.userName << cmsg.Message << endl;

        // 待广播包
        cmsg.ipAddress = cInfo.ipAddress;
        cmsg.online = connectedClients.size();
        memcpy(cmsg.SourceName, cInfo.userName.c_str(), cInfo.userName.length());

        // 广播消息给所有客户端
        broadcastMessage(cmsg, client_sock);
    }

    // 删除当前套接字
    connectedClients.erase(client_sock);
    // 关闭连接
    closesocket(client_sock);
}

同时,服务器也可以发送广播消息给客户端,使用如下线程解决

// 服务器主动发送广播报文
void ServerBroadcastSend(void) {
    char buffer[BUFFER_SIZE];
    while(true) {
        memset(buffer, 0, sizeof(buffer));
        cin.getline(buffer, sizeof(buffer));
//        getchar(); // 接受遗留的换行符

        in_addr serverAddr;
        inet_pton(AF_INET, DEFAULT_ADDR, &serverAddr);
        ClientMsg sendBag{serverAddr, (unsigned long)time(NULL),
                          (unsigned long)connectedClients.size(), "Server]: "};
        memcpy(sendBag.Message, buffer, strlen(buffer));
        memset(buffer, 0, sizeof(buffer));
        decode(buffer, sendBag);

        for(int clientSock : connectedClients){
            send(clientSock, buffer, sizeof(buffer), 0);
        }
    }
}

项目完整代码如下:

// Server.cpp
#include <iostream>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <thread>
#include <string>
#include <vector>
#include <algorithm>
#include <unordered_set>
#include <time.h>

#pragma comment(lib, "ws2_32.lib")

#define DEFAULT_PORT 5270
#define DEFAULT_ADDR "127.0.0.1"
#define BUFFER_SIZE 1500

using namespace std;

// client 标记
struct ClientInfo {
    string userName;
    in_addr ipAddress;
    int port;
};

// 客户端报文
struct ClientMsg {
    in_addr ipAddress;
    unsigned int time, online;
    char SourceName[20], Message[1024];
};

// 将结构体序列化为字节流
void decode(char *buffer, ClientMsg &obj) {
    memcpy(buffer, &obj, sizeof(ClientMsg));
}

// 将字节流反序列化为结构体
void encode(char *buffer, ClientMsg &obj) {
    memcpy(&obj, buffer, sizeof(ClientMsg));
}

// 时间戳转时间字符串
string timeToTimeStr(time_t timeStamp) {
    struct tm *timeinfo = nullptr;
    char buffer[80];
    timeinfo = localtime(&timeStamp);
    strftime(buffer, 80, "%H:%M:%S", timeinfo);
    return string(buffer);
}

vector<thread> clientThreads; // 为每个用户分配一个线程
unordered_set<int> connectedClients; // 存储所有连接的客户端套接字

// 广播消息给所有客户端
void broadcastMessage(ClientMsg &message, int currentSock) {
    char buffer[BUFFER_SIZE]{0};
    decode(buffer, message);
    for (int clientSock: connectedClients) {
        if (clientSock != currentSock) {
            send(clientSock, buffer, sizeof(buffer), 0);
        }
    }
}

// 服务器主动发送广播报文
void ServerBroadcastSend(void) {
    char buffer[BUFFER_SIZE];
    while(true) {
        memset(buffer, 0, sizeof(buffer));
        cin.getline(buffer, sizeof(buffer));
//        getchar(); // 接受遗留的换行符

        in_addr serverAddr;
        inet_pton(AF_INET, DEFAULT_ADDR, &serverAddr);
        ClientMsg sendBag{serverAddr, (unsigned long)time(NULL),
                          (unsigned long)connectedClients.size(), "Server]: "};
        memcpy(sendBag.Message, buffer, strlen(buffer));
        memset(buffer, 0, sizeof(buffer));
        decode(buffer, sendBag);

        for(int clientSock : connectedClients){
            send(clientSock, buffer, sizeof(buffer), 0);
        }
    }
}

// 单独处理每个加入的客户端
void handleClient(int client_sock, const ClientInfo &cInfo) {
    // 将新连接的客户端套接字加入全局列表
    connectedClients.insert(client_sock);
    cout << "[" << timeToTimeStr(time(NULL)) << " "
         << inet_ntoa(cInfo.ipAddress) << ":" << cInfo.port << " "
         << cInfo.userName << "] 加入了连接。当前在线:"
         << connectedClients.size() << " 人" << endl;

    auto broadcast = [&](int x){
        // 广播新用户 [加入 or 离开] 聊天室通知
        ClientMsg joinLink{cInfo.ipAddress, (unsigned int) time(NULL), (unsigned int) connectedClients.size()};
        memcpy(joinLink.SourceName, cInfo.userName.c_str(), sizeof(joinLink.SourceName));
        string str = string((x == 0) ? "] 加入" : "] 离开") + "了聊天室。当前在线:";
        string strTemp = str + to_string(connectedClients.size() - x) + " 人";
        memcpy(joinLink.Message, strTemp.c_str(), strlen(strTemp.c_str()));
        broadcastMessage(joinLink, client_sock);
    };

    broadcast(0);

    char buffer[BUFFER_SIZE];

    while (true) {
        memset(buffer, 0, sizeof(buffer));
        int bytesRead = recv(client_sock, buffer, sizeof(buffer), 0);
        // 连接断开
        if (bytesRead <= 0) {
            cout << "[" << timeToTimeStr(time(NULL)) << " "
                 << inet_ntoa(cInfo.ipAddress) << ":" << cInfo.port << " "
                 << cInfo.userName << "] 断开了连接。当前在线:"
                 << connectedClients.size() - 1 << " 人" << endl;

            broadcast(1);
            break;
        }

        ClientMsg cmsg;
        encode(buffer, cmsg);
        memset(cmsg.SourceName, 0, sizeof(cmsg.SourceName));

        cout << "[" << timeToTimeStr(cmsg.time) << " "
             << inet_ntoa(cInfo.ipAddress) << ":" << cInfo.port << " "
             << cInfo.userName << cmsg.Message << endl;

        // 待广播包
        cmsg.ipAddress = cInfo.ipAddress;
        cmsg.online = connectedClients.size();
        memcpy(cmsg.SourceName, cInfo.userName.c_str(), cInfo.userName.length());

        // 广播消息给所有客户端
        broadcastMessage(cmsg, client_sock);
    }

    // 删除当前套接字
    connectedClients.erase(client_sock);
    // 关闭连接
    closesocket(client_sock);
}


int main() {
    system("chcp 65001");

    // 定义变量->获取winsock版本->加载winsock库->初始化->创建套接字->设置套接字选项->特定操作->关闭套接字->卸载winsock库

    // 初始化winsock库
    WORD wVersionRequested;
    WSADATA wsaData;
    wVersionRequested = MAKEWORD(2, 2);
    int err = WSAStartup(wVersionRequested, &wsaData);
    if (err != 0) {
        printf("初始化winsock失败,错误代码: %d\n", err);
        return 1;
    }

    // 创建socket
    int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (sock == -1) {
        cout << "创建socket失败!" << endl;
        return 0;
    }

    // 配置socket信息
    struct sockaddr_in serv_addr;
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(DEFAULT_PORT);
    serv_addr.sin_addr.s_addr = inet_addr(DEFAULT_ADDR);

    // 绑定ip与端口
    int bind_res = bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
    if (bind_res == -1) {
        cout << "绑定IP和端口号失败!" << endl;
        return 0;
    }

    // 监听 ip:port
    cout << "服务器启动,开始监听5270端口。" << endl;
    listen(sock, 20);

    thread ServerSend(ServerBroadcastSend); // 服务器广播报文

    while (true) {
        struct sockaddr_in server_addr;
        socklen_t server_addr_size = sizeof(server_addr);
        int client_sock = accept(sock, (struct sockaddr *) &server_addr, &server_addr_size);

        if (client_sock == -1) {
            cout << "接收客户端连接失败。" << endl;
            closesocket(client_sock);
            break;
        }

        char userName[BUFFER_SIZE]{0};
        int bytesRead = recv(client_sock, userName, sizeof(userName), 0);
        if (bytesRead <= 0) {
            cout << "与客户端的连接断开或出错。" << endl;
            break;
        }
        // 保存客户端信息
        ClientInfo cInfo{userName, server_addr.sin_addr, ntohs(server_addr.sin_port)};
        // 启动一个新的线程处理客户端连接
        clientThreads.emplace_back(handleClient, client_sock, cInfo);
    }

    // 等待所有线程结束
    for (auto &thread: clientThreads) {
        thread.join();
    }

    ServerSend.join();

    closesocket(sock);
    WSACleanup();

    return 0;
}
// Client.cpp
#include <iostream>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <thread>
#include <string>
#include <unistd.h>

#pragma comment(lib, "ws2_32.lib")

#define DEFAULT_PORT 5270
#define DEFAULT_ADDR "127.0.0.1"
#define BUFFER_SIZE 1500

using namespace std;

// 客户端报文
struct ClientMsg {
    in_addr ipAddress;
    unsigned int time, online;
    char SourceName[20], Message[1024];
};

// 将结构体序列化为字节流
void decode(char *buffer, ClientMsg &obj) {
    memcpy(buffer, &obj, sizeof(ClientMsg));
}

// 将字节流反序列化为结构体
void encode(char *buffer, ClientMsg &obj) {
    memcpy(&obj, buffer, sizeof(ClientMsg));
}

string timeToTimeStr(time_t timeStamp) {
    struct tm *timeinfo = nullptr;
    char buffer[80];
    timeinfo = localtime(&timeStamp);
    strftime(buffer, 80, "%H:%M:%S", timeinfo);
    return string(buffer);
}

// 接收线程
void receiveThread(int sock) {
    char buffer[BUFFER_SIZE];
    while (true) {
        memset(buffer, 0, sizeof(buffer));
        int bytesRead = recv(sock, buffer, sizeof(buffer), 0);
        if (bytesRead <= 0) {
            cout << "与服务器的连接断开或出错。" << endl;
            break;
        }

        ClientMsg cmsg;
        encode(buffer, cmsg);
        cout << "[" << timeToTimeStr(cmsg.time) << " "
             << inet_ntoa(cmsg.ipAddress) << " "
             << cmsg.SourceName << cmsg.Message << endl;
    }
}

// 发送线程
void sendThread(int sock) {
    char buffer[BUFFER_SIZE];
    ClientMsg cmsg;
    while (true) {
        cin.sync(); // 清空输入缓存
        // 清空缓存
        memset(buffer, 0, sizeof(buffer));
        memset(cmsg.SourceName, 0, sizeof(cmsg.SourceName));
        memset(cmsg.Message, 0, sizeof(cmsg.Message));

        cin.getline(buffer, sizeof(buffer));
//        getchar();

        if(strlen(buffer) == 0){
            cout << "禁止非空输入!" << endl;
            continue;
        }

        if (strcmp(buffer, "quit") == 0) {
            exit(0);
        }

        string userInput(string("]: ") + buffer);
        cmsg.time = time(NULL);
        memcpy(cmsg.Message, userInput.c_str(), userInput.length());

        decode(buffer, cmsg);
        if (send(sock, buffer, sizeof(buffer), 0) == -1) {
            cout << "无法向服务器发送消息。" << endl;
            break;
        } else {
            cout << "发送成功!" << endl;
        }
    }
}

int main() {
    system("chcp 65001");

    // 初始化winsock库
    WORD wVersionRequested;
    WSADATA wsaData;
    wVersionRequested = MAKEWORD(2, 2);
    int err = WSAStartup(wVersionRequested, &wsaData);
    if (err != 0) {
        printf("初始化winsock失败,错误代码: %d\n", err);
        return 1;
    }

    // 创建socket
    int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (sock == -1) {
        cout << "创建socket失败!" << endl;
        return 0;
    }

    // 配置socket信息
    struct sockaddr_in client_addr;
    client_addr.sin_family = AF_INET;
    client_addr.sin_port = htons(DEFAULT_PORT);
    client_addr.sin_addr.s_addr = inet_addr(DEFAULT_ADDR);

    int connect_res = connect(sock, (struct sockaddr *) &client_addr, sizeof(client_addr));
    if (connect_res == -1) {
        cout << "连接服务器失败!可能服务器未开启。" << endl;
        closesocket(connect_res);
        closesocket(sock);
        WSACleanup();
        system("pause");
        return 0;
    }

    cout << "\n请输入你的昵称:";
    char userName[BUFFER_SIZE];
    cin >> userName;

    if (send(sock, userName, strlen(userName), 0) == -1) {
        cout << "无法向服务器发送消息。" << endl;
        closesocket(connect_res);
        closesocket(sock);
        WSACleanup();
        return 0;
    }
    cout << "您好:" << userName << ",欢迎使用聊天室!" << endl;

    // 启动接收和发送线程
    thread receiveThreadObj(receiveThread, sock);
    thread sendThreadObj(sendThread, sock);

    // 等待线程结束
    receiveThreadObj.join();
    sendThreadObj.join();

    closesocket(connect_res);
    closesocket(sock);
    WSACleanup();

    return 0;
}

输出展示:

image-20240106220055147

真实服务器运行

如果你有服务器的话,那么你也完全可以在服务器上运行该项目,只需要将Server.cpp中的DEFAULT_ADDR修改为服务器的地址,并同时在服务器上设置防火墙对DEFAULT_PORT放行,如下图

1.拥有一个服务器

image-20240106221021803

2.放行端口

image-20240106221042095

3.在服务器上重编译Server.cpp,因为当前项目的环境是windows的,不能直接在ubuntu上运行,需要稍作修改,这并不麻烦,请自行搜索windows迁移ubuntu的方法,之后在ubutu上重编译在运行即可,如下图

image-20240106221406871

4.修改本地项目的DEFAULT_ADDR的地址为服务器的公网IP地址,重编译,在运行即可,当然也可以叫上你远方的小伙伴(不在同一局域网内)和你一同测试,如下图,其中一个伙伴在济南,而我在武汉,我将程序发给他之后,也能进行通信

image-20240106222104012

完整项目代码

DaYangtuo247/Cpp_socket_multithreading_project: C++ socket multithreading (github.com)

作者:WuQiling
文章链接:https://www.wqlblog.cn/c-socket-编程及多线程编程,聊天室实现/
文章采用 CC BY-NC-SA 4.0 协议进行许可,转载请遵循协议
暂无评论

发送评论 编辑评论


				
默认
贴吧
上一篇
下一篇