实现代码:TinyOperateSystem:C++系统 - AtomGit | GitCode

  1. 阶段1:核心模块开发
  2. 阶段2:I/O 系统开发
  3. 阶段3:前端接口开
  4. 阶段4:支持系统开发
  5. 阶段5:集成与测试
  6. 阶段6:部署

关键时序图

阶段1:核心模块开发

采用三级缓冲架构实现高性能日志处理:

  1. BufferNode:基础存储单元(4MB固定大小)
  2. LockFreeRingBuffer:基于节点链表的无锁环形缓冲区
  3. DoubleBuffer:双缓冲机制实现读写分离

1. 缓冲系统实现

1.1 BufferNode

1.1.1 设计目标

  • 固定大小缓冲区(默认4MB
  • 原子操作管理写入位置
  • 缓存行对齐避免伪共享
  • 链表结构支持动态扩展

1.1.2 关键实现

1
2
3
4
5
6
7
template<size_t SingleBufferSize = DEFAULT_BUFFER_SIZE>
struct alignas(CACHE_LINE_SIZE) BufferNode {
std::atomic<BufferNode*> next; // 原子next指针
alignas(CACHE_LINE_SIZE) std::atomic<size_t> writeIndex{0}; // 写入位置
char data[SingleBufferSize]; // 数据存储区
BufferNode() : next(nullptr), writeIndex(0) {}
};

1.1.3 优化特性

  • 64节缓存行对齐
  • 原子变量独立缓存行
  • 预分配内存空间

1.2 BufferNodePool

1.2.1 设计目标

  • 对象池管理节点生命周期
  • 限制最大节点数防内存耗尽
  • 线程安全的分配/回收机制

1.2.2 工作流程

1.2.3 关键实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
template<size_t SingleBufferSize = DEFAULT_BUFFER_SIZE>
class BufferNodePool {
public:
using BufferNode = Base::SystemLog::BufferNode<SingleBufferSize>;

explicit BufferNodePool(size_t maxNodes = MAX_BUFFER_NODES);
~BufferNodePool();

BufferNode* Allocate();
void Deallocate(BufferNode* node);
size_t GetTotalAllocatedCount() const;

private:
std::vector<BufferNode*> freeList_; // 空闲节点列表
std::vector<BufferNode*> allocatedNodes_; // 已分配节点列表
std::mutex mutex_; // 分配锁
std::atomic<size_t> totalAllocatedCount_{0}; // 节点计数
const size_t maxNodes_; // 节点上限
};

template<size_t SingleBufferSize>
BufferNodePool<SingleBufferSize>::BufferNodePool(size_t maxNodes)
: maxNodes_(maxNodes)
{}

template<size_t SingleBufferSize>
BufferNodePool<SingleBufferSize>::~BufferNodePool()
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto node : allocatedNodes_) {
free(node);
}
}

template<size_t SingleBufferSize>
auto BufferNodePool<SingleBufferSize>::Allocate() -> BufferNode*
{
// 首先尝试从空闲列表获取
{
std::lock_guard<std::mutex> lock(mutex_);
if (!freeList_.empty()) {
BufferNode* node = freeList_.back();
freeList_.pop_back();
return node;
}
}

// 检查是否达到最大节点数
if (totalAllocatedCount_.load(std::memory_order_relaxed) >= maxNodes_) {
return nullptr;
}

// 分配新节点 (额外空间存储数据)
void* memory = malloc(sizeof(BufferNode) + SingleBufferSize);
if (!memory) {
return nullptr;
}
BufferNode* node = new (memory) BufferNode();
node->writeIndex.store(0, std::memory_order_relaxed);
node->next.store(nullptr, std::memory_order_relaxed);
{
std::lock_guard<std::mutex> lock(mutex_);
allocatedNodes_.push_back(node);
}
totalAllocatedCount_.fetch_add(1, std::memory_order_relaxed);
return node;
}

template<size_t SingleBufferSize>
void BufferNodePool<SingleBufferSize>::Deallocate(BufferNode* node)
{
if (node == nullptr) {
return;
}
node->writeIndex.store(0, std::memory_order_release);
BufferNode* next = node->next.exchange(nullptr, std::memory_order_release);
if (next) {
syslog(LOG_WARNING, "deallocating node with active next pointer");
}
std::lock_guard<std::mutex> lock(mutex_);
freeList_.push_back(node);
}

