代码

buffer.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#ifndef BLOCKQUEUE_H
#define BLOCKQUEUE_H

#include <deque>
#include <condition_variable>
#include <mutex>
#include <sys/time.h>
using namespace std;

template<typename T>
class BlockQueue {
public:
explicit BlockQueue(size_t maxsize = 1000);
~BlockQueue();

bool empty();
bool full();
void push_back(const T& item);
void push_front(const T& item);
bool pop(T &item);
bool pop(T &item, int timeout);
void clear();
T front();
T back();
size_t capacity();
size_t size();

void flush();
void Close();
private:
char* BeginPtr_(); // buffer开头
const char* BeginPtr_() const;
void MakeSpace_(size_t len);

std::vector<char> buffer_;
std::atomic<std::size_t> readPos_; // 读的下标
std::atomic<std::size_t> writePos_; // 写的下标
};

buffer.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#include "buffer.h"

//init vector<char>
Buffer::Buffer(int initBuffSize) : buffer_(initBuffSize), readPos_(0), writePos_(0) {}

//get size of write: size of buffer - writePos_
size_t Buffer::WritableBytes() const {
return buffer_.size() - writePos_;
}

//get size of read: size of write - size of read
size_t Buffer::ReadbleBytes() const {
return writePos_ - readPos_;
}

//get already used size
size_t Buffer::PrependableBytes() const {
return readPos_;
}

const char* Buffer::Peek() const {
return &buffer_[readPos_];
}

//ensure the length of write
void Buffer::EnsureWriteable(size_t len) {
if(len > WritableBytes()) {
MakeSpace_(len);
}
assert(len <= WritableBytes());
}

//move forward the writePos_
void Buffer::HasWritten(size_t len) {
writePos_ += len;
}

//move forward the readPos_
void Buffer::Retrieve(size_t len) {
readPos_ += len;
}

//read until the end
void Buffer::RetrieveUntil(const char* end) {
assert(Peek() <= end );
Retrieve(end - Peek()); //end - the ptr of read
}

//zero the buffer
void Buffer::RetrieveAll() {
bzero(&buffer_[0], buffer_.size());
readPos_ = writePos_ = 0;
}

//get the last part of str
std::string Buffer::RetrieveAllToStr() {
std::string str(Peek(), ReadbleBytes());
RetrieveAll();
return str;
}

//get the ptr of write
const char* Buffer::BeginWriteConst() const {
return &buffer_[writePos_];
}

char *Buffer::BeginWrite() {
return &buffer_[writePos_];
}

//append the str to buffer
void Buffer::Append(const char* str, size_t len) {
assert(str);
EnsureWriteable(len);
std::copy(str, str + len, BeginWrite());
HasWritten(len);
}

void Buffer::Append(const std::string& str) {
Append(str.c_str(), str.size());
}

void Append(const void* data, size_t len) {
Append(static_cast<const char*>(data), len);
}

void Append(const Buffer& buff) {
Append(buff.Peek(), buff.ReadbleBytes());
}

//write the content of fd to the buffer
ssize_t Buffer::ReadFd(int fd, int* Errno) {
char buff[65535]; //stack area
struct iovec iov[2];
size_t writeable = WritableBytes();

iov[0].iov_base = BeginWrite();
iov[0].iov_len = writeable;
iov[1].iov_base = buff;
iov[1].iov_len = sizeof(buff);

ssize_t len = readv(fd, iov, 2);
if(len < 0) {
*Errno = errno;
} else if(static_cast<size_t>(len) <= writeable) {
writePos_ += len;
} else {
writePos_ = buffer_.size();
Append(buff, static_cast<size_t>(len - writeable));
}
return len;
}

ssize_t Buffer::WriteFd(int fd, int* Errno) {
ssize_t len = write(fd, Peek(), ReadbleBytes());
if(len < 0) {
*Errno = errno;
return len;
}
Retrieve(len);
return len;
}

char* Buffer::BeginPtr_() {
return &buffer_[0];
}

const char* Buffer::BeginPtr_() const{
return &buffer_[0];
}

//extend the buffer
void Buffer::MakeSpace_(size_t len) {
if(WritableBytes() + PrependableBytes() < len) {
buffer_.resize(writePos_ + len +1);
} else {
size_t readable = ReadbleBytes();
std::copy(BeginPtr_() + readPos_, BeginPtr_() + writePos_, BeginPtr_());
readPos_ = 0;
writePos_ = readable;
assert(readable = ReadbleBytes());
}
}

设计原理简述

首先,buffer的设计采用NIO的后端模型,而为了实现NIO的模型buffer的设计就存在了input buffer和output buffer,设计output buffer的原因为在实现非阻塞时由于TCP受到window的控制如果假设有100的数据只传输了80的数据还有20的数据未传输时就必须连接到下次传输之前,等待POLLOUT事件一起传输,而设计input buffer的原因就是为了确保接受的消息完整。
Buffer的设计是一个与queue类似拥有read index和write index的size可变的连续的缓冲区(具体设计参考源码)

Buffer::readFd()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ssize_t Buffer::ReadFd(int fd, int* Errno) {
char buff[65535]; // 栈区
struct iovec iov[2];
size_t writeable = WritableBytes(); // 先记录能写多少
// 分散读, 保证数据全部读完
iov[0].iov_base = BeginWrite();
iov[0].iov_len = writeable;
iov[1].iov_base = buff;
iov[1].iov_len = sizeof(buff);

ssize_t len = readv(fd, iov, 2);
if(len < 0) {
*Errno = errno;
} else if(static_cast<size_t>(len) <= writeable) { // 若len小于writable,说明写区可以容纳len
writePos_ += len; // 直接移动写下标
} else {
writePos_ = buffer_.size(); // 写区写满了,下标移到最后
Append(buff, static_cast<size_t>(len - writeable)); // 剩余的长度
}
return len;
}

readFd()实现了一个数据如果能够写入缓冲区则写入缓冲区,如果缓冲区已满则写入一个栈区。
在具体实现上由于buffer是使用vector会动态的增长空间,同时还可以在同一输入中拓展到最大的buffer区

思考

这里同样用了一个相似的例子展示buffer的基本过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#include <stdio.h>

#include <iostream>

#include <cstring>

#include <vector>

#include <atomic>

#include <unistd.h>



#define max 1024



class Buffer

{

public:

    Buffer();

    bool write_();

    bool read_();

    const char *writeptr();

    size_t writeable();



private:

    std::vector<char> buffer_;

    std::atomic<size_t> writepos_;

    std::atomic<size_t> readpos_;

};



Buffer::Buffer()

{

    readpos_ = writepos_ = 0;

}



const char *Buffer::writeptr()

{

    return &buffer_[writepos_];

}



bool Buffer::write_()

{

    int retlen = read(0, &buffer_, max);

    writepos_ += retlen;

    return retlen;

}



bool Buffer::read_()

{

    int retlen = write(1, &buffer_, writepos_);

    return retlen;

}



size_t Buffer ::writeable()

{

    return writepos_ - readpos_;

}



int main()

{

    Buffer bu;

    bu.write_();

    printf("-----------------------------------\n");

    bu.read_();

    return 0;

}
1
2
3
4
5
6
7
  const char* Buffer::BeginWriteConst() const {
return &buffer_[writePos_];
}

char *Buffer::BeginWrite() {
return &buffer_[writePos_];
}

注意到如上代码对于函数const的修饰可以理解为const修饰的函数只有可读权限对于成员变量不能修改,例子如下

1
2
3
4
5
const Buffer constBuffer;
const char* readOnlyData = constBuffer.BeginWriteConst(); // 可以调用 const 成员函数

Buffer nonConstBuffer;
char* readWriteData = nonConstBuffer.BeginWrite(); // 可以调用非 const 成员函数

为什么要使用static_cast<const char*>(data)中为什么要使用static_cast转换而不是用const char* (data)
我在网上搜索一番后发现是因为这是一种新式结构?(暂定)
我还再次尝试了使用read和push_vector同时对vector同时操作,发现不能兼容暂时不知道什么原因在malloc的调用就会报错。

日志模块

代码

blockqueue.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
#ifndef BLOCKQUEUE_H

#define BLOCKQUEUE_H



#include <deque>

#include <condition_variable>

#include <mutex>

#include <sys/time.h>

using namespace std;



template <typename T>

class BlockQueue

{

public:

    explicit BlockQueue(size_t maxsize = 1000);

    ~BlockQueue();

    bool empty();

    bool full();

    void push_back(const T &item);

    void push_front(const T &item);

    bool pop(T &item);

    bool pop(T &item, int timeout);

    void clear();

    T front();

    T back();

    size_t capacity();

    size_t size();



    void flush();

    void Close();



private:

    deque<T> deq_;

    mutex mtx_;

    bool isClose_;

    size_t capacity_;

    condition_variable condConsumer_;

    condition_variable condProducer_;

};



template <typename T>

BlockQueue<T>::BlockQueue(size_t maxsize) : capacity_(maxsize)

{

    assert(maxsize > 0);

    isClose_ = false;

}



template <typename T>

BlockQueue<T>::~BlockQueue()

{

    Close();

}



template <typename T>

void BlockQueue<T>::Close()

{

    clear();

    isClose_ = true;

    condConsumer_.notify_all();

    condProducer_.notify_all();

}



template <typename T>

void BlockQueue<T>::clear()

