epoll
//server.cpp
// CXXFLAGS = -std=c++2a
#include <sys/epoll.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <thread>
#include <mutex>
#include <string>
#include <vector>
#define PRINT_LOG(fmt, ...) do \
{ \
? printf("%s:%d:%s: " fmt "\n", \
? ? ? __FILE__, __LINE__, __func__, ##__VA_ARGS__); \
} while (0) /* ; no trailing semicolon here */
#define PERROR(fmt, ...) do \
{ \
? printf("%s:%d:%s: %s. " fmt "\n", \
? ? ? __FILE__, __LINE__, __func__, strerror(errno), ##__VA_ARGS__); \
} while (0) /* ; no trailing semicolon here */
int efd;
std::vector<int> vfd;
std::mutex lock;
int time_to_write = 0;
int set_nonblock(int fd)
{
? int flags = fcntl(fd, F_GETFL);
? if (flags == -1) {
? ? PERROR("fcntl");
? ? return -1;
? }
? flags |= O_NONBLOCK;
? if (fcntl(fd, F_SETFL, flags) == -1) {
? ? PERROR("fcntl");
? ? return -1;
? }
? return 0;
}
void update_events()
{
? struct epoll_event ev;
? int ret = 0;
? int err = 0;
? for (;;){
? ? sleep(1); //test
? ? time_to_write = !time_to_write;
? ? std::lock_guard<std::mutex> guard(lock);
? ? // PRINT_LOG("vfd: %lu, time_to_write: %d", vfd.size(), time_to_write);
? ? for (auto fd : vfd){
? ? ? // PRINT_LOG("fd: %d, time_to_write: %d", fd, time_to_write);
? ? ? ev.events = EPOLLIN | EPOLLET;
? ? ? ev.data.fd = fd;
? ? ? if (time_to_write){
? ? ? ? ev.events = EPOLLOUT | EPOLLET;
? ? ? }
? ? ? ret = epoll_ctl(efd, EPOLL_CTL_MOD, fd, &ev);
? ? ? err = errno;
? ? ? if (err == EBADF){
? ? ? ? continue;
? ? ? }
? ? ? if (ret == -1) {
? ? ? ? PERROR("epoll_ctl");
? ? ? }
? ? }
? }
}
int main()
{
? struct epoll_event ev, events[10];
? int listenfd, connfd, nfds;
? /* set up listening socket */
? int port = 8000;
? int ret = 0;
? struct sockaddr_in my_addr, peer_addr;
? socklen_t addrlen = sizeof(peer_addr);
? int sockopt = 1;
? int seq = 0;
? memset(&my_addr, 0, sizeof(my_addr));
? my_addr.sin_port = htons(port);
? my_addr.sin_addr.s_addr = htons(INADDR_ANY);
? my_addr.sin_family = AF_INET;
? listenfd = socket(AF_INET, SOCK_STREAM, 0);
? if (listenfd == -1) {
? ? PERROR("socket");
? ? return -1;
? }
? ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &sockopt,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? sizeof(sockopt));
? if (ret == -1){
? ? PERROR("setsockopt");
? ? close(listenfd);
? ? return -1;
? }
? ret = bind(listenfd, (struct sockaddr*)&my_addr, sizeof(my_addr));
? if (ret == -1) {
? ? PERROR("bind");
? ? close(listenfd);
? ? return -1;
? }
? ret = listen(listenfd, SOMAXCONN);
? if (ret == -1) {
? ? PERROR("listen");
? ? close(listenfd);
? ? return -1;
? }
? PRINT_LOG("listen on port: %d", port);
? signal(SIGPIPE, SIG_IGN);
? /* set up epoll */
? efd = epoll_create1(0);
? if (efd == -1) {
? ? PERROR("epoll_create1");
? ? return -1;
? }
? ev.events = EPOLLIN;
? ev.data.fd = listenfd;
? if (epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &ev) == -1) {
? ? PERROR("epoll_ctl");
? ? return -1;
? }
? PRINT_LOG("waiting for data input from clients");
? std::thread(update_events).detach();
? for (;;) {
? ? nfds = epoll_wait(efd, events, sizeof(events) / sizeof(events[0]),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 1000); //1 sec
? ? // PRINT_LOG("epoll_wait");
? ? if (nfds == -1) {
? ? ? PERROR("epoll_wait");
? ? ? for (int i = 0; i != sizeof(events) / sizeof(events[0]); i++){
? ? ? ? PRINT_LOG();
? ? ? ? close(events[i].data.fd);
? ? ? }
? ? ? return -1;
? ? }
? ? for (int i = 0; i < nfds; i++) {
? ? ? if (events[i].data.fd == listenfd) {
? ? ? ? connfd = accept(listenfd,
? ? ? ? ? ? ? ? (struct sockaddr *) &peer_addr, &addrlen);
? ? ? ? if (connfd == -1) {
? ? ? ? ? PERROR("accept");
? ? ? ? ? continue;
? ? ? ? }
? ? ? ? if (set_nonblock(connfd)){
? ? ? ? ? PRINT_LOG("set_nonblock failed");
? ? ? ? ? continue;
? ? ? ? }
? ? ? ? PRINT_LOG("accept client %s:%u", inet_ntoa(peer_addr.sin_addr),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? peer_addr.sin_port);
? ? ? ? ev.events = EPOLLIN | EPOLLET;
? ? ? ? ev.data.fd = connfd;
? ? ? ? if (epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &ev) == -1) {
? ? ? ? ? PERROR("epoll_ctl");
? ? ? ? }
? ? ? ? {
? ? ? ? ? std::lock_guard<std::mutex> guard(lock);
? ? ? ? ? vfd.push_back(connfd);
? ? ? ? }
? ? ? } else if (events[i].events & (EPOLLHUP | EPOLLERR)) {
? ? ? ? int fd = events[i].data.fd;
? ? ? ? close(fd);
? ? ? ? {
? ? ? ? ? std::lock_guard<std::mutex> guard(lock);
? ? ? ? ? std::erase(vfd, fd); // -std=c++2a
? ? ? ? }
? ? ? ? PRINT_LOG("EPOLLHUP | EPOLLERR");
? ? ? } else if (events[i].events & EPOLLIN) {
? ? ? ? int fd = events[i].data.fd;
? ? ? ? enum {count = 1024};
? ? ? ? char buf[count + 1] = {'\0'}, *p = buf;
? ? ? ? size_t len = 0;
? ? ? ? while (len < count){
? ? ? ? ? ret = read(fd, p, count - len);
? ? ? ? ? int err = errno;
? ? ? ? ? if (ret > 0){
? ? ? ? ? ? len += ret;
? ? ? ? ? ? p += ret;
? ? ? ? ? }
? ? ? ? ? if (ret == 0){
? ? ? ? ? ? close(fd);
? ? ? ? ? ? {
? ? ? ? ? ? ? std::lock_guard<std::mutex> guard(lock);
? ? ? ? ? ? ? std::erase(vfd, fd);
? ? ? ? ? ? }
? ? ? ? ? ? break;
? ? ? ? ? }
? ? ? ? ? if (err == EAGAIN || err == EWOULDBLOCK || err == EPIPE ||
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? err == ECONNRESET){
? ? ? ? ? ? break;
? ? ? ? ? }
? ? ? ? ? if (err == EINTR){
? ? ? ? ? ? continue;
? ? ? ? ? }
? ? ? ? ? if (err != 0){
? ? ? ? ? ? PRINT_LOG("ret: %d, err: %d, %s", ret, err,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? strerror(err));
? ? ? ? ? }
? ? ? ? }
? ? ? ? if (len > 0) {
? ? ? ? ? PRINT_LOG("%s", buf);
? ? ? ? }
? ? ? } else if (events[i].events & EPOLLOUT){
? ? ? ? int fd = events[i].data.fd;
? ? ? ? enum {count = 1024};
? ? ? ? char buf[count + 1] = {'\0'}, *p = buf;
? ? ? ? std::string msg = "hello client " + std::to_string(seq++);
? ? ? ? strncpy(buf, msg.c_str(), count);
? ? ? ? size_t len = 0;
? ? ? ? while (len < count){
? ? ? ? ? ret = write(fd, p, count - len);
? ? ? ? ? int err = errno;
? ? ? ? ? if (ret > 0){
? ? ? ? ? ? len += ret;
? ? ? ? ? ? p += ret;
? ? ? ? ? }
? ? ? ? ? if (ret == 0){
? ? ? ? ? ? close(fd);
? ? ? ? ? ? {
? ? ? ? ? ? ? std::lock_guard<std::mutex> guard(lock);
? ? ? ? ? ? ? std::erase(vfd, fd);
? ? ? ? ? ? }
? ? ? ? ? ? break;
? ? ? ? ? }
? ? ? ? ? if (err == EAGAIN || err == EWOULDBLOCK || err == EPIPE ||
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? err == ECONNRESET){
? ? ? ? ? ? break;
? ? ? ? ? }
? ? ? ? ? if (err == EINTR){
? ? ? ? ? ? continue;
? ? ? ? ? }
? ? ? ? ? if (err != 0){
? ? ? ? ? ? PRINT_LOG("ret: %d, err: %d, %s", ret, err,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? strerror(err));
? ? ? ? ? }
? ? ? ? }
? ? ? } else {
? ? ? ? PRINT_LOG("unknown event");
? ? ? ? int fd = events[i].data.fd;
? ? ? ? close(fd);
? ? ? ? {
? ? ? ? ? std::lock_guard<std::mutex> guard(lock);
? ? ? ? ? std::erase(vfd, fd);
? ? ? ? }
? ? ? }
? ? }
? }
? return 0;
}
---
//client.cpp
#include <sys/socket.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <signal.h>
#include <string>
#define PRINT_LOG(fmt, ...) do \
{ \
? printf("%s:%d:%s: " fmt "\n", \
? ? ? __FILE__, __LINE__, __func__, ##__VA_ARGS__); \
} while (0) /* ; no trailing semicolon here */
#define PERROR(fmt, ...) do \
{ \
? printf("%s:%d:%s: %s. " fmt "\n", \
? ? ? __FILE__, __LINE__, __func__, strerror(errno), ##__VA_ARGS__); \
} while (0) /* ; no trailing semicolon here */
int set_nonblock(int fd)
{
? int flags = fcntl(fd, F_GETFL);
? if (flags == -1) {
? ? PERROR("fcntl");
? ? return -1;
? }
? flags |= O_NONBLOCK;
? if (fcntl(fd, F_SETFL, flags) == -1) {
? ? PERROR("fcntl");
? ? return -1;
? }
? return 0;
}
int main(int argc, char *argv[])
{
? int fd;
? int ret;
? struct sockaddr_in addr;
? const char *host;
? int port;
? int seq = 0;
? if (argc != 4) {
? ? PRINT_LOG("Usage: %s <host> <port> <msg>\n", argv[0]);
? ? return 1;
? }
? host = argv[1];
? port = atoi(argv[2]);
? fd = socket(AF_INET, SOCK_STREAM, 0);
? if (fd == -1){
? ? PERROR("socket");
? ? return -1;
? }
? memset(&addr, 0, sizeof(addr));
? addr.sin_port = htons(port);
? addr.sin_family = AF_INET;
? ret = inet_aton(host, (struct in_addr*)&addr.sin_addr.s_addr);
? if (ret == 0){
? ? PRINT_LOG("inet_aton failed");
? ? return -1;
? }
? ret = connect(fd, (struct sockaddr*)&addr, sizeof(addr));
? if (ret == -1){
? ? PERROR("connect");
? ? close(fd);
? ? return -1;
? }
? PRINT_LOG("connect to server %s:%d", host, port);
? set_nonblock(fd);
? // signal(SIGPIPE, SIG_IGN);
? for(;;){
? ? //write
? ? sleep(1); //test
? ? std::string msg = argv[3] + std::to_string(seq++);
? ? ret = write(fd, msg.c_str(), msg.size()); //
? ? //read
? ? enum {count = 1024};
? ? char buf[count + 1] = {'\0'};
? ? ret = read(fd, buf, count); //
? ? if (ret > 0){
? ? ? PRINT_LOG("%s", buf);
? ? }
? }
? ret = close(fd);
? return 0;
}