template<size_t SingleBufferSize>
size_t BufferNodePool<SingleBufferSize>::GetTotalAllocatedCount() const
{
return totalAllocatedCount_.load(std::memory_order_relaxed);
}

1.2.4 异常处理

  • 分配失败返回nullptr
  • 释放时检测异常指针(如非空next指针)

1.3 LockFreeRingBuffer

1.3.1 设计目标

  • 基于BufferNode构建动态环形缓冲区
  • CAS实现无锁写入
  • 支持消息分片(>4MB消息)
  • 小数据复制优化

1.3.2 写入流程

1.3.3 关键实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
using MessageConsumer = std::function<ErrCode(const char*, size_t)>;
template<size_t SingleBufferSize = DEFAULT_BUFFER_SIZE> // 默认4MB节点
class LockFreeRingBuffer {
public:
using BufferNode = Base::SystemLog::BufferNode<SingleBufferSize>;
LockFreeRingBuffer(size_t maxNodes = MAX_BUFFER_NODES);
~LockFreeRingBuffer();
ErrCode Append(const char* msg, size_t len);
void Collect(MessageConsumer messageConsumer);
void Reset();
size_t GetSize() const;
size_t GetNodeCount() const;

private:
BufferNode* CreateNode();
ErrCode ExpandBuffer();
void CopySmallData(char* dest, const char* src, size_t len);
ErrCode HandleOversizedMessage(const char* msg, size_t len);
void HandleOutOfMemory();
void HandleNodeLimitReached();

const size_t maxNodes_;
std::shared_ptr<BufferNodePool<SingleBufferSize>> pool_;
alignas(CACHE_LINE_SIZE) std::atomic<BufferNode*> headNode_{nullptr};
alignas(CACHE_LINE_SIZE) std::atomic<BufferNode*> currentNode_{nullptr};
};

template<size_t SingleBufferSize>
LockFreeRingBuffer<SingleBufferSize>::LockFreeRingBuffer(size_t maxNodes)
: maxNodes_(maxNodes), pool_(std::make_shared<BufferNodePool<SingleBufferSize>>(maxNodes))
{
currentNode_ = CreateNode();
headNode_.store(currentNode_.load(std::memory_order_relaxed), std::memory_order_relaxed);
}

template<size_t SingleBufferSize>
LockFreeRingBuffer<SingleBufferSize>::~LockFreeRingBuffer()
{
BufferNode* node = headNode_.load(std::memory_order_acquire);
while (node) {
BufferNode* next = node->next.load(std::memory_order_relaxed);
pool_->Deallocate(node);
node = next;
}
}

template<size_t SingleBufferSize>
ErrCode LockFreeRingBuffer<SingleBufferSize>::Append(const char* msg, size_t len)
{
if (len > SingleBufferSize) {
return HandleOversizedMessage(msg, len);
}

int32_t retryCount = 0;
while(retryCount++ < MAX_RETRY_TIMES) {
BufferNode* curNode = currentNode_.load(std::memory_order_acquire);
if (curNode == nullptr) {
auto errCode = ExpandBuffer();
if (errCode != ERR_OK) {
return errCode;
}
continue;
}

// 指数退避策略减少竞争
if (retryCount > 10) {
int32_t backoff = 0;
backoff = 1 << (retryCount / 5);
for (int i = 0; i < backoff; ++i) {
_mm_pause();
}
}

size_t curIndex = curNode->writeIndex.load(std::memory_order_acquire);
if (curIndex + len > SingleBufferSize) {
auto errCode = ExpandBuffer();
if (errCode != ERR_OK) {
return errCode;
}
}
// CAS 更新写位置
if (curNode->writeIndex.compare_exchange_weak(curIndex, curIndex + len,
std::memory_order_acq_rel, std::memory_order_acquire)) {
if (len <= CACHE_LINE_SIZE) {
CopySmallData(curNode->data + curIndex, msg, len);
} else {
__builtin_memcpy(curNode->data + curIndex, msg, len);
}
return ERR_OK;
}
}
return ERR_BUFFER_APPEND_FAILED;
}