{

    lock_guard<mutex> locker(mtx_);

    deq_.clear();

}



template <typename T>

bool BlockQueue<T>::empty()

{

    lock_guard<mutex> locker(mtx_);

    return deq_.empty();

}



template <typename T>

bool BlockQueue<T>::full()

{

    lock_guard<mutex> locker(mtx_);

    return deq_.size() >= capacity_;

}



template <typename T>

void BlockQueue<T>::push_back(const T &item)

{

    unique_lock<mutex> locker(mtx_);

    while (deq_.size() >= capacity_)

    {

        condProducer_.wait(locker);

    }

    deq_.push_back(item);

    condProducer_.notify_one();

}



template <typename T>

void BlockQueue<T>::push_front(const T &item)

{

    unique_lock<mutex> locker(mtx_);

    while (deq_.size() >= capacity_)

    {

        condProducer_.wait(locker);

    }

    deq_.push_front(item);

    condConsumer_.notify_one();

}



template <typename T>

bool BlockQueue<T>::pop(T &item)

{

    unique_lock<mutex> locker(mtx_);

    while (deq_.empty())

    {

        condConsumer_.wait(locker);

    }

    item = deq_front();

    deq_.pop_front();

    condProducer_.notify_one();

    return true;

}



template <typename T>

bool BlockQueue<T>::pop(T &item, int timeout)

{

    unique_lock<std::mutex> locker(mtx_);

    while (deq_.empty())

    {

        if (condConsumer_.wait_for(locker, std::chrono::seconds(timeout)) == std::cv_status::timeout)

        {

            return false;

        }

        if (isClose_)

        {

            return false;

        }

    }

    item = deq_.front();

    deq_.pop_front();

    condProducer_.notify_one();

    return true;

}



template <typename T>

T BlockQueue<T>::front()

{

    lock_guard<std::mutex> locker(mtx_);

    return deq_.front();

}



template <typename T>

T BlockQueue<T>::back()

{

    lock_guard<mutex> locker(mtx_);

    return deq_.back();

}



template <typename T>

size_t BlockQueue<T>::capacity()

{

    lock_guard<mutex> locker(mtx_);

    return capacity_;

}



template <typename T>

size_t BlockQueue<T>::size()

{

    lock_guard<mutex> locker(mtx_);

    return deq_.size();

}



template <typename T>

void BlockQueue<T>::flush()

{

    condProducer_.notify_one();

}

#endif

log.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#ifndef LOG_H

#define LOG_H



#include <mutex>

#include <string>

#include <thread>

#include <sys/time.h>

#include <stdarg.h>

#include <assert.h>

#include <sys/stat.h>

#include "blockqueue.h"

#include "../buffer/buffer.h"



class Log

{

public:

    void init(int level, const char *path = "./log", const char *suffix = "./log", int maxQueueCapacity = 1024);



    static Log *Instance();

    static void FlushLogThread();



    void write(int level, cons char *format, ...);

    void flush();



    int GetLevel();

    void SetLevel(int level);

    bool IsOpen() { return isOpen_; }



private:

    Log();

    void AppendLogLevelTitle_(int level);

    virtual ~Log();

    void AsyncWrite_();



private:

    static const int LOG_PATH_LEN = 256;

    static const int LOG_NAME_LEN = 256;

    static const int MAX_LINES = 50000;



    const char *path_;

    const char *suffix_;



    int MAX_LINES_;



    int lineCount_;

    int toDay;



    bool isOpen_;



    Buffer buff_;

    int level_;

    bool isAsync_;



    FILE *fp_;

    std::unique_ptr<BlockQueue<std::string>> deque_;

    std::unique_ptr<std::thread> writeThread_;

    std::mutex mtx_;

};



#define LOG_BASE(level, format, ...)                   \

    do                                                 \

    {                                                  \

        Log *log = Log::Instance();                    \

        if (log->isOpen() && log->GetLevel() <= level) \

        {                                              \

            log->write(level, format, ##__VA_ARGS__);  \

            log->flush();                              \

        }                                              \

    } while (0);



#define LOG_DEBUG(format, ...)

do

{

    LOG_BASE(0, format, ##__VA_ARGS__)

} while (0);

#define LOG_INFO(format, ...)

do

{

    LOG_BASE(1, format, ##__VA_ARGS__)

} while (0);

#define LOG_WARN(format, ...)

do

{

    LOG_BASE(2, format, ##__VA_ARGS__)

} while (0);

#define LOG_ERROR(format, ...)

do

{

    LOG_BASE(3, format, ##__VA_ARGS__)

} while (0);



#endif

log.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
 #include "./log.h"



Log::Log()

{

    fp_ = nullptr;

    deque_ = nullptr;

    writeThread_ = nullptr;

    lineCount_ = 0;

    toDay_ = 0;

    isAsync_ = false;

}



Log::~Log()

{

    while (!deque_->empty())

    {

        deque_->flush();

    }

    deque_->Close();

    writeThread_->join();

    if (fp_)

    {

        lock_guard<mutex> locker(mtx_);

        flush();

        fclose(fp_);

    }

}



void Log::flush()

{

    if (isAsync_)

    {

        deque_->fluse();

    }

    fflush(fp_);

}



Log *Log::Instance()

{

    static Log log;

    return &log;

}



void Log::FlushLogThread()

{

    Log::Instance()->AsyncWrite_();

}



void Log::AsyncWrite_()

{

    string str = "";

    while (deque_->pop(str))

    {

        lock_guard<mutex> locker(mtx_);

        fputs(str.c_str(), fp_);

    }

}



void Log::init(int level, const char *path, const char *suffix, int maxQueCapacity)

{

    isOpen_ = true;

    level_ = level;

    path_ = path;

    suffix_ = suffix;

    if (maxQueCapacity)

    {

        isAsync_ = true;

        if (!deque_)

        {

            unique_ptr<BlockQueue<std::string>> newQue(new BlockQueue<std::string>);

            deque_ = move(newQue);



            unique_ptr<thread> newThread(new thread(FlushLogThread)); writeThread_ = move(newThread);

        }

    }

    else

    {

        isAsync_ = false;

    }



    lineCount_ = 0;

    time_t timer = time(nullptr);

    struct tm *systime = localtime(&timer);

    char fileName[LOG_NAME_LEN] = {0};

    snprintf(fileName, LOG_NAME_LEN - 1, "%s/%04d_%02d_%02d%s", path_, systime->tm_year + 1900, systime->tm_mon + 1, systime->tm_mday, suffix_);



    toDay_ = systime->tm_mday;



    {

        lock_guard<mutex> locker(mtx_);

        buff_.RetrieveAll();

        if (fp_)

        {

            flush();

            fclose(fp_);

        }

        fp_ = fopen(fileName, "a");

        if (fp_ == nullptr)

        {

            mkdir(fileName, 0777);

            fp_ = fopen(fileName, "a");

        }

        assert(fp_ != nullptr);

    }

}



void Log::write(int level, const char *format, ...)

{

    struct timeval now = {0, 0};

    gettimeofday(&now, nullptr);

    time_t tSec = now.tv_sec;

    struct tm *sysTime = localtime(&tSec);

    struct tm t = *sysTime;

    va_list vaList;



    if (toDay != t.tm_mday || (lineCount_ && (lineCount_ % MAX_LINES == 0)))

    {

        unique_ptr<mutex> locker(mtx_);

        locker.unlock();

        char newFile[LOG_NAME_LEN];

        char tail[36] = {0};

        snprintf(tail, 36, "%04d_%02d_%02d", t.tm_year + 1900, t.tm_mon + 1, t.tm_mday);



        if (toDay_ != t.tm_mday)

        {

            snprintf(newFile, LOG_NAME_LEN - 72, "%s/%s/%s", path_, tail, suffix_);

            toDay_ = t.tm_mday;

            lineCount_ = 0;

        }

        else

        {

            snprintf(newFile, LOG_NAME_LEN - 72, "%s/%s-%d%s", path_, tail, (lineCount_ / MAX_LINES), suffix_);

        }



        locker.lock();

        flush();

        fclose(fp_);

        fp_ = open(newFile, "a");

        assert(fp_ != nullptr);

    }



    {

        unique_ptr<mutex> locker(mtx_);

        lineCount_++;

        int n = snprintf(buff_.BeginWrite(), 128, "%d-%02d-%02d %02d:%02d:%02d:%06ld", t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec, now.tv_usec);



        buff_.HasWritten(n);

        AppendLogLevelTitle_(level);



        va_start(vaList, format);

        int m = vsnprintf(buff_.BeginWrite(), buff_WritableBytes(), format, vaList);

        va_end(vaList);



        buff_.HasWritten(m);

        buff_.Append("\n\0", 2);



        if (isAsync_ && deque_ && !deque_->full())

        {

            deque_->push_back(buff_.RetrieveAlltostr());

        }

        else

        {

            fputs(buff_.Peek(), fp_);

        }

        buff_.RetrieveAll();

    }

}



void Log::AppendLogLevelTitle_(int level)

{

    switch (level)

    {

    case 0:

        buff_.Append("[debug]: ", 9);

        break;

    case 1:

        buff_.Append("[info] : ", 9);

        break;

    case 2:

        buff_.Append("[warn] : ", 9);

        break;

    case 3:

        buff_.Append("[error]: ", 9);

        break;

    default:

        buff_.Append("[info] : ", 9);

        break;

    }

}



int Log::GetLevel()

{

    lock_guard<mutex> locker(mtx_);

    return level_;

}



void Log::SetLevel(int level)

{

    lock_guard<mutex> locker(mtx_);

    level_ = level;

}

设计原理简述

我在网上搜集资料的时候发现在设计日志模块中涉及锁等操作主要采用了生产者消费者的模型属于一种设计模式,整个过程简单来说就是mutex,thread,unique_lock(unique_lock上锁wait和condition_variable.notify_one的上锁和解锁多对象对多对象)的综合运用。

生产者与消费者模式例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
#include <iostream>

#include <thread>

#include <mutex>

#include <queue>

#include <condition_variable>



using namespace std;



// 缓冲区存储的数据类型

struct CacheData

{

    // 商品id

    int id;

    // 商品属性

    string data;

};



queue<CacheData> Q;

// 缓冲区最大空间

const int MAX_CACHEDATA_LENGTH = 10;

// 互斥量,生产者之间,消费者之间,生产者和消费者之间,同时都只能一个线程访问缓冲区

mutex m;

condition_variable condConsumer;

condition_variable condProducer;

// 全局商品id

int ID = 1;



// 消费者动作

void ConsumerActor()

{

    unique_lock<mutex> lockerConsumer(m);

    cout << "[" << this_thread::get_id() << "] 获取了锁" << endl;

    while (Q.empty())

    {

        cout << "因为队列为空,所以消费者Sleep" << endl;

        cout << "[" << this_thread::get_id() << "] 不再持有锁" << endl;

        // 队列空, 消费者停止,等待生产者唤醒

        condConsumer.wait(lockerConsumer);

        cout << "[" << this_thread::get_id() << "] Weak, 重新获取了锁" << endl;

    }

    cout << "[" << this_thread::get_id() << "] ";

    CacheData temp = Q.front();

    cout << "- ID:" << temp.id << " Data:" << temp.data << endl;

    Q.pop();

    condProducer.notify_one();

    cout << "[" << this_thread::get_id() << "] 释放了锁" << endl;

}



// 生产者动作

void ProducerActor()

{

    unique_lock<mutex> lockerProducer(m);

    cout << "[" << this_thread::get_id() << "] 获取了锁" << endl;

    while (Q.size() > MAX_CACHEDATA_LENGTH)

    {

        cout << "因为队列为满,所以生产者Sleep" << endl;

        cout << "[" << this_thread::get_id() << "] 不再持有锁" << endl;

        // 对列慢,生产者停止,等待消费者唤醒

        condProducer.wait(lockerProducer);

        cout << "[" << this_thread::get_id() << "] Weak, 重新获取了锁" << endl;

    }

    cout << "[" << this_thread::get_id() << "] ";

    CacheData temp;

    temp.id = ID++;

    temp.data = "*****";

    cout << "+ ID:" << temp.id << " Data:" << temp.data << endl;

    Q.push(temp);

    condConsumer.notify_one();

    cout << "[" << this_thread::get_id() << "] 释放了锁" << endl;

}



// 消费者

void ConsumerTask()

{

    while (1)

    {

        ConsumerActor();

    }

}



// 生产者

void ProducerTask()

{

    while (1)

    {

        ProducerActor();

    }

}



// 管理线程的函数

void Dispatch(int ConsumerNum, int ProducerNum)

{

    vector<thread> thsC;

    for (int i = 0; i < ConsumerNum; ++i)

    {

        thsC.push_back(thread(ConsumerTask));

    }



    vector<thread> thsP;

    for (int j = 0; j < ProducerNum; ++j)

    {

        thsP.push_back(thread(ProducerTask));

    }



    for (int i = 0; i < ConsumerNum; ++i)

    {

        if (thsC[i].joinable())

        {

            thsC[i].join();

        }

    }



    for (int j = 0; j < ProducerNum; ++j)

    {

        if (thsP[j].joinable())

        {

            thsP[j].join();

        }

    }

}



int main()

{

    // 一个消费者线程,5个生产者线程,则生产者经常要等待消费者

    Dispatch(1, 5);

    return 0;

}

思考

unique_guard和lock_guard最大的差别在于unique_guard更加灵活在其作用域可以手动加解锁,而lock_guard却在作用域里没有提供更多灵活的选项
在线程池判断中又判断了同步和非同步的情况加上读入buff的判断情况是否相当于两次缓冲了?为什么要使用单列(懒汉模式)?
懒汉模式就是一个类只返回一个实例,下面一段代码就可以理解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <iostream>

class Singleton {
private:
// 私有构造函数,防止外部直接实例化
Singleton() {}

// 私有静态成员变量,用于保存唯一的实例
static Singleton* instance;

public:
// 公有静态方法,提供全局访问点
static Singleton* getInstance() {
// 在第一次调用时进行实例化
if (instance == nullptr) {
instance = new Singleton();
std::cout << "创建单例实例\n";
}
return instance;
}

// 其他成员方法...
};

// 在文件作用域内初始化静态成员变量
Singleton* Singleton::instance = nullptr;

int main() {
// 使用单例模式获取实例
Singleton* obj1 = Singleton::getInstance();
Singleton* obj2 = Singleton::getInstance();

// obj1 和 obj2 是相同的实例
std::cout << "obj1 和 obj2 " << ((obj1 == obj2) ? "是" : "不是") << "相同的实例\n";

return 0;
}

线程池模块

代码

sqlconnpool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#ifndef SQLCONNPOOL_H
#define SQLCONNPOOL_H

#include <mysql/mysql.h>
#include <string>
#include <queue>
#include <mutex>
#include <semaphore.h>
#include <thread>
#include "../log/log.h"

class SqlConnPool
{
public:
static SqlConnPool *Instance();

MYSQL *GetConn();
void FreeConn(MYSQL *conn);
int GetFreeConnCount();

void Init(const char *host , int port, const char *user, const char *pwd, const char *dbName, int connSize);
void ClosePool();
private:
SqlConnPool() = default;
~SqlConnPool() {ClosePool();}

int MAX_CONN_;

std::queue<MYSQL *> connQue_;
std::mutex mtx_;
sem_t semId_;
};

class SqlConnRAII
{
public:
SqlConnRAII(MYSQL **sql, SqlConnPool *connpool)
{
assert(connpool);
*sql = connpool->GetConn();
sql_ = *sql;
connpool_ = connpool;
}

~SqlConnRAII()
{
if(sql_){ connpool_->FreeConn(sql_);}
}
private:
MYSQL *sql_;
SqlConnPool *connpool_;
};

#endif

sqlconnpool.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
  #ifndef SQLCONNPOOL_H
#define SQLCONNPOOL_H

#include <mysql/mysql.h>
#include <string>
#include <queue>
#include <mutex>
#include <semaphore.h>
#include <thread>
#include "../log/log.h"

class SqlConnPool
{
public:
static SqlConnPool *Instance();

MYSQL *GetConn();
void FreeConn(MYSQL *conn);
int GetFreeConnCount();

void Init(const char *host , int port, const char *user, const char *pwd, const char *dbName, int connSize);
void ClosePool();
private:
SqlConnPool() = default;
~SqlConnPool() {ClosePool();}

int MAX_CONN_;

std::queue<MYSQL *> connQue_;
std::mutex mtx_;
sem_t semId_;
};

class SqlConnRAII
{
public:
SqlConnRAII(MYSQL **sql, SqlConnPool *connpool)
{
assert(connpool);
*sql = connpool->GetConn();
sql_ = *sql;
connpool_ = connpool;
}

~SqlConnRAII()
{
if(sql_){ connpool_->FreeConn(sql_);}
}
private:
MYSQL *sql_;
SqlConnPool *connpool_;
};

#endif

threadpool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#ifndef SQLCONNPOOL_H
#define SQLCONNPOOL_H

#include <mysql/mysql.h>
#include <string>
#include <queue>
#include <mutex>
#include <semaphore.h>
#include <thread>
#include "../log/log.h"

class SqlConnPool
{
public:
static SqlConnPool *Instance();

MYSQL *GetConn();
void FreeConn(MYSQL *conn);
int GetFreeConnCount();

void Init(const char *host , int port, const char *user, const char *pwd, const char *dbName, int connSize);
void ClosePool();
private:
SqlConnPool() = default;
~SqlConnPool() {ClosePool();}

int MAX_CONN_;

std::queue<MYSQL *> connQue_;
std::mutex mtx_;
sem_t semId_;
};

class SqlConnRAII
{
public:
SqlConnRAII(MYSQL **sql, SqlConnPool *connpool)
{
assert(connpool);
*sql = connpool->GetConn();
sql_ = *sql;
connpool_ = connpool;
}

~SqlConnRAII()
{
if(sql_){ connpool_->FreeConn(sql_);}
}
private:
MYSQL *sql_;
SqlConnPool *connpool_;
};

#endif

设计原理简述

lambda使用

代码中使用了lambda,注意使用时候的构造和mutable的使用,在lambda的使用中其中构造的函数看作自己构成了一个作用域,其作用域相对于加了一个const且只有可读权限,这时候就要注意[ ]对于参数的应用以及需要传参时候注意添加mutable,下面我给出一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <iostream>



int main()

{

    int x = 1, y = 2;



    auto Add = [](int x, int y) -> int

    { return x + y; };



    auto z = Add(x, y);

    std::cout << "z:" << z << std::endl;



    auto Swap = [x, y](int z, int a) mutable -> int

    {

        int temp = z;

        z = x;

        a = temp;

    };



    auto ret = Swap(x, y);



    std::cout << "x: " << x << " y: " << y << std::endl;

    return 0;

}

思考

麻了一开始不会用gcc编译出来全是undefined,原来用gcc编译的时候要把其他include包含的cpp文件添加进来,例如在编译一个测试log的时候g++ test.cpp ./log/log.cpp ./pool/sqlconnpool.cpp ./buffer/buffer.cpp `mysql_config --cflags --libs` -o test1

在以下这段代码中,我们注意到thread采用了lambda函数的使用方式进行而不是直接thread,这是由于在访问共享资源如tasks和mtx_的时候我们可以利用this来捕获当前线程的数据

1
2
3
4
5
6
7
8
9
std::thread([this]() {
std::unique_lock<std::mutex> locker(pool_->mtx_);
while(true) {
if(!pool_->tasks.empty()) {
auto task = std::move(pool_->tasks.front()); // 左值变右值,资产转移
pool_->tasks.pop();
locker.unlock(); // 因为已经把任务取出来了,所以可以提前解锁了
task();
locker.lock(); // 马上又要取任务了,上锁

再看到以下代码,为什么要使用std::move呢这是因为在std::move()资产转移的时候不会执行拷贝函数同时移动内部资源把执行权交给当前进程,总的来说就是为了提高效率。

1
2
3
4
5
auto task = std::move(pool_->tasks.front());    // 左值变右值,资产转移
pool_->tasks.pop();
locker.unlock(); // 因为已经把任务取出来了,所以可以提前解锁了
task();
locker.lock(); // 马上又要取任务了,上锁

还有这段代码中的AddTask(T && task),这里使用了一个完美转发使用了一个std::forward也就是在泛型编程中设置函数调用为右值引用而不是拷贝,为什么使用完美转发呢,是因为在泛型编程中这样可以提高效率。

1
2
3
4
5
6
 void AddTask(T&& task) {
std::unique_lock<std::mutex> locker(pool_->mtx_);
pool_->tasks.emplace(std::forward<T>(task));
pool_->cond_.notify_one();
}

代码

httprequest.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#ifndef HTTP_REPQUEST_H
#define HTTP_REPQUEST_H

#include <unordered_map>
#include <unordered_set>
#include <string>
#include <regex>
#include <errno.h>
#include <mysql/mysql.h>

#include "../buffer/buffer.h"
#include "../log/log.h"
#include "../pool/sqlconnpool.h"

class HttpRequest {
public:
enum PARSE_STATE
{
REQUEST_LINE,
HEADERS,
BODY,
FINISH,
};

HttpRequest() {Init();}
~HttpRequest() = default;

void Init();
bool parse(Buffer& buff);

std::string path() const;
std::string& path();
std::string method() const;
std::string version() const;
std::string GetPost(const std::string& key) const;
std::string GetPost(const char* key) const;

bool IsKeepAlive() const;
private:
bool ParesRequestLine_(const std::string& line);
void ParseHeader_(const std::string& line);
void ParseBody_(const std::string& line);

void ParsePath_();
void ParsePost_();
void ParseFromUrlencoded_();

static bool UserVerify(const std::string& name, const std::string& pwd, bool isLogin);

PARSE_STATE state_;
std::string method_, path_, version_, body_;
std::unordered_map<std::string, std::string> header_;
std::unordered_map<std::string, std::string> post_;

static const std::unordered_set<std::string> DEFAULT_HTML;
static const std::unordered_map<std::string, int> DEFAULT_HTML_TAG;
static int ConverHex(char ch);


};


#endif

httprequest.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
#include "httprequest.h"
using namespace std;

const unordered_set<string> HttpRequest::DEFAULT_HTML{
"/index", "/register", "/login",
"/welcome", "/video", "/picture", };

const unordered_map<string, int> HttpRequest::DEFAULT_HTML_TAG {
{"/register.html", 0}, {"/login.html", 1}, };

void HttpRequest::Init() {
method_ = path_ = version_ = body_ = "";
state_ = REQUEST_LINE;
header_.clear();
post_.clear();
}

bool HttpRequest::IsKeepAlive() const {
if(header_.count("Connection") == 1) {
return header_.find("Connection")->second == "keep-alive" && version_ == "1.1";
}
return false;
}

// 解析处理
bool HttpRequest::parse(Buffer& buff) {
const char CRLF[] = "\r\n"; // 行结束符标志(回车换行)
if(buff.ReadableBytes() <= 0) { // 没有可读的字节
return false;
}
// 读取数据
while(buff.ReadableBytes() && state_ != FINISH) {
// 从buff中的读指针开始到读指针结束,这块区域是未读取得数据并去处"\r\n",返回有效数据得行末指针
const char* lineEnd = search(buff.Peek(), buff.BeginWriteConst(), CRLF, CRLF + 2);
// 转化为string类型
std::string line(buff.Peek(), lineEnd);
switch(state_)
{
/*
有限状态机,从请求行开始,每处理完后会自动转入到下一个状态
*/
case REQUEST_LINE:
if(!ParseRequestLine_(line)) {
return false;
}
ParsePath_(); // 解析路径
break;
case HEADERS:
ParseHeader_(line);
if(buff.ReadableBytes() <= 2) {
state_ = FINISH;
}
break;
case BODY:
ParseBody_(line);
break;
default:
break;
}
if(lineEnd == buff.BeginWrite()) { break; } // 读完了
buff.RetrieveUntil(lineEnd + 2); // 跳过回车换行
}
LOG_DEBUG("[%s], [%s], [%s]", method_.c_str(), path_.c_str(), version_.c_str());
return true;
}

// 解析路径
void HttpRequest::ParsePath_() {
if(path_ == "/") {
path_ = "/index.html";
}
else {
for(auto &item: DEFAULT_HTML) {
if(item == path_) {
path_ += ".html";
break;
}
}
}
}

bool HttpRequest::ParseRequestLine_(const string& line) {
regex patten("^([^ ]*) ([^ ]*) HTTP/([^ ]*)$");
smatch subMatch;
// 在匹配规则中,以括号()的方式来划分组别 一共三个括号 [0]表示整体
if(regex_match(line, subMatch, patten)) { // 匹配指定字符串整体是否符合
method_ = subMatch[1];
path_ = subMatch[2];
version_ = subMatch[3];
state_ = HEADERS; // 状态转换为下一个状态
return true;
}
LOG_ERROR("RequestLine Error");
return false;
}

void HttpRequest::ParseHeader_(const string& line) {
regex patten("^([^:]*): ?(.*)$");
smatch subMatch;
if(regex_match(line, subMatch, patten)) {
header_[subMatch[1]] = subMatch[2];
}
else {
state_ = BODY; // 状态转换为下一个状态
}
}

void HttpRequest::ParseBody_(const string& line) {
body_ = line;
ParsePost_();
state_ = FINISH; // 状态转换为下一个状态
LOG_DEBUG("Body:%s, len:%d", line.c_str(), line.size());
}

// 16进制转化为10进制
int HttpRequest::ConverHex(char ch) {
if(ch >= 'A' && ch <= 'F') return ch -'A' + 10;
if(ch >= 'a' && ch <= 'f') return ch -'a' + 10;
return ch;
}

// 处理post请求
void HttpRequest::ParsePost_() {
if(method_ == "POST" && header_["Content-Type"] == "application/x-www-form-urlencoded") {
ParseFromUrlencoded_(); // POST请求体示例
if(DEFAULT_HTML_TAG.count(path_)) { // 如果是登录/注册的path
int tag = DEFAULT_HTML_TAG.find(path_)->second;
LOG_DEBUG("Tag:%d", tag);
if(tag == 0 || tag == 1) {
bool isLogin = (tag == 1); // 为1则是登录
if(UserVerify(post_["username"], post_["password"], isLogin)) {
path_ = "/welcome.html";
}
else {
path_ = "/error.html";
}
}
}
}
}

// 从url中解析编码
void HttpRequest::ParseFromUrlencoded_() {
if(body_.size() == 0) { return; }

string key, value;
int num = 0;
int n = body_.size();
int i = 0, j = 0;

for(; i < n; i++) {
char ch = body_[i];
switch (ch) {
// key
case '=':
key = body_.substr(j, i - j);
j = i + 1;
break;
// 键值对中的空格换为+或者%20
case '+':
body_[i] = ' ';
break;
case '%':
num = ConverHex(body_[i + 1]) * 16 + ConverHex(body_[i + 2]);
body_[i + 2] = num % 10 + '0';
body_[i + 1] = num / 10 + '0';
i += 2;
break;
// 键值对连接符
case '&':
value = body_.substr(j, i - j);
j = i + 1;
post_[key] = value;
LOG_DEBUG("%s = %s", key.c_str(), value.c_str());
break;
default:
break;
}
}
assert(j <= i);
if(post_.count(key) == 0 && j < i) {
value = body_.substr(j, i - j);
post_[key] = value;
}
}

bool HttpRequest::UserVerify(const string &name, const string &pwd, bool isLogin) {
if(name == "" || pwd == "") { return false; }
LOG_INFO("Verify name:%s pwd:%s", name.c_str(), pwd.c_str());
MYSQL* sql;
SqlConnRAII(&sql, SqlConnPool::Instance());
assert(sql);

bool flag = false;
unsigned int j = 0;
char order[256] = { 0 };
MYSQL_FIELD *fields = nullptr;
MYSQL_RES *res = nullptr;

if(!isLogin) { flag = true; }
/* 查询用户及密码 */
snprintf(order, 256, "SELECT username, password FROM user WHERE username='%s' LIMIT 1", name.c_str());
LOG_DEBUG("%s", order);

if(mysql_query(sql, order)) {
mysql_free_result(res);
return false;
}
res = mysql_store_result(sql);
j = mysql_num_fields(res);
fields = mysql_fetch_fields(res);

while(MYSQL_ROW row = mysql_fetch_row(res)) {
LOG_DEBUG("MYSQL ROW: %s %s", row[0], row[1]);
string password(row[1]);
/* 注册行为 且 用户名未被使用*/
if(isLogin) {
if(pwd == password) { flag = true; }
else {
flag = false;
LOG_INFO("pwd error!");
}
}
else {
flag = false;
LOG_INFO("user used!");
}
}
mysql_free_result(res);

/* 注册行为 且 用户名未被使用*/
if(!isLogin && flag == true) {
LOG_DEBUG("regirster!");
bzero(order, 256);
snprintf(order, 256,"INSERT INTO user(username, password) VALUES('%s','%s')", name.c_str(), pwd.c_str());
LOG_DEBUG( "%s", order);
if(mysql_query(sql, order)) {
LOG_DEBUG( "Insert error!");
flag = false;
}
flag = true;
}
// SqlConnPool::Instance()->FreeConn(sql);
LOG_DEBUG( "UserVerify success!!");
return flag;
}

std::string HttpRequest::path() const{
return path_;
}

std::string& HttpRequest::path(){
return path_;
}
std::string HttpRequest::method() const {
return method_;
}

std::string HttpRequest::version() const {
return version_;
}

std::string HttpRequest::GetPost(const std::string& key) const {
assert(key != "");
if(post_.count(key) == 1) {
return post_.find(key)->second;
}
return "";
}

std::string HttpRequest::GetPost(const char* key) const {
assert(key != nullptr);
if(post_.count(key) == 1) {
return post_.find(key)->second;
}
return "";
}

定时器

代码

heaptimer.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#ifndef HEAP_TIMER_H
#define HEAP_TIMER_H

#include <queue>
#include <unordered_map>
#include <time.h>
#include <algorithm>
#include <arpa/inet.h>
#include <functional>
#include <chrono>
#include "../log/log.h"

typedef std::function<void()> TimeoutCallBack;
typedef std::chrono::high_resolution_clock Clock;
typedef std::chrono::milliseconds MS;
typedef Clock::time_point TimeStamp;

struct TimerNode
{
int id;
TimeStamp expires;
TimeoutCallBack cb;
bool operator<(const TimerNode& t)
{
return expires < t.expires;
}

bool operator>(const TimerNode& t)
{
return expires > t.expires;
}
};

class HeapTimer
{
public:
HeapTimer() {heap_.reserve(64); }
~HeapTimer() {clear(); }

void adjust(int id, int newExpires);
void add(int id, int timeout, const TimeoutCallBack& cb);
void dowork(int id);
void clear();
void tick();
void pop();
int GetNextTick();
private:
void del_(size_t i);
void siftup_(size_t i);
bool siftdown_(size_t i, size_t n);
void SwapNode_(size_t i, size_t j);

std::vector<TimerNode> heap_;

std::unordered_map<int, size_t> ref_;
};

#endif

heaptimer.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
  #include "heaptimer.h"

void HeapTimer::SwapNode_(size_t i, size_t j)
{
assert(i >= 0 && i < heap_.size());
assert(j >= 0 && j < heap_.size());
swap(heap_[i], heap_[j]);
ref_[heap_[i].id] = i;
ref_[heap_[j].id] = j;
}

void HeapTimer::siftup_(size_t i)
{
assert(i >= 0 && i < heap_.size());
size_t parent = (i - 1)/2;
while(parent >= 0)
{
if(heap_[parent] > heap_[i])
{
SwapNode_(i, parent);
i = parent;
parent = (i - 1)/2;
}else
{
break;
}
}
}

bool HeapTimer::siftdown_(size_t i, size_t n)
{
assert(i >= 0 && i < heap_.size());
assert(n >= 0 && n < heap_.size());
auto index = i;
auto child = 2*index+1;
while(child < n)
{
if(child+1 < n && heap_[child+1] < heap_[child])
{
child++;
}
if(heap_[child] < heap_[index])
{
SwapNode_(index, child);
index = child;
child = 2*child+1;
}
break;
}
return index > i;
}

void HeapTimer::del_(size_t index)
{
assert(index >= 0 && index < heap_.size());

size_t tmp = index;
size_t n = heap_.size();
assert(tmp <= n);
if(index < heap_.size()-1)
{
SwapNode_(tmp, heap_.size()-1);
if(!siftdown_(tmp, n))
{
siftup_(tmp);
}
}
ref_.erase(heap_.back().id);
heap_.pop_back();
}

void HeapTimer::adjust(int id, int newExpires)
{
assert(!heap_.empty() && ref_.count(id));
heap_[ref_[id]].expires = Clock::now() + MS(newExpires);
siftdown_(ref_[id], heap_.size());
}

void HeapTimer::add(int id, int timeOut, const TimeoutCallBack& cb)
{
assert(id >= 0);
if(ref_.count(id))
{
int tmp = ref_[id];
heap_[tmp].expires = Clock::now() + MS(timeOut);
heap_[tmp].cb = cb;
if(!siftdown_(tmp, heap_.size()))
{
siftup_(tmp);
}else
{
size_t n = heap_.size();
ref_[id] = n;
heap_.push_back({id, Clock::now() + MS(timeOut), cb});
siftup_(n);
}
}
}

void HeapTimer::dowork(int id)
{
if(heap_.empty() || ref_.count(id) == 0)
{
return;
}
size_t i = ref_[id];
auto node = heap_[i];
node.cb();;
del_(i);
}

void HeapTimer::tick()
{
if(heap_.empty())
{
return;
}
while(!heap_.empty())
{
TimerNode node = heap_.front();
if(std::chrono::duration_cast<MS>(node.expires - Clock::now()).count() > 0)
{
break;
}
node.cb();
pop();
}
}

void HeapTimer::pop()
{
assert(!heap_.empty());
del_(0);
}

void HeapTimer::clear()
{
ref_.clear();
heap_.clear();
}

int HeapTimer::GetNextTick()
{
tick();
size_t res = -1;
if(!heap_.empty())
{
res = std::chrono::duration<MS>(heap_.front().expires - Clock::now()).count();
if(res < 0) {res = 0;}
}
return res;
}


思考

enum和#define用法类似,但注意enum的定义有一个为成员变量赋值的操作。

1
std::unordered_map<std::string, std::string> sample #实现的是一个键值对操作string->string

cpp中以下原代码可能存在问题(标记一下) 确实是存在问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
  bool HeapTimer::siftdown_(size_t i, size_t n) {
assert(i >= 0 && i < heap_.size());
assert(n >= 0 && n <= heap_.size()); // n:共几个结点
auto index = i;
auto child = 2*index+1;
while(child < n) {
if(child+1 < n && heap_[child+1] < heap_[child]) {
child++;
}
if(heap_[child] < heap_[index]) {
SwapNode_(index, child);
index = child;
child = 2*child+1;
### }
### break; // 需要跳出循环
###->修改为以下
}else{
break;
}

}
return index > i;
}


整个操作也就是一个小根堆的置换,下面我写了一个例子就可以很好的理解了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#include <stdio.h>
#include <vector>
#include <iostream>
#include <assert.h>
#include <unordered_map>
using namespace std;

struct TimeNode
{
int result;
int id;
bool operator<(const TimeNode t)
{
return result < t.result;
}
bool operator>(TimeNode &t)
{
return result > t.result;
}
};

class HeapTimer
{
public:
bool add(int id, const TimeNode &Node);
bool SwapNode(int res, int tar);
bool swapup(size_t i);
bool swapdown(size_t i, size_t n);
void ShowNode() const;

private:
vector<TimeNode> node_;
unordered_map<int, size_t> ref_;
};

bool HeapTimer::add(int id, const TimeNode &Node)
{
size_t n = node_.size();
ref_[id] = n;
node_.push_back(Node);
cout << "here" << endl;
// swapup(n);
return true;
}

bool HeapTimer::SwapNode(int res, int tar)
{

swap(node_[res], node_[tar]);
cout << "right" << endl;
// ref_[node_[res].id] = res;
// ref_[node_[tar].id] = tar;
}

bool HeapTimer::swapup(size_t i)
{
size_t parent = (i - 1) / 2;
while (parent >= 0 && parent < 1000)
{
if (node_[parent] > node_[i])
{
cout<< "node_[parent] " << parent << " :"<< node_[parent].result << " node_[i]" << node_[i].result << endl;
SwapNode(i, parent);
i = parent;
parent = (i - 1) / 2;
}
else
{
break;
}
}
return true;
}

bool HeapTimer::swapdown(size_t i, size_t n)
{
size_t index = i;
size_t child = 2 * i + 1;
while (child < n)
{
if (child + 1 < n && node_[child + 1] < node_[child])
{
child++;
}
if (node_[child] < node_[index])
{
SwapNode(child, index);
index = child;
child = 2 * index + 1;
}
else
{
break;
}
}
return index > i;
}

void HeapTimer::ShowNode() const
{
for (int i = 0; i < 10; i++)
{
cout << "node " << i << ":" << node_[i].result << endl;
}
}

int main()
{
TimeNode n1 = {1, 1};
TimeNode n2 = {2, 2};
TimeNode n3 = {3, 3};
TimeNode n4 = {4, 4};
TimeNode n5 = {5, 5};
TimeNode n6 = {6, 6};

HeapTimer T;
// T.add(1, n1);
T.add(2, n2);
T.add(3, n3);
T.add(4, n4);
T.add(5, n5);
T.add(1, n1);

// T.swapup(4);
T.ShowNode();
T.swapup(4);
T.ShowNode();

return 0;
}

关于时间c++ 提供了一个时钟库chrono来计算精确时间,好像是一种固定的用法为如下午例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <chrono>

int main() {
// 获取当前时间点
auto start_time = std::chrono::high_resolution_clock::now();

// 模拟一些工作,例如执行某个算法或操作

// 获取结束时间点
auto end_time = std::chrono::high_resolution_clock::now();

// 计算时间间隔
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
// 与上文代码相比把microseconds封装,以及把结构转换为size_t
// 例:size_t duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();

std::cout << "Time taken: " << duration.count() << " microseconds" << std::endl;

return 0;
}

在这段代码中的while循环的判断为(listenEvent_ & EPOLLET),我在网上搜索大概表示只接受EPOLLET模式,为什么使用do while呢是因为这样可以保证在不同初始情况下都会确保至少处理一个连接

1
2
3
4
5
6
7
8
9
10
  do {
int fd = accept(listenFd_, (struct sockaddr *)&addr, &len);
if(fd <= 0) { return;}
else if(HttpConn::userCount >= MAX_FD) {
SendError_(fd, "Server busy!");
LOG_WARN("Clients is full!");
return;
}
AddClient_(fd, addr);
} while(listenEvent_ & EPOLLET);

再看到timenode是如何实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 timer_->add(fd, timeoutMS_, std::bind(&WebServer::CloseConn_, this, &users_[fd]));
void HeapTimer::add(int id, int timeOut, const TimeoutCallBack& cb)
//如何理解std::bind呢,上述代码中就相当于this->CloseConn_(fd),注意第二参数如果有结构体就是结构体
struct Foo {
void print_sum(int n1, int n2)
{
std::cout << n1+n2 << '\n';
}
int data = 10;
};
int main()
{
Foo foo;
auto f = std::bind(&Foo::print_sum, &foo, 95, std::placeholders::_1);
f(5); // 100
}
//bind也可以使用lambda来代替
timer_->add(fd, timeoutMS_, [this, &user = users_[fd]](){CloseConn_[&user]});

HTTP连接模块

代码

httpresponse.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#ifndef HTTP_RESPONSE_H
#define HTTP_RESPONSE_H

#include <unordered_map>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/mman.h>

#include "../buffer/buffer.h"
#include "../log/log.h"

class HttpResponse
{
public:
HttpResponse();
~HttpResponse();

void Init(const std::string& srcDir, std::string& path, bool isKeepAlive = false, int cone = -1);
void MakeResponse(Buffer& buff);
void UnmapFile();
char *File();
size_t FileLen() const;
void ErrorContent(Buffer& buff, std::string message);
int Code() const {return code_;}
private:
void AddStateLine_(Buffer &buff);
void AddHeader_(Buffer &buff);
void AddContent_(Buffer &buff);

void ErrorHtml_();
std::string GetFileType_();

int code_;
bool isKeepAlive_;

std::string path_;
std::string srcDir_;

char* mmFile_;
struct stat mmFileStat_;

static const std::unordered_map<std::string, std::string> SUFFIX_TYPE;
static const std::unordered_map<int, std::string> CODE_STATUS;
static const std::unordered_map<int, std::string> CODE_PATH;
};

#endif

httpresponse.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
  #include "httpresponse.h"

using namespace std;

const unordered_map<string, string> HttpResponse::SUFFIX_TYPE =
{
{ ".html", "text/html" },
{ ".xml", "text/xml" },
{ ".xhtml", "application/xhtml+xml" },
{ ".txt", "text/plain" },
{ ".rtf", "application/rtf" },
{ ".pdf", "application/pdf" },
{ ".word", "application/nsword" },
{ ".png", "image/png" },
{ ".gif", "image/gif" },
{ ".jpg", "image/jpeg" },
{ ".jpeg", "image/jpeg" },
{ ".au", "audio/basic" },
{ ".mpeg", "video/mpeg" },
{ ".mpg", "video/mpeg" },
{ ".avi", "video/x-msvideo" },
{ ".gz", "application/x-gzip" },
{ ".tar", "application/x-tar" },
{ ".css", "text/css "},
{ ".js", "text/javascript "},
};

const unordered_map<int, string> HttpResponse::CODE_STATUS =
{
{ 200, "OK" },
{ 400, "Bad Request" },
{ 403, "Forbidden" },
{ 404, "Not Found" },
};

const unordered_map<int, string> HttpResponse::CODE_PATH =
{
{ 400, "/400.html" },
{ 403, "/403.html" },
{ 404, "/404.html" },
};

HttpResponse::HttpResponse()
{
code_ = -1;
path_ = srcDir_ = "";
isKeepAlive_ = false;
mmFile_ = nullptr;
mmFileStat_ = {0};
}

HttpResponse::~HttpResponse()
{
UnmapFile();
}

void HttpResponse::Init(const string& srcDir, string& path, bool isKeepAlive, int code)
{
assert(srcDir != "");
if(mmFile_){UnmapFile();}
code_ = code;
isKeepAlive_ = isKeepAlive;
path_ = path;
srcDir_ = srcDir;
mmFile_ = nullptr;
mmFileStat_ = {0};
}

void HttpResponse::MakeResponse(Buffer& buff)
{
if(stat((srcDir_ + path_).data(), &mmFileStat_) < 0 || S_ISDIR(mmFileStat_.st_mode))
{
code_ = 404;
}else if(!(mmFileStat_.st_mode & S_IROTH)){
code_ = 403;
}else if(code_ == -1){
code_ = 200;
}
ErrorHtml_();
AddStateLine_(buff);
AddHeader_(buff);
AddContent_(buff);
}

char* HttpResponse::File()
{
return mmFile_;
}

size_t HttpResponse::FileLen() const
{
return mmFileStat_.st_size;
}

void HttpResponse::ErrorHtml_()
{
if(CODE_PATH.count(code_) == -1)
{
path_ = CODE_PATH.find(code_)->second;
stat((srcDir_ + path_).data(), &mmFileStat_);
}
}

void HttpResponse::AddStateLine_(Buffer& buff)
{
string status;
if(CODE_STATUS.count(code_) == 1)
{
status = CODE_STATUS.find(code_)->second;
}
else{
code_ = 400;
status = CODE_STATUS.find(400)->second;
}
buff.Append("HTTP/1.1 " + to_string(code_) + " " + status + "\r\n");
}

void HttpResponse::AddHeader_(Buffer& buff)
{
buff.Append("Connection: ");
if(isKeepAlive_){
buff.Append("keep-alive\r\n");
buff.Append("keep-alive: max=6, timeout=120\r\n");
}else{
buff.Append("clsee\r\n");
}
buff.Append("Content-type: " + GetFileType_() + "\r\n");
}

void HttpResponse::AddContent_(Buffer &buff)
{
int srcFd = open((srcDir_ + path_).data(), O_RDONLY);
if(srcFd < 0){
ErrorContent(buff, "File NotFound!");
return;
}

LOG_DEBUG("file path %s", (srcDir_ + path_).data());
int* mmRet = (int*)mmap(0, mmFileStat_.st_size, PROT_READ, MAP_PRIVATE, srcFd, 0);
if(*mmRet == -1)
{
ErrorContent(buff, "File NotFound!");
return;
}
mmFile_ = (char*)mmRet;
close(srcFd);
buff.Append("Content-length:" + to_string(mmFileStat_.st_size) + "\r\n\r\n");
}

void HttpResponse::UnmapFile()
{
if(mmFile_)
{
munmap(mmFile_, mmFileStat_.st_size);
mmFile_ = nullptr;
}
}

string HttpResponse::GetFileType_() {
string::size_type idx = path_.find_last_of('.');
if(idx == string::npos) { // 最大值 find函数在找不到指定值得情况下会返回string::npos
return "text/plain";
}
string suffix = path_.substr(idx);
if(SUFFIX_TYPE.count(suffix) == 1) {
return SUFFIX_TYPE.find(suffix)->second;
}
return "text/plain";
}

void HttpResponse::ErrorContent(Buffer& buff, string message)
{
string body;
string status;
body += "<html><title>Error</title>";
body += "<body bgcolor=\"ffffff\">";
if(CODE_STATUS.count(code_) == 1) {
status = CODE_STATUS.find(code_)->second;
} else {
status = "Bad Request";
}
body += to_string(code_) + " : " + status + "\n";
body += "<p>" + message + "</p>";
body += "<hr><em>TinyWebServer</em></body></html>";

buff.Append("Content-length: " + to_string(body.size()) + "\r\n\r\n");
buff.Append(body);
}

httpconn.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#ifndef HTTP_CONN_H
#define HTTP_CONN_H

#include <sys/types.h>
#include <sys/uio.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <errno.h>

#include "../log/log.h"
#include "../buffer/buffer.h"
#include "httpresponse.h"
#include "httprequest.h"

class HttpConn
{
public:
HttpConn();
~HttpConn();

void init(int sockFd, const sockaddr_in& addr);
ssize_t read(int *saveErrno);
ssize_t write(int *saveErrno);
void Close();
int GetFd() const;
int GetPort() const;
const char* GetIP() const;
sockaddr_in GetAddr() const;
bool process();

int ToWriteBytes()
{
return iov_[0].iov_len + iov_[1].iov_len;
}

bool IsKeepAlive() const {
return request_.IsKeepAlive();
}

static bool isET;
static const char* srcDir;
static std::atomic<int> userCount;
private:

int fd_;
struct sockaddr_in addr_;

bool isClose_;

int iovCnt_;
struct iovec iov_[2];

Buffer readBuff_;
Buffer writeBuff_;

HttpRequest request_;
HttpResponse response_;
};

#endif

httpconn.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include "httpconn.h"
using namespace std;

const char* HttpConn::srcDir;
std::atomic<int> HttpConn::userCount;
bool HttpConn::isET;

HttpConn::HttpConn()
{
fd_ = -1;
addr_ = {0};
isClose_ = true;
};

HttpConn::~HttpConn()
{
Close();
};

void HttpConn::init(int fd, const sockaddr_in &addr)
{
assert(fd > 0);
userCount++;
addr_ = addr;
fd_ = fd;
writeBuff_.RetrieveAll();
readBuff_.RetrieveAll();
isClose_ = false;
LOG_INFO("Client[%d](%s:%d) in, userCount:%d", fd_, GetIP(), GetPort(), (int)userCount);
}

void HttpConn::Close()
{
response_.UnmapFile();
if(isClose_ = false)
{
isClose_ = true;
userCount--;
close(fd_);
LOG_INFO("Client[%d](%s:%d) quit, UserCount:%d", fd_, GetIP(), GetPort(), (int)userCount);
}
}

int HttpConn::GetFd() const
{
return fd_;
}

struct sockaddr_in HttpConn::GetAddr() const
{
return addr_;
}

const char* HttpConn::GetIP() const
{
return inet_ntoa(addr_.sin_addr);
}

int HttpConn::GetPort() const
{
return addr_.sin_port;
}

ssize_t HttpConn::read(int *saveErrno)
{
ssize_t len = -1;
do {
len = readBuff_.ReadFd(fd_, saveErrno);
if(len <= 0)
{
break;
}
}while (isET);
return len;
}

ssize_t HttpConn::write(int *saveErrno)
{
ssize_t len = -1;
do {
len = writev(fd_, iov_, iovCnt_);
if(len <= 0)
{
*saveErrno = errno;
break;
}
if(iov_[0].iov_len + iov_[1].iov_len == 0) {break;}
else if(static_cast<size_t>(len) > iov_[0].iov_len)
{
iov_[1].iov_base = (uint8_t*) iov_[1].iov_base + (len - iov_[0].iov_len);
iov_[1].iov_len -= (len - iov_[0].iov_len);
if(iov_[0].iov_len)
{
writeBuff_.RetrieveAll();
iov_[0].iov_len = 0;
}
}
else {
iov_[0].iov_base = (uint8_t*)iov_[0].iov_base + len;
iov_[0].iov_len -= len;
writeBuff_.Retrieve(len);
}
}while(isET || ToWriteBytes() > 10240);
return len;
}

bool HttpConn::process()
{
request_.Init();
if(readBuff_.ReadbleBytes() <= 0)
{
return false;
}
else if(request_.parse(readBuff_))
{
LOG_DEBUG("%s", request_.path().c_str());
response_.Init(srcDir, request_.path(), request_.IsKeepAlive(), 200);
} else {
response_.Init(srcDir, request_.path(), false, 400);
}

response_.MakeResponse(writeBuff_);

iov_[0].iov_base = const_cast<char*>(writeBuff_.Peek());
iov_[0].iov_len = writeBuff_.ReadbleBytes();
iovCnt_ = 1;

if(response_.FileLen() > 0 && response_.File())
{
iov_[1].iov_base = response_.File();
iov_[1].iov_len = response_.FileLen();
iovCnt_ = 2;
}
LOG_DEBUG("filesize:%d, %d to %d", response_.FileLen() , iovCnt_, ToWriteBytes());
return true;

}

思考

wait

epoll模块

代码

epoller.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#ifndef EPOLLER_H
#define EPOLLER_H

#include <sys/epoll.h>
#include <unistd.h>
#include <assert.h>
#include <vector>
#include <errno.h>

class Epoller
{
public:
explicit Epoller(int maxEvent = 1024);
~Epoller();

bool AddFd(int fd, uint32_t events);
bool ModFd(int fd, uint32_t events);
bool DelFd(int fd);
int Wait(int timeoutMs = -1);
int GetEventFd(size_t i) const;
uint32_t GetEvents(size_t i) const;
private:
int epollFd_;
std::vector<struct epoll_event> events_;
};

#endif

epoller.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
 #include "epoller.h"

Epoller::Epoller(int maxEvent):epollFd_(epoll_create(512)), events_(maxEvent)
{
assert(epollFd_ >= 0&& events_.size() > 0);
}

Epoller::~Epoller()
{
close(epollFd_);
}

bool Epoller::AddFd(int fd, uint32_t events)
{
if(fd < 0) return false;
epoll_event ev = {0};
ev.data.fd = fd;
ev.events = events;
return 0 == epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &ev);
}

bool Epoller::ModFd(int fd, uint32_t events)
{
if(fd < 0) return false;
epoll_event ev = {0};
ev.data.fd = fd;
ev.events = events;
return 0 == epoll_ctl(epollFd_, EPOLL_CTL_MOD, fd, &ev);
}

bool Epoller::DelFd(int fd)
{
if(fd < 0) return false;
return 0 == epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, 0);
}

int Epoller::Wait(int timeoutMs)
{
return epoll_wait(epollFd_, &events_[0], static_cast<int>(events_.size()), timeoutMs);
}

int Epoller::GetEventFd(size_t i) const
{
assert(i < events_.size() && i >= 0);
return events_[i].data.fd;
}

uint32_t Epoller::GetEvents(size_t i) const
{
assert(i < events_.size() && i >= 0);
return events_[i].events;
}

webserver.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#ifndef WEBSERVER_H
#define WEBSERVER_H

#include <unordered_map>
#include <fcntl.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "epoller.h"
#include "../time/heaptimer.h"

#include "../log/log.h"
#include "../pool/sqlconnpool.h"
#include "../pool/threadpool.h"

#include "../httpr/httpconn.h"

class WebServer{
public:
WebServer(int port, int trigMode, int timeoutMS, bool OptLinger, int sqlPort, const char* sqlUser, const char* sqlPwd, const char* dbName, int connPoolNum, int threadNum, bool openlog, int logLevel, int logQueSize);
~WebServer();

void start();
private:
bool InitSocket_();
void InitEventMode_(int trigMode);
void AddClient_(int fd, sockaddr_in addr);

void DealListen_();
void DealWrite_(HttpConn* client);
void DealRead_(HttpConn* client);

void sendError_(int fd, const char* info);
void ExtentTime_(HttpConn* client);
void CloseConn_(HttpConn* client);

void OnRead_(HttpConn *client);
void OnWrite_(HttpConn *client);
void OnProcess(HttpConn *client);

static const int MAX_FD = 65536;

static int SetFdNonblock(int fd);

int port_;
bool openLinger_;
int timeoutMS_;
bool isClose_;
int listenFd_;
char* srcDir_;

uint32_t listenEvent_;
uint32_t connEvent_;

std::unique_ptr<HeapTimer> timer_;
std::unique_ptr<ThreadPool> threadpool_;
std::unique_ptr<Epoller> epoller_;
std::unordered_map<int, HttpConn> users_;
};

#endif

webserver.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
#include "webserver.h"

using namespace std;

WebServer::WebServer(int port, int trigMode, int timeoutMS, bool OptLinger, int sqlPort, const char* sqlUser, const char* sqlPwd, const char* dbName, int connPoolNum, int threadNum, bool openLog, int logLevel, int logQueSize): port_(port), openLinger_(OpenLinger), timeoutMS_(timeoutMS), isClose_(false),timer_(new HeapTimer()), threadpool_(new ThreadPool(threadNum)),epoller_(new Epoller())
{
srcDir_ = getcwd(nullptr, 256);
assert(srcDir_);
strcat(srcDir_, "/resources/");
HttpConn::userCount = 0;
HttpConn::srcDir = srcDir_;

SqlConnPool::Instance()->Init("localhost", sqlPort, sqlUser, sqlPwd, dbName, connPoolNum);
InitEventMode_(trigMode);
if(!InitSocket_()) { isClose_ = true;}

if(openLog)
{
Log::Instance()->init(logLevel, "./log", ".log", logQueSize);
if(isClose_) { LOG_ERROR("========== Server init error!==========");}
else{
LOG_INFO("========== Server init ==========");
LOG_INFO("Port:%d, OpenLinger: %s", port_, OptLinger? "true":"false");
LOG_INFO("Listen Mode: %s, OpenConn Mode: %s",
(listenEvent_ & EPOLLET ? "ET": "LT"),
(connEvent_ & EPOLLET ? "ET": "LT"));
LOG_INFO("LogSys level: %d", logLevel);
LOG_INFO("srcDir: %s", HttpConn::srcDir);
LOG_INFO("SqlConnPool num: %d, ThreadPool num: %d", connPoolNum, threadNum);
}
}
}

WebServer::~WebServer()
{
close(listenFd_);
isClose_ = true;
free(srcDir_);
SqlConnPool::Instance()->ClosePool();
}

void WebServer::InitEventMode_(int trigMode)
{
listenEvent_ = EPOLLRDHUP;
connEvent_ = EPOLLONESHOT | EPOLLRDHUP;
switch (trigMode)
{
case 0:
break;
case 1:
connEvent_ |= EPOLLET;
break;
case 2:
listenEvent_ |= EPOLLET;
break;
case 3:
listenEvent_ |= EPOLLET;
connEvent_ |= EPOLLET;
break;
default:
listenEvent_ |= EPOLLET;
connEvent_ |= EPOLLET;
break;
}
HttpConn::isET = (connEvent_ & EPOLLET);
}

void WebServer::Start()
{
int timeMS = -1;
if(!isClose_) {LOG_INFO("========== Server start ==========");}
while(!isClose_)
{
if(timeoutMS_ > 0)
{
timeMS = timer_->GetNextTick();
}
int eventCnt = epoller_->Wait(timeMS);
for(int i = 0; i < eventCnt; i++)
{
int fd = epoller_->GetEventFd(i);
uint32_t events = epoller_->GetEvents(i);
if(fd == listenFd_)
{
DealListen_();
}
else if(events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR))
{
assert(users_.count(fd) > 0);
CloseConn_(&users_[fd]);
}
else if(events & EPOLLIN)
{
assert(users_.count(fd) > 0);
DealRead_(&users_[fd]);
}
else if(events & EPOLLOUT)
{
assert(users_.count(fd) > 0);
DealWrite_(&users_[fd]);
} else {
LOG_ERROR("Unexpected event");
}
}
}
}

void WebServer::sendError_(int fd, const char* info)
{
assert(fd > 0);
int ret = send(fd, info, strlen(info), 0);
if(ret < 0)
{
LOG_WARN("send error to client[%d] error!", fd);
}
close(fd);
}

void WebServer::CloseConn_(HttpConn* client)
{
assert(client);
LOG_INFO("client[%d] quit!", client->GetFd());
epoller_->DelFd(client->GetFd());
client->Close();
}

void WebServer::AddClient_(int fd, sockaddr_in addr)
{
assert(fd > 0);
users_[fd].init(fd, addr);
if(timeoutMS_ > 0)
{
timer_->add(fd, timeoutMS_, std::bind(&WebServer::CloseConn_, this, &users_[fd]));
}
epoller_->AddFd(fd, EPOLLIN | connEvent_);
SetFdNonblock(fd);
LOG_INFO("Client[%d] in!", users_[fd].GetFd());
}

void WebServer::DealListen_()
{
struct sockaddr_in addr;
socklen_t len = sizeof(addr);
do{
int fd = accept(listenFd_, (struct sockaddr*)&addr, &len);
if(fd <= 0) {return;}
else if(HttpConn::userCount >= MAX_FD)
{
sendError_(fd, "Server busy!");
LOG_WARN("Clients is full!");
return;
}
AddClient_(fd, addr);
}while(listenEvent_ & EPOLLET);
}

void WebServer::DealRead_(HttpConn *client)
{
assert(client);
ExtentTime_(client);
threadpool_->AddTask(std::bind(&WebServer::OnRead_, this, client));
}

void WebServer::DealWrite_(HttpConn *client)
{
assert(client);
ExtentTime_(client);
threadpool_->AddTask(std::bind(&WebServer::OnWrite_, this, client));
}

void WebServer::ExtentTime_(HttpConn *client)
{
assert(client);
if(timeoutMS_ > 0) {timer_->adjust(client->GetFd(), timeoutMS_);}
}

void WebServer::OnRead_(HttpConn * client)
{
assert(client);
int ret = -1;
int readErrno = 0;
ret = client->read(&readErrno);
if(ret <= 0 && readErrno != EAGAIN)
{
CloseConn_(client);
return;
}

OnProcess(client);
}

void WebServer::OnProcess(HttpConn* client)
{
if(client->process())
{
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);
} else {
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLIN);
}
}

void WebServer::OnWrite_(HttpConn* client)
{
assert(client);
int ret = -1;
int writeErrno = 0;
ret = client->write(&writeErrno);
if(client->ToWriteBytes() == 0)
{
if(client->IsKeepAlive())
{
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLIN);
return;
}
}
else if(ret < 0){
if(writeErrno == EAGAIN)
{
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);
return;
}
}
CloseConn_(client);
}

bool WebServer::InitSocket_()
{
int ret;
struct sockaddr_in addr;
if(port_ > 65534 || port_ < 1024)
{
LOG_ERROR("Port:%d error!", port_);
return false;
}
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(port_);

{
struct linger optLinger = {0};
if(openLinger_)
{
optLinger.l_linger = 1;
optLinger.l_onoff = 1;
}

listenFd_ = socket(AF_INET, SOCK_STREAM, 0);
if(listenFd_ < 0)
{
LOG_ERROR("Create socket error!", port_);
return false;
}

ret = setsockopt(listenFd_, SOL_SOCKET, SO_LINGER, &optLinger, sizeof(optLinger));
if(ret < 0)
{
close(listenFd_);
LOG_ERROR("Init linger error!", port_);
return false;
}
}

int optval = 1;

ret = setsockopt(listenFd_, SOL_SOCKET, SO_REUSEADDR, (const void*)&optval, sizeof(int));
if(ret == -1)
{
LOG_ERROR("set socket setsockopt error!");
close(listenFd_);
return false;
}

ret = bind(listenFd_, (struct sockaddr*)&addr, sizeof(addr));
if(ret < 0)
{
LOG_ERROR("Bind Port:%d error!", port_);
close(listenFd_);
return false;
}

ret = listen(listenFd_, 6);
if(ret < 0)
{
LOG_ERROR("Listen port:%d error!", port_);
close(listenFd_);
return false;
}
ret = epoller_->AddFd(listenFd_, listenEvent_ | EPOLLIN);
if(ret == 0)
{
LOG_ERROR("Add listen error!");
close(listenFd_);
return false;
}
SetFdNonblock(listenFd_);
LOG_INFO("Server port:%d", port_);
return true;
}

int WebServer::SetFdNonblock(int fd)
{
assert(fd > 0);
return fcntl(fd, F_SETFL, fcntl(fd, F_GETFD, 0) | O_NONBLOCK);
}

思考

getcmd(source, maxlen)返回当前文件路径
EPOLLRDHUP 表示检擦sock是否关闭
EPOLLONESHOT 表示在ET模式下sock可能被重复操作,这个就是设置一个进程只操作一个
整个项目的逻辑大概的理一下:

1
Init(初始化日志,sock服务等)->添加epoll管理(线程池操作)->管理fd->wait等待fd连接->处理listen(使用wait_,初始化了一个user_使用unordered_map使得一个fd对应一个sock_addr,一个timer_同样unordered_map使得一个fd对应一个时间管理的time_node,最后将fd在epoll管理中配置为可读)->处理read(将read的任务添加进线程池,之后将其处理了将fd在epoll管理中再次配置为可写等待write)->处理write(操作流程与read相似)

总结

完整项目的阅读完成不得不感慨程序设计的精妙与机智,在阅读aworkholic大佬的分析讲解不得不感慨大佬的博识与值得钦佩。

参考

TinyWeb Server项目实现参考:
https://blog.csdn.net/weixin_51322383/article/details/130464403
muduo buffer实现过程:
https://blog.csdn.net/wanggao_1990/article/details/119426351
NIO理解:
https://blog.csdn.net/wanggao_1990/article/details/119426351
readv等高级IO函数使用:
https://wanggao1990.blog.csdn.net/article/details/109296400
iov结合readv使用:
https://blog.csdn.net/weixin_44609676/article/details/130026592
智能指针
https://zhuanlan.zhihu.com/p/642134340?utm_id=0
多线程基础

https://www.zhihu.com/tardis/bd/art/194198073?source_id=1001
生产者与消费者模式:
https://blog.csdn.net/qq_38316300/article/details/124319533
C++设计模式总结
https://blog.csdn.net/qq_38316300/category_11782458.html
lambda语法:
https://blog.csdn.net/weixin_52345097/article/details/131499309
mysql安装配置:
https://blog.csdn.net/qq_37120477/article/details/130653390
小根堆:
https://blog.csdn.net/qq_64861334/article/details/129365951
stat函数:
https://blog.csdn.net/Dustinthewine/article/details/126673326
epoll使用:
https://cloud.tencent.com/developer/article/1996941
epoll使用例子

https://zhuanlan.zhihu.com/p/665635140
什么是RAII:
https://blog.csdn.net/qq_37535749/article/details/113450219
红黑树:
https://blog.csdn.net/cy973071263/article/details/122543826
std::bind()使用的例子:
https://blog.csdn.net/qq_44237695/article/details/123435575
c++转移语义和完美转发:
https://www.bilibili.com/read/cv14005909/
c++宏的使用方法:
https://blog.csdn.net/q2453303961/article/details/125173946
c++11的引用*(看完文章才发现是腾讯的文章讲的太好了):
https://zhuanlan.zhihu.com/p/676864635