template<size_t SingleBufferSize>
void LockFreeRingBuffer<SingleBufferSize>::Collect(MessageConsumer messageConsumer)
{
BufferNode* node = headNode_.load(std::memory_order_acquire);
while (node) {
size_t writeIndex = node->writeIndex.load(std::memory_order_acquire);
if (writeIndex > 0) {
auto errCode = messageConsumer(node->data, writeIndex);
if (errCode != ERR_OK) {
syslog(LOG_ERR, "messageConsumer function failed: %d", errCode);
}
}
BufferNode* next = node->next.load(std::memory_order_relaxed);
if (next == nullptr) {
break;
}
node = next;
}
}

template<size_t SingleBufferSize>
void LockFreeRingBuffer<SingleBufferSize>::Reset()
{
BufferNode* node = headNode_.load(std::memory_order_acquire);
while (node) {
node->writeIndex.store(0, std::memory_order_release);
BufferNode* next = node->next.load(std::memory_order_relaxed);
if (next == nullptr) {
break;
}
node = next;
}
currentNode_.store(headNode_.load(std::memory_order_acquire), std::memory_order_release);
}

template<size_t SingleBufferSize>
size_t LockFreeRingBuffer<SingleBufferSize>::GetSize() const
{
size_t size = 0;
BufferNode* node = headNode_.load(std::memory_order_acquire);
while (node) {
size += node->writeIndex.load(std::memory_order_relaxed);
BufferNode* next = node->next.load(std::memory_order_relaxed);
if (next == nullptr) {
break;
}
node = next;
}
return size;
}

template<size_t SingleBufferSize>
size_t LockFreeRingBuffer<SingleBufferSize>::GetNodeCount() const
{
return pool_->GetTotalAllocatedCount();
}

template<size_t SingleBufferSize>
auto LockFreeRingBuffer<SingleBufferSize>::CreateNode() -> BufferNode*
{
if (pool_ == nullptr) {
syslog(LOG_ERR, "BufferNodePool is not initialized");
return nullptr;
}

BufferNode* newNode = pool_->Allocate();
if (newNode == nullptr) {
HandleOutOfMemory();
return nullptr;
}
newNode->writeIndex.store(0, std::memory_order_relaxed);
newNode->next.store(nullptr, std::memory_order_relaxed);
__builtin_memset(newNode->data, 0, SingleBufferSize);
return newNode;
}

template<size_t SingleBufferSize>
ErrCode LockFreeRingBuffer<SingleBufferSize>::ExpandBuffer()
{
BufferNode* curNode = currentNode_.load(std::memory_order_acquire);
if (curNode->next.load(std::memory_order_acquire)) {
currentNode_.store(curNode->next, std::memory_order_release);
return ERR_OK;
}

if (pool_ == nullptr) {
throw std::runtime_error("BufferNodePool is not initialized");
}
if (pool_->GetTotalAllocatedCount() >= maxNodes_) {
HandleNodeLimitReached();
return ERR_POOL_NODES_OVERFLOW;
}

BufferNode* newNode = CreateNode();
if (newNode == nullptr) {
return ERR_BUFFER_CREATE_NODE_FAILED;
}

// CAS添加新节点到链表
BufferNode* expected = nullptr;
if (curNode->next.compare_exchange_strong(expected, newNode,
std::memory_order_release, std::memory_order_relaxed)) {
currentNode_.store(newNode, std::memory_order_release);
return ERR_OK;
} else {
pool_->Deallocate(newNode);
currentNode_.store(curNode->next.load(std::memory_order_acquire), std::memory_order_release);
return ERR_OK;
}
}

// 小数据复制优化
template<size_t SingleBufferSize>
void LockFreeRingBuffer<SingleBufferSize>::CopySmallData(char* dest, const char* src, size_t len)
{
if (len == 0) {
return;
}
// 小数据使用直接复制
if (len <= sizeof(uint64_t)) {
switch (len) {
case 8:
*reinterpret_cast<uint64_t*>(dest) = *reinterpret_cast<const uint64_t*>(src);
return;
case 4:
*reinterpret_cast<uint32_t*>(dest) = *reinterpret_cast<const uint32_t*>(src);
return;
case 2:
*reinterpret_cast<uint16_t*>(dest) = *reinterpret_cast<const uint16_t*>(src);
return;
case 1:
*dest = *src;
return;
default:
break;
}
}

__builtin_memcpy(dest, src, len);
}

template<size_t SingleBufferSize>
ErrCode LockFreeRingBuffer<SingleBufferSize>::HandleOversizedMessage(const char* msg, size_t len)
{
size_t written = 0;
while (written < len) {
size_t chunkSize = std::min(SingleBufferSize, len - written);
auto errCode = Append(msg + written, chunkSize);
if (errCode != ERR_OK) {
return errCode;
}
written += chunkSize;
}
return ERR_OK;
}

template<size_t SingleBufferSize>
void LockFreeRingBuffer<SingleBufferSize>::HandleOutOfMemory()
{
static int32_t oomCount = 0;
const char* warnMsg = "WARNING: Log buffer out of memory!\n";
auto result = write(STDERR_FILENO, warnMsg, strlen(warnMsg));
if (result == -1) {
perror("Failed to write warning message");
}
oomCount++;

// 每10次OOM重启内存
if (oomCount > 10) {
oomCount = 0;
Reset();
}
}

template<size_t SingleBufferSize>
void LockFreeRingBuffer<SingleBufferSize>::HandleNodeLimitReached()
{
static int limitCount = 0;
limitCount++;
if (limitCount > 100) {
limitCount = 0;
Reset();
}
}

1.3.4 性能优化

  • 小数据直接复制(避免memcpy
  • 指数退避缓解CAS竞争
  • 分片处理超大消息

1.4 DoubleBuffer

1.4.1 设计目标

  • 双缓冲(Front/Back)机制
  • 安全交换保证数据一致性
  • 读写分离提高并发性

1.4.2 状态转换

1.4.3 关键实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
template<size_t SingleBufferSize = DEFAULT_BUFFER_SIZE> // 默认4MB节点
class DoubleBuffer {
public:
using LockFreeRingBuffer = Base::SystemLog::LockFreeRingBuffer<SingleBufferSize>;

DoubleBuffer()
: frontBuffer_(std::make_unique<LockFreeRingBuffer>()), backBuffer_(std::make_unique<LockFreeRingBuffer>()) {}

ErrCode Append(const char* msg, size_t len);

LockFreeRingBuffer &GetBackBuffer() {
return *backBuffer_;
}

ErrCode SafeSwap();

void EmergencySwap();

void ReleaseBackBuffer();

size_t GetFrontSize() const {
return frontBuffer_->GetSize();
}

size_t GetBackSize() const {
return backBuffer_->GetSize();
}

bool IsBackBufferInUse() const {
return backBufferInUse_;
}

private:
void InnerSwapBuffers() {
backBufferInUse_ = true;
std::swap(frontBuffer_, backBuffer_);
}

std::unique_ptr<LockFreeRingBuffer> frontBuffer_;
std::unique_ptr<LockFreeRingBuffer> backBuffer_;
mutable std::mutex swapMutex_;
std::condition_variable swapCondition_;
bool backBufferInUse_ = false;
std::atomic<bool> swapping_{false};
};

template<size_t SingleBufferSize>
ErrCode DoubleBuffer<SingleBufferSize>::Append(const char* msg, size_t len)
{
if (!swapping_.load(std::memory_order_acquire)) {
return frontBuffer_->Append(msg, len);
}
std::lock_guard<std::mutex> lock(swapMutex_);
return frontBuffer_->Append(msg, len);
}

template<size_t SingleBufferSize>
ErrCode DoubleBuffer<SingleBufferSize>::SafeSwap()
{
if (swapping_.exchange(true)) {
return ERR_OPERATION_IN_PROGRESS;
}
std::unique_lock<std::mutex> lock(swapMutex_);
auto bakBackBufferInUse = backBufferInUse_;
swapping_.store(true, std::memory_order_release);
swapCondition_.wait(lock, [this] {
return !backBufferInUse_;
});
InnerSwapBuffers();
swapping_.store(false, std::memory_order_release);
backBufferInUse_ = bakBackBufferInUse;
return ERR_OK;
}

template<size_t SingleBufferSize>
void DoubleBuffer<SingleBufferSize>::EmergencySwap()
{
std::unique_lock<std::mutex> lock(swapMutex_, std::try_to_lock);
if (!lock.owns_lock()) {
syslog(LOG_ERR, "EmergencySwap failed to acquire lock");
return;
}
swapping_.store(true, std::memory_order_release);
InnerSwapBuffers();
frontBuffer_->Reset();
swapping_.store(false, std::memory_order_release);
}

template<size_t SingleBufferSize>
void DoubleBuffer<SingleBufferSize>::ReleaseBackBuffer()
{
{
std::lock_guard<std::mutex> lock(swapMutex_);
backBufferInUse_ = false;
backBuffer_->Reset();
}
swapCondition_.notify_one();
}

2. 异步调度器实现

2.1 AsyncLogger架构

2.2 核心机制

  • 双触发条件
    • 定时刷新(默认1000ms
    • 阈值触发(软阈值2MB/硬阈值3.8MB
  • 三级缓冲状态
    • 前端缓冲:接收新数据
    • 后端缓冲:后台处理中
    • 已释放缓冲:可复用状态

2.3 关键实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
using BackProcessTaskCallback = MessageConsumer;
templatetemplate<size_t SingleBufferSize = DEFAULT_BUFFER_SIZE> // 默认4MB节点
class AsyncLogger {
public:
using DoubleBuffer = Base::SystemLog::DoubleBuffer<SingleBufferSize>;
using LockFreeRingBuffer = Base::SystemLog::LockFreeRingBuffer<SingleBufferSize>;

explicit AsyncLogger(BackProcessTaskCallback taskCallback);
~AsyncLogger();

// 写入日志(非阻塞)
ErrCode Append(const char* message, size_t len);
// 立即刷新日志(阻塞)
ErrCode Flush();
// 启动后台线程
ErrCode Start();
// 停止并刷新剩余日志
ErrCode Stop();

void SetFlushInterval(uint32_t milliseconds) {
flushInterval_ = std::chrono::milliseconds(milliseconds);
}

void SetFlushThreshold(size_t softFlushThreshold = DEFAULT_SOFT_FLUSH_THRESHOLD,
size_t hardFlushThreshold = DEFAULT_HARD_FLUSH_THRESHOLD) {
softFlushThreshold_ = softFlushThreshold;
hardFlushThreshold_ = hardFlushThreshold;
}

private:
void BackendThreadFunc();
void ProcessBuffer(LockFreeRingBuffer &buffer);
void EmergencyFlush();

BackProcessTaskCallback taskCallback_;
DoubleBuffer doubleBuffer_;
std::unique_ptr<std::thread> backendThread_;
std::atomic<bool> running_{false};
std::atomic<bool> forceFlush_{false};
std::atomic<bool> processing_{false};
std::mutex mutex_;
std::condition_variable cond_;
std::chrono::milliseconds flushInterval_{DEFAULT_FLUSH_INTERVAL_MS};
size_t softFlushThreshold_{DEFAULT_SOFT_FLUSH_THRESHOLD};
size_t hardFlushThreshold_{DEFAULT_HARD_FLUSH_THRESHOLD};
};

template<size_t SingleBufferSize>
AsyncLogger<SingleBufferSize>::AsyncLogger(BackProcessTaskCallback taskCallback)
: taskCallback_(std::move(taskCallback))
{
if (!taskCallback_) {
throw std::invalid_argument("taskCallback function must be valid");
}
}

template<size_t SingleBufferSize>
AsyncLogger<SingleBufferSize>::~AsyncLogger()
{
try {
Stop();
} catch (...) {
syslog(LOG_ERR, "exception in AsyncLogger destructor");
}
}

template<size_t SingleBufferSize>
ErrCode AsyncLogger<SingleBufferSize>::Start()
{
if (running_.load(std::memory_order_acquire)) {
return ERR_ALREADY_RUNNING;
}
running_.store(true, std::memory_order_release);
forceFlush_.store(false, std::memory_order_release);
processing_.store(false, std::memory_order_release);
try {
backendThread_ = std::make_unique<std::thread>([this] {
BackendThreadFunc();
});
auto handle = backendThread_->native_handle();
pthread_setname_np(handle, "log_backend");
return ERR_OK;
} catch (const std::system_error &e) {
syslog(LOG_CRIT, "failed to start backend thread: %s", e.what());
running_.store(false, std::memory_order_release);
return ERR_THREAD_START_FAILED;
}
}

template<size_t SingleBufferSize>
ErrCode AsyncLogger<SingleBufferSize>::Stop()
{
if (!running_.load(std::memory_order_acquire)) {
return ERR_NOT_RUNNING;
}
running_.store(false, std::memory_order_release);
{
std::lock_guard<std::mutex> lock(mutex_);
cond_.notify_one();
}
try {
if (backendThread_ && backendThread_->joinable()) {
backendThread_->join();
}
if (doubleBuffer_.GetFrontSize() > 0) {
EmergencyFlush();
}
return ERR_OK;
} catch (const std::system_error &e) {
syslog(LOG_ERR, "failed to join backend thread: %s", e.what());
return ERR_THREAD_JOIN_FAILED;
}
}

template<size_t SingleBufferSize>
ErrCode AsyncLogger<SingleBufferSize>::Append(const char* message, size_t len)
{
if (!running_.load(std::memory_order_acquire)) {
return ERR_LOGGER_NOT_RUNNING;
}
if (!message || len == 0) {
return ERR_INVALID_ARGUMENT;
}

auto errCode = doubleBuffer_.Append(message, len);
if (errCode != ERR_OK) {
return errCode;
}
if (doubleBuffer_.GetFrontSize() >= softFlushThreshold_ && !processing_.load(std::memory_order_acquire)) {
std::lock_guard<std::mutex> lock(mutex_);
cond_.notify_one();
return ERR_OK;
}
if (doubleBuffer_.GetFrontSize() >= hardFlushThreshold_) {
std::lock_guard<std::mutex> lock(mutex_);
forceFlush_.store(true, std::memory_order_release);
cond_.notify_one();
return ERR_OK;
}
return ERR_OK;
}

template<size_t SingleBufferSize>
ErrCode AsyncLogger<SingleBufferSize>::Flush()
{
if (!running_.load(std::memory_order_acquire)) {
return ERR_LOGGER_NOT_RUNNING;
}
{
std::lock_guard<std::mutex> lock(mutex_);
forceFlush_.store(true, std::memory_order_release);
cond_.notify_one();
}
for (int i = 0; i < MAX_RETRY_TIMES; ++i) {
if (doubleBuffer_.GetFrontSize() == 0 &&
!processing_.load(std::memory_order_acquire)) {
return ERR_OK;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
return ERR_FLUSH_TIMEOUT;
}

template<size_t SingleBufferSize>
void AsyncLogger<SingleBufferSize>::BackendThreadFunc()
{
while (running_.load(std::memory_order_acquire)) {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait_for(lock, flushInterval_, [this] {
return forceFlush_.load(std::memory_order_acquire) || !running_.load(std::memory_order_acquire) ||
doubleBuffer_.GetFrontSize() >= softFlushThreshold_;
});
if (!running_.load(std::memory_order_acquire)) {
break;
}
if (doubleBuffer_.GetFrontSize() == 0) {
continue;
}

processing_.store(true, std::memory_order_release);
if (auto errCode = doubleBuffer_.SafeSwap(); errCode != ERR_OK) {
syslog(LOG_ERR, "SafeSwap failed: %d", errCode);
processing_.store(false, std::memory_order_release);
continue;
}
lock.unlock();

try {
auto &backBuffer = doubleBuffer_.GetBackBuffer();
ProcessBuffer(backBuffer);
} catch (const std::exception &e) {
syslog(LOG_CRIT, "exception during buffer processing: %s", e.what());
}
doubleBuffer_.ReleaseBackBuffer();
processing_.store(false, std::memory_order_release);
}
}

template<size_t SingleBufferSize>
void AsyncLogger<SingleBufferSize>::ProcessBuffer(LockFreeRingBuffer &buffer)
{
const size_t dataSize = buffer.GetSize();
if (dataSize == 0) return;

buffer.Collect(taskCallback_);
}

template<size_t SingleBufferSize>
void AsyncLogger<SingleBufferSize>::EmergencyFlush()
{
syslog(LOG_WARNING, "Performing emergency flush");

if (doubleBuffer_.SafeSwap() != ERR_OK) {
doubleBuffer_.EmergencySwap();
}

auto &backBuffer = doubleBuffer_.GetBackBuffer();
ProcessBuffer(backBuffer);
doubleBuffer_.ReleaseBackBuffer();
}

2.4 后台线程流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void BackendThreadFunc() {
while (running_) {
std::unique_lock lock(mutex_);
// 等待:刷新间隔到达/强制刷新信号/达到软阈值
cond_.wait_for(lock, flushInterval_, [this]{
return forceFlush_ || !running_ ||
buffer_.GetFrontSize() >= softThreshold_;
});

if (!running_) break;

// 执行缓冲交换
buffer_.SafeSwap();
lock.unlock();

// 处理数据(调用用户回调)
ProcessBuffer(backBuffer);

// 重置后端缓冲
buffer_.ReleaseBackBuffer();
}
}

2.5 异常处理机制

  1. 后台线程异常
    • 捕获所有异常并记录系统日志
    • 维持服务状态不崩溃
  2. 停止过程保障
    • 等待后台线程退出
    • 紧急刷新剩余日志
  3. 内存溢出防护
    • 节点数上限控制
    • 分片写入超大消息

3. 性能优化总结

  1. 无锁设计:核心写入路径无锁(CAS操作)
  2. 缓存优化:热点数据缓存行隔离
  3. 批量处理:双缓冲+定时刷新减少I/O次数
  4. 小消息优化:特化复制函数消除函数调用开销
  5. 动态扩展:按需分配避免内存浪费

4. 测试框架设计

4.1 测试架构概览

4.2 测试策略

  1. 单元测试:验证每个模块的基本功能
  2. 性能测试:评估系统在高负载下的表现
  3. 内存泄漏检测:使用GoogleTest结合Valgrind检测内存问题
  4. 多线程测试:验证并发场景下的稳定性

4.3 测试覆盖率增强

  1. 空缓冲区处理:测试空缓冲区的收集操作
  2. 单字节写入:验证最小消息的处理
  3. 缓冲区满场景:测试硬阈值触发机制
  4. 池耗尽情况:验证节点池达到上限时的行为4.3 测试覆盖率增强

4.4 内存泄漏检测方法

4.4.1 使用Valgrind运行测试

1
2
3
4
5
6
7
8
9
10
# 编译测试程序
g++ -std=c++20 -g -O0 -o log_system_test \
buffer_node_pool_test.cpp \
lock_free_ring_buffer_test.cpp \
double_buffer_test.cpp \
async_logger_test.cpp \
-lgtest -lgtest_main -lpthread

# 使用Valgrind检测内存泄漏
valgrind --leak-check=full --show-leak-kinds=all ./log_system_test

4.4.2 内存泄漏检测关键点

  1. 对象池管理:确保所有分配的BufferNode都被正确回收
  2. 环形缓冲区:验证节点链表在销毁时完全释放
  3. 双缓冲系统:检查交换过程中的资源管理
  4. 异步日志器:确认后台线程的资源清理

4.5 测试报告

内存泄漏检测结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
==190811== Memcheck, a memory error detector
==190811== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==190811== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==190811== Command: ./out/double_buffer_test
==190811== Parent PID: 2626
==190811==
==190811==
==190811== HEAP SUMMARY:
==190811== in use at exit: 0 bytes in 0 blocks
==190811== total heap usage: 549 allocs, 549 frees, 1,728,222,415 bytes allocated
==190811==
==190811== All heap blocks were freed -- no leaks are possible
==190811==
==190811== For lists of detected and suppressed errors, rerun with: -s
==190811== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

性能测试结果

1
2
3
4
5
6
7
8
9
10
[PERFORMANCE REPORT]
Success: 160000 | Failed: 0
Threads: 16
Messages per thread: 10000
Total messages: 160000
Message size: 128 bytes
Elapsed time: 1.16963 seconds
Throughput: 136796 msg/sec
Bandwidth: 16.2445 MB/sec
TotalBytes: 20480000 | expectedBytes: 20480000

4.6 持续集成方案(1.0实现)

Jenkins流水线配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
pipeline {
agent any
stages {
stage('Build') {
steps {
sh 'g++ -std=c++20 -o log_system_test *.cpp -lgtest -lgtest_main -lpthread'
}
}
stage('Test') {
steps {
sh './log_system_test'
sh 'valgrind --leak-check=full --error-exitcode=1 ./log_system_test'
}
}
stage('Coverage') {
steps {
sh 'gcov -r *.cpp'
}
}
}
}

5. 关键设计决策

设计点 方案选择 优势
同步机制 写入路径无锁+交换路径有锁 兼顾性能与正确性
内存管理 对象池+动态扩展 减少碎片+防OOM
缓冲区结构 节点链表+环形索引 避免数据拷贝
生产者-消费者 双缓冲+条件变量 解耦读写操作
内存序 精细控制 memory_order 方案选择平衡性能与一致性

通过三级缓冲架构实现高性能日志处理,在保证数据可靠性的同时,最大化系统吞吐量。

阶段2:I/O 系统开发

1. 设计思路

采用策略模式实现可扩展的I/O系统:

  1. 定义统一的IOutputStrategy接口
  2. 实现多种输出策略:控制台、文件、网络等
  3. 使用工厂模式创建输出策略实例
  4. I/O管理器组合多种输出策略

1.1 时序图 (日志写入流程)

1.2 文件滚动流程图

2. 实现

2.1 I/O策略接口

2.2 控制台输出实现

2.3 文件输出基础类

2.4 内存映射文件输出

2.5 直接I/O输出

2.6 I/O管理器

2.7 集成AsyncLogger

2.8 异步fsync优化

3. I/O性能优化策略批量写入优化

  1. 批量写入

    • 在内存映射和直接I/O中实现大块写入
    • 减少系统调用次数
  2. 异步fsync

    • 后台线程定期刷新文件
    • 避免阻塞主日志线程
  3. 写入策略对比

    策略 适用场景 优点 缺点
    标准文件I/O 通用场景 简单可靠 性能一般
    内存映射文件 高吞吐量 零拷贝高效 文件大小受限
    直接I/O 避免双重缓存 绕过页缓存 需要对齐处理
  4. 文件滚动优化

    • 按大小滚动:防止单个文件过大
    • 按时间滚动:方便日志归档
    • 滚动时异步执行,减少阻塞

4. 性能测试结果

1
empty

5. 设计总结

  1. 可扩展架构
    • 策略模式支持灵活添加新输出方式
    • 工厂模式简化输出策略创建
  2. 高性能设计
    • 内存映射文件实现零拷贝写入
    • 直接I/O绕过系统缓存
    • 异步fsync减少I/O阻塞
  3. 健壮性保障
    • 文件滚动防止磁盘空间耗尽
    • 异常处理确保系统稳定
    • 回退机制(O_DIRECT失败时使用普通模式)
  4. 资源管理
    • RAII管理文件描述符和内存映射
    • 对象生命周期明确

I/O系统满足高性能日志处理需求,通过多种优化策略实现每秒GB级的日志处理能力,同时保持系统稳定性和可扩展性。

阶段3:前端接口开发

1. 流式API

1.1 实现LogStream

  • 运算符重载
  • 类型安全拼接
  • 自动刷新机制

2. 格式化与宏

2.1 实现LogFormatter

  • 零拷贝格式化
  • 时间戳优化

2.2 日志宏系统

  • 编译期过滤
  • 文件/行号记录

阶段4:支持系统开发

1. 配置管理

  • 运行时配置加载
  • 热更新支持
  • 分级日志控制

2. 监控系统

  • 性能指标收集
  • 资源使用监控
  • 警报阈值设置

3. 资源治理

  • 内存池实现
  • OOM防护机制
  • 动态资源调整

阶段5:集成与测试

1. 系统集成

  • 模块间接口对接
  • 依赖关系解决
  • 初始化流程

2. 性能测试

测试类型 指标 目标值
单线程吞吐量 日志条数/秒 >500,000
多线程扩展性 16线程吞吐量 >1,200,000
延迟测试 单条日志处理时间 <700ns
崩溃恢复 最后日志丢失时间窗口 <100ms
内存压力 最大内存占用 <512MB

3. 稳定性测试

  • 72小时持续运行
  • 高负载压力测试
  • 随机崩溃注入

阶段6:部署

1. 部署方案

风险管理

1. 无锁编程复杂性

  • 缓解:使用TSAN检测数据竞争
  • 缓解:充分压力测试

2. 崩溃安全可靠性

  • 缓解:多平台信号处理测试
  • 缓解:使用async-signal-safe函数

3. 性能优化瓶颈

  • 缓解:使用perf工具分析热点
  • 缓解:多级性能指标监控

最终检查清单

  • 无锁队列通过并发压力测试
  • 崩溃时日志完整性验证
  • 达到1.2M logs/sec吞吐量
  • 内存使用不超过512MB上限
  • 支持动态配置更新
  • 完善的监控指标输出
  • 文档覆盖所有使用场景

这个实现计划从核心模块开始逐步扩展到完整系统,每阶段都有明确的交付物和验证标准。建议从缓冲系统开始实现,因为它是最关键的性能基础。在实现过程中,持续进行性能测试和内存分析,确保达到设计目标。