include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp
79.6% Lines (393/494)
85.4% Functions (41/48)
| Line | TLA | Hits | Source Code |
|---|---|---|---|
| 1 | // | ||
| 2 | // Copyright (c) 2026 Steve Gerbino | ||
| 3 | // | ||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | ||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | ||
| 6 | // | ||
| 7 | // Official repository: https://github.com/cppalliance/corosio | ||
| 8 | // | ||
| 9 | |||
| 10 | #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | ||
| 11 | #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | ||
| 12 | |||
| 13 | #include <boost/corosio/detail/platform.hpp> | ||
| 14 | |||
| 15 | #if BOOST_COROSIO_HAS_EPOLL | ||
| 16 | |||
| 17 | #include <boost/corosio/detail/config.hpp> | ||
| 18 | #include <boost/capy/ex/execution_context.hpp> | ||
| 19 | |||
| 20 | #include <boost/corosio/native/native_scheduler.hpp> | ||
| 21 | #include <boost/corosio/detail/scheduler_op.hpp> | ||
| 22 | |||
| 23 | #include <boost/corosio/native/detail/epoll/epoll_op.hpp> | ||
| 24 | #include <boost/corosio/detail/timer_service.hpp> | ||
| 25 | #include <boost/corosio/detail/make_err.hpp> | ||
| 26 | #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp> | ||
| 27 | #include <boost/corosio/native/detail/posix/posix_signal_service.hpp> | ||
| 28 | |||
| 29 | #include <boost/corosio/detail/except.hpp> | ||
| 30 | #include <boost/corosio/detail/thread_local_ptr.hpp> | ||
| 31 | |||
| 32 | #include <atomic> | ||
| 33 | #include <chrono> | ||
| 34 | #include <condition_variable> | ||
| 35 | #include <cstddef> | ||
| 36 | #include <cstdint> | ||
| 37 | #include <limits> | ||
| 38 | #include <mutex> | ||
| 39 | #include <utility> | ||
| 40 | |||
| 41 | #include <errno.h> | ||
| 42 | #include <fcntl.h> | ||
| 43 | #include <sys/epoll.h> | ||
| 44 | #include <sys/eventfd.h> | ||
| 45 | #include <sys/socket.h> | ||
| 46 | #include <sys/timerfd.h> | ||
| 47 | #include <unistd.h> | ||
| 48 | |||
| 49 | namespace boost::corosio::detail { | ||
| 50 | |||
| 51 | struct epoll_op; | ||
| 52 | struct descriptor_state; | ||
| 53 | namespace epoll { | ||
| 54 | struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context; | ||
| 55 | } // namespace epoll | ||
| 56 | |||
| 57 | /** Linux scheduler using epoll for I/O multiplexing. | ||
| 58 | |||
| 59 | This scheduler implements the scheduler interface using Linux epoll | ||
| 60 | for efficient I/O event notification. It uses a single reactor model | ||
| 61 | where one thread runs epoll_wait while other threads | ||
| 62 | wait on a condition variable for handler work. This design provides: | ||
| 63 | |||
| 64 | - Handler parallelism: N posted handlers can execute on N threads | ||
| 65 | - No thundering herd: condition_variable wakes exactly one thread | ||
| 66 | - IOCP parity: Behavior matches Windows I/O completion port semantics | ||
| 67 | |||
| 68 | When threads call run(), they first try to execute queued handlers. | ||
| 69 | If the queue is empty and no reactor is running, one thread becomes | ||
| 70 | the reactor and runs epoll_wait. Other threads wait on a condition | ||
| 71 | variable until handlers are available. | ||
| 72 | |||
| 73 | @par Thread Safety | ||
| 74 | All public member functions are thread-safe. | ||
| 75 | */ | ||
| 76 | class BOOST_COROSIO_DECL epoll_scheduler final | ||
| 77 | : public native_scheduler | ||
| 78 | , public capy::execution_context::service | ||
| 79 | { | ||
| 80 | public: | ||
| 81 | using key_type = scheduler; | ||
| 82 | |||
| 83 | /** Construct the scheduler. | ||
| 84 | |||
| 85 | Creates an epoll instance, an eventfd for reactor interruption, ||
| 86 | and a timerfd for kernel-managed timer expiry. ||
| 87 | |||
| 88 | @param ctx Reference to the owning execution_context. | ||
| 89 | @param concurrency_hint Hint for expected thread count (unused). | ||
| 90 | */ | ||
| 91 | epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1); | ||
| 92 | |||
| 93 | /// Destroy the scheduler. | ||
| 94 | ~epoll_scheduler() override; | ||
| 95 | |||
| 96 | epoll_scheduler(epoll_scheduler const&) = delete; | ||
| 97 | epoll_scheduler& operator=(epoll_scheduler const&) = delete; | ||
| 98 | |||
| 99 | void shutdown() override; | ||
| 100 | void post(std::coroutine_handle<> h) const override; | ||
| 101 | void post(scheduler_op* h) const override; | ||
| 102 | bool running_in_this_thread() const noexcept override; | ||
| 103 | void stop() override; | ||
| 104 | bool stopped() const noexcept override; | ||
| 105 | void restart() override; | ||
| 106 | std::size_t run() override; | ||
| 107 | std::size_t run_one() override; | ||
| 108 | std::size_t wait_one(long usec) override; | ||
| 109 | std::size_t poll() override; | ||
| 110 | std::size_t poll_one() override; | ||
| 111 | |||
| 112 | /** Return the epoll file descriptor. | ||
| 113 | |||
| 114 | Used by socket services to register file descriptors | ||
| 115 | for I/O event notification. | ||
| 116 | |||
| 117 | @return The epoll file descriptor. | ||
| 118 | */ | ||
| 119 | int epoll_fd() const noexcept | ||
| 120 | { | ||
| 121 | return epoll_fd_; | ||
| 122 | } | ||
| 123 | |||
| 124 | /** Reset the thread's inline completion budget. | ||
| 125 | |||
| 126 | Called at the start of each posted completion handler to | ||
| 127 | grant a fresh budget for speculative inline completions. | ||
| 128 | */ | ||
| 129 | void reset_inline_budget() const noexcept; | ||
| 130 | |||
| 131 | /** Consume one unit of inline budget if available. | ||
| 132 | |||
| 133 | @return True if budget was available and consumed. | ||
| 134 | */ | ||
| 135 | bool try_consume_inline_budget() const noexcept; | ||
| 136 | |||
| 137 | /** Register a descriptor for persistent monitoring. | ||
| 138 | |||
| 139 | The fd is registered once and stays registered until explicitly | ||
| 140 | deregistered. Events are dispatched via descriptor_state which | ||
| 141 | tracks pending read/write/connect operations. | ||
| 142 | |||
| 143 | @param fd The file descriptor to register. | ||
| 144 | @param desc Pointer to descriptor data (stored in epoll_event.data.ptr). | ||
| 145 | */ | ||
| 146 | void register_descriptor(int fd, descriptor_state* desc) const; | ||
| 147 | |||
| 148 | /** Deregister a persistently registered descriptor. | ||
| 149 | |||
| 150 | @param fd The file descriptor to deregister. | ||
| 151 | */ | ||
| 152 | void deregister_descriptor(int fd) const; | ||
| 153 | |||
| 154 | void work_started() noexcept override; | ||
| 155 | void work_finished() noexcept override; | ||
| 156 | |||
| 157 | /** Offset the work_finished() that work_cleanup would otherwise issue. ||
| 158 | |||
| 159 | Called by descriptor_state when all I/O returned EAGAIN and no | ||
| 160 | handler will be executed. Must be called from a scheduler thread. | ||
| 161 | */ | ||
| 162 | void compensating_work_started() const noexcept; | ||
| 163 | |||
| 164 | /** Drain work from a thread context's private queue to the global queue. ||
| 165 | |||
| 166 | Called by thread_context_guard destructor when a thread exits run(). | ||
| 167 | Transfers pending work to the global queue under mutex protection. | ||
| 168 | |||
| 169 | @param queue The private queue to drain. | ||
| 170 | @param count Item count for wakeup decisions (wakes other threads if positive). | ||
| 171 | */ | ||
| 172 | void drain_thread_queue(op_queue& queue, long count) const; | ||
| 173 | |||
| 174 | /** Post completed operations for deferred invocation. | ||
| 175 | |||
| 176 | If called from a thread running this scheduler, operations go to | ||
| 177 | the thread's private queue (fast path). Otherwise, operations are | ||
| 178 | added to the global queue under mutex and a waiter is signaled. | ||
| 179 | |||
| 180 | @par Preconditions | ||
| 181 | work_started() must have been called for each operation. | ||
| 182 | |||
| 183 | @param ops Queue of operations to post. | ||
| 184 | */ | ||
| 185 | void post_deferred_completions(op_queue& ops) const; | ||
| 186 | |||
| 187 | private: | ||
| 188 | struct work_cleanup | ||
| 189 | { | ||
| 190 | epoll_scheduler* scheduler; | ||
| 191 | std::unique_lock<std::mutex>* lock; | ||
| 192 | epoll::scheduler_context* ctx; | ||
| 193 | ~work_cleanup(); | ||
| 194 | }; | ||
| 195 | |||
| 196 | struct task_cleanup | ||
| 197 | { | ||
| 198 | epoll_scheduler const* scheduler; | ||
| 199 | std::unique_lock<std::mutex>* lock; | ||
| 200 | epoll::scheduler_context* ctx; | ||
| 201 | ~task_cleanup(); | ||
| 202 | }; | ||
| 203 | |||
| 204 | std::size_t do_one( | ||
| 205 | std::unique_lock<std::mutex>& lock, | ||
| 206 | long timeout_us, | ||
| 207 | epoll::scheduler_context* ctx); | ||
| 208 | void | ||
| 209 | run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx); | ||
| 210 | void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const; | ||
| 211 | void interrupt_reactor() const; | ||
| 212 | void update_timerfd() const; | ||
| 213 | |||
| 214 | /** Set the signaled state and wake all waiting threads. | ||
| 215 | |||
| 216 | @par Preconditions | ||
| 217 | Mutex must be held. | ||
| 218 | |||
| 219 | @param lock The held mutex lock. | ||
| 220 | */ | ||
| 221 | void signal_all(std::unique_lock<std::mutex>& lock) const; | ||
| 222 | |||
| 223 | /** Set the signaled state and wake one waiter if any exist. | ||
| 224 | |||
| 225 | Only unlocks and signals if at least one thread is waiting. | ||
| 226 | Use this when the caller needs to perform a fallback action | ||
| 227 | (such as interrupting the reactor) when no waiters exist. | ||
| 228 | |||
| 229 | @par Preconditions | ||
| 230 | Mutex must be held. | ||
| 231 | |||
| 232 | @param lock The held mutex lock. | ||
| 233 | |||
| 234 | @return `true` if unlocked and signaled, `false` if lock still held. | ||
| 235 | */ | ||
| 236 | bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const; | ||
| 237 | |||
| 238 | /** Set the signaled state, unlock, and wake one waiter if any exist. | ||
| 239 | |||
| 240 | Always unlocks the mutex. Use this when the caller will release | ||
| 241 | the lock regardless of whether a waiter exists. | ||
| 242 | |||
| 243 | @par Preconditions | ||
| 244 | Mutex must be held. | ||
| 245 | |||
| 246 | @param lock The held mutex lock. | ||
| 247 | |||
| 248 | @return `true` if a waiter was signaled, `false` otherwise. | ||
| 249 | */ | ||
| 250 | bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const; | ||
| 251 | |||
| 252 | /** Clear the signaled state before waiting. | ||
| 253 | |||
| 254 | @par Preconditions | ||
| 255 | Mutex must be held. | ||
| 256 | */ | ||
| 257 | void clear_signal() const; | ||
| 258 | |||
| 259 | /** Block until the signaled state is set. | ||
| 260 | |||
| 261 | Returns immediately if already signaled (fast-path). Otherwise | ||
| 262 | increments the waiter count, waits on the condition variable, | ||
| 263 | and decrements the waiter count upon waking. | ||
| 264 | |||
| 265 | @par Preconditions | ||
| 266 | Mutex must be held. | ||
| 267 | |||
| 268 | @param lock The held mutex lock. | ||
| 269 | */ | ||
| 270 | void wait_for_signal(std::unique_lock<std::mutex>& lock) const; | ||
| 271 | |||
| 272 | /** Block until signaled or timeout expires. | ||
| 273 | |||
| 274 | @par Preconditions | ||
| 275 | Mutex must be held. | ||
| 276 | |||
| 277 | @param lock The held mutex lock. | ||
| 278 | @param timeout_us Maximum time to wait in microseconds. | ||
| 279 | */ | ||
| 280 | void wait_for_signal_for( | ||
| 281 | std::unique_lock<std::mutex>& lock, long timeout_us) const; | ||
| 282 | |||
| 283 | int epoll_fd_; | ||
| 284 | int event_fd_; // for interrupting reactor | ||
| 285 | int timer_fd_; // timerfd for kernel-managed timer expiry | ||
| 286 | mutable std::mutex mutex_; | ||
| 287 | mutable std::condition_variable cond_; | ||
| 288 | mutable op_queue completed_ops_; | ||
| 289 | mutable std::atomic<long> outstanding_work_; | ||
| 290 | bool stopped_; | ||
| 291 | bool shutdown_; | ||
| 292 | |||
| 293 | // True while a thread is blocked in epoll_wait. Used by ||
| 294 | // wake_one_thread_and_unlock and the timer-service callback to know when ||
| 295 | // an eventfd interrupt is needed instead of a condvar signal. ||
| 296 | mutable std::atomic<bool> task_running_{false}; | ||
| 297 | |||
| 298 | // True when the reactor has been told to do a non-blocking poll | ||
| 299 | // (more handlers queued or poll mode). Prevents redundant eventfd | ||
| 300 | // writes and controls the epoll_wait timeout. | ||
| 301 | mutable bool task_interrupted_ = false; | ||
| 302 | |||
| 303 | // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2) | ||
| 304 | mutable std::size_t state_ = 0; | ||
| 305 | |||
| 306 | // Edge-triggered eventfd state | ||
| 307 | mutable std::atomic<bool> eventfd_armed_{false}; | ||
| 308 | |||
| 309 | // Set when the earliest timer changes; flushed before epoll_wait | ||
| 310 | // blocks. Avoids timerfd_settime syscalls for timers that are | ||
| 311 | // scheduled then cancelled without being waited on. | ||
| 312 | mutable std::atomic<bool> timerfd_stale_{false}; | ||
| 313 | |||
| 314 | // Sentinel operation for interleaving reactor runs with handler execution. | ||
| 315 | // Ensures the reactor runs periodically even when handlers are continuously | ||
| 316 | // posted, preventing starvation of I/O events, timers, and signals. | ||
| 317 | struct task_op final : scheduler_op | ||
| 318 | { | ||
| 319 | ✗ | void operator()() override {} | |
| 320 | ✗ | void destroy() override {} | |
| 321 | }; | ||
| 322 | task_op task_op_; | ||
| 323 | }; | ||
| 324 | |||
| 325 | //-------------------------------------------------------------------------- | ||
| 326 | // | ||
| 327 | // Implementation | ||
| 328 | // | ||
| 329 | //-------------------------------------------------------------------------- | ||
| 330 | |||
| 331 | /* | ||
| 332 | epoll Scheduler - Single Reactor Model | ||
| 333 | ====================================== | ||
| 334 | |||
| 335 | This scheduler uses a thread coordination strategy to provide handler | ||
| 336 | parallelism and avoid the thundering herd problem. | ||
| 337 | Instead of all threads blocking on epoll_wait(), one thread becomes the | ||
| 338 | "reactor" while others wait on a condition variable for handler work. | ||
| 339 | |||
| 340 | Thread Model | ||
| 341 | ------------ | ||
| 342 | - ONE thread runs epoll_wait() at a time (the reactor thread) | ||
| 343 | - OTHER threads wait on cond_ (condition variable) for handlers | ||
| 344 | - When work is posted, exactly one waiting thread wakes via notify_one() | ||
| 345 | - This matches Windows IOCP semantics where N posted items wake N threads | ||
| 346 | |||
| 347 | Event Loop Structure (do_one) | ||
| 348 | ----------------------------- | ||
| 349 | 1. Lock mutex, try to pop handler from queue | ||
| 350 | 2. If got handler: execute it (unlocked), return | ||
| 351 | 3. If queue empty and no reactor running: become reactor | ||
| 352 | - Run epoll_wait (unlocked), queue I/O completions, loop back | ||
| 353 | 4. If queue empty and reactor running: wait on condvar for work | ||
| 354 | |||
| 355 | The task_running_ flag ensures only one thread owns epoll_wait(). | ||
| 356 | After the reactor queues I/O completions, it loops back to try getting | ||
| 357 | a handler, giving priority to handler execution over more I/O polling. | ||
| 358 | |||
| 359 | Signaling State (state_) | ||
| 360 | ------------------------ | ||
| 361 | The state_ variable encodes two pieces of information: | ||
| 362 | - Bit 0: signaled flag (1 = signaled, persists until cleared) | ||
| 363 | - Upper bits: waiter count (each waiter adds 2 before blocking) | ||
| 364 | |||
| 365 | This allows efficient coordination: | ||
| 366 | - Signalers only call notify when waiters exist (state_ > 1) | ||
| 367 | - Waiters check if already signaled before blocking (fast-path) | ||
| 368 | |||
| 369 | Wake Coordination (wake_one_thread_and_unlock) | ||
| 370 | ---------------------------------------------- | ||
| 371 | When posting work: | ||
| 372 | - If waiters exist (state_ > 1): signal and notify_one() | ||
| 373 | - Else if reactor running: interrupt via eventfd write | ||
| 374 | - Else: no-op (thread will find work when it checks queue) | ||
| 375 | |||
| 376 | This avoids waking threads unnecessarily. With cascading wakes, | ||
| 377 | each handler execution wakes at most one additional thread if | ||
| 378 | more work exists in the queue. | ||
| 379 | |||
| 380 | Work Counting | ||
| 381 | ------------- | ||
| 382 | outstanding_work_ tracks pending operations. When it hits zero, run() | ||
| 383 | returns. Each operation increments on start, decrements on completion. | ||
| 384 | |||
| 385 | Timer Integration | ||
| 386 | ----------------- | ||
| 387 | Timers are handled by timer_service. The reactor adjusts epoll_wait | ||
| 388 | timeout to wake for the nearest timer expiry. When a new timer is | ||
| 389 | scheduled earlier than current, timer_service calls interrupt_reactor() | ||
| 390 | to re-evaluate the timeout. | ||
| 391 | */ | ||
| 392 | |||
| 393 | namespace epoll { | ||
| 394 | |||
| 395 | struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context | ||
| 396 | { | ||
| 397 | epoll_scheduler const* key; | ||
| 398 | scheduler_context* next; | ||
| 399 | op_queue private_queue; | ||
| 400 | long private_outstanding_work; | ||
| 401 | int inline_budget; | ||
| 402 | int inline_budget_max; | ||
| 403 | bool unassisted; | ||
| 404 | |||
| 405 | 191 | scheduler_context(epoll_scheduler const* k, scheduler_context* n) | |
| 406 | 191 | : key(k) | |
| 407 | 191 | , next(n) | |
| 408 | 191 | , private_outstanding_work(0) | |
| 409 | 191 | , inline_budget(0) | |
| 410 | 191 | , inline_budget_max(2) | |
| 411 | 191 | , unassisted(false) | |
| 412 | { | ||
| 413 | 191 | } | |
| 414 | }; | ||
| 415 | |||
| 416 | inline thread_local_ptr<scheduler_context> context_stack; | ||
| 417 | |||
| 418 | struct thread_context_guard | ||
| 419 | { | ||
| 420 | scheduler_context frame_; | ||
| 421 | |||
| 422 | 191 | explicit thread_context_guard(epoll_scheduler const* ctx) noexcept | |
| 423 | 191 | : frame_(ctx, context_stack.get()) | |
| 424 | { | ||
| 425 | 191 | context_stack.set(&frame_); | |
| 426 | 191 | } | |
| 427 | |||
| 428 | 191 | ~thread_context_guard() noexcept | |
| 429 | { | ||
| 430 | 191 | if (!frame_.private_queue.empty()) | |
| 431 | ✗ | frame_.key->drain_thread_queue( | |
| 432 | ✗ | frame_.private_queue, frame_.private_outstanding_work); | |
| 433 | 191 | context_stack.set(frame_.next); | |
| 434 | 191 | } | |
| 435 | }; | ||
| 436 | |||
| 437 | inline scheduler_context* | ||
| 438 | 377683 | find_context(epoll_scheduler const* self) noexcept | |
| 439 | { | ||
| 440 | 377683 | for (auto* c = context_stack.get(); c != nullptr; c = c->next) | |
| 441 | 375997 | if (c->key == self) | |
| 442 | 375997 | return c; | |
| 443 | 1686 | return nullptr; | |
| 444 | } | ||
| 445 | |||
| 446 | } // namespace epoll | ||
| 447 | |||
| 448 | inline void | ||
| 449 | 56388 | epoll_scheduler::reset_inline_budget() const noexcept | |
| 450 | { | ||
| 451 | 56388 | if (auto* ctx = epoll::find_context(this)) | |
| 452 | { | ||
| 453 | // Cap when no other thread absorbed queued work. A moderate | ||
| 454 | // cap (4) amortizes scheduling for small buffers while avoiding | ||
| 455 | // bursty I/O that fills socket buffers and stalls large transfers. | ||
| 456 | 56388 | if (ctx->unassisted) | |
| 457 | { | ||
| 458 | 56388 | ctx->inline_budget_max = 4; | |
| 459 | 56388 | ctx->inline_budget = 4; | |
| 460 | 56388 | return; | |
| 461 | } | ||
| 462 | // Ramp up when previous cycle fully consumed budget. | ||
| 463 | // Reset on partial consumption (EAGAIN hit or peer got scheduled). | ||
| 464 | ✗ | if (ctx->inline_budget == 0) | |
| 465 | ✗ | ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16); | |
| 466 | ✗ | else if (ctx->inline_budget < ctx->inline_budget_max) | |
| 467 | ✗ | ctx->inline_budget_max = 2; | |
| 468 | ✗ | ctx->inline_budget = ctx->inline_budget_max; | |
| 469 | } | ||
| 470 | } | ||
| 471 | |||
| 472 | inline bool | ||
| 473 | 231459 | epoll_scheduler::try_consume_inline_budget() const noexcept | |
| 474 | { | ||
| 475 | 231459 | if (auto* ctx = epoll::find_context(this)) | |
| 476 | { | ||
| 477 | 231459 | if (ctx->inline_budget > 0) | |
| 478 | { | ||
| 479 | 185241 | --ctx->inline_budget; | |
| 480 | 185241 | return true; | |
| 481 | } | ||
| 482 | } | ||
| 483 | 46218 | return false; | |
| 484 | } | ||
| 485 | |||
| 486 | inline void | ||
| 487 | 41097 | descriptor_state::operator()() | |
| 488 | { | ||
| 489 | 41097 | is_enqueued_.store(false, std::memory_order_relaxed); | |
| 490 | |||
| 491 | // Take ownership of impl ref set by close_socket() to prevent | ||
| 492 | // the owning impl from being freed while we're executing | ||
| 493 | 41097 | auto prevent_impl_destruction = std::move(impl_ref_); | |
| 494 | |||
| 495 | 41097 | std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire); | |
| 496 | 41097 | if (ev == 0) | |
| 497 | { | ||
| 498 | ✗ | scheduler_->compensating_work_started(); | |
| 499 | ✗ | return; | |
| 500 | } | ||
| 501 | |||
| 502 | 41097 | op_queue local_ops; | |
| 503 | |||
| 504 | 41097 | int err = 0; | |
| 505 | 41097 | if (ev & EPOLLERR) | |
| 506 | { | ||
| 507 | 1 | socklen_t len = sizeof(err); | |
| 508 | 1 | if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) | |
| 509 | ✗ | err = errno; | |
| 510 | 1 | if (err == 0) | |
| 511 | ✗ | err = EIO; | |
| 512 | } | ||
| 513 | |||
| 514 | { | ||
| 515 | 41097 | std::lock_guard lock(mutex); | |
| 516 | 41097 | if (ev & EPOLLIN) | |
| 517 | { | ||
| 518 | 12864 | if (read_op) | |
| 519 | { | ||
| 520 | 4981 | auto* rd = read_op; | |
| 521 | 4981 | if (err) | |
| 522 | ✗ | rd->complete(err, 0); | |
| 523 | else | ||
| 524 | 4981 | rd->perform_io(); | |
| 525 | |||
| 526 | 4981 | if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK) | |
| 527 | { | ||
| 528 | ✗ | rd->errn = 0; | |
| 529 | } | ||
| 530 | else | ||
| 531 | { | ||
| 532 | 4981 | read_op = nullptr; | |
| 533 | 4981 | local_ops.push(rd); | |
| 534 | } | ||
| 535 | } | ||
| 536 | else | ||
| 537 | { | ||
| 538 | 7883 | read_ready = true; | |
| 539 | } | ||
| 540 | } | ||
| 541 | 41097 | if (ev & EPOLLOUT) | |
| 542 | { | ||
| 543 | 36120 | bool had_write_op = (connect_op || write_op); | |
| 544 | 36120 | if (connect_op) | |
| 545 | { | ||
| 546 | 4981 | auto* cn = connect_op; | |
| 547 | 4981 | if (err) | |
| 548 | 1 | cn->complete(err, 0); | |
| 549 | else | ||
| 550 | 4980 | cn->perform_io(); | |
| 551 | 4981 | connect_op = nullptr; | |
| 552 | 4981 | local_ops.push(cn); | |
| 553 | } | ||
| 554 | 36120 | if (write_op) | |
| 555 | { | ||
| 556 | ✗ | auto* wr = write_op; | |
| 557 | ✗ | if (err) | |
| 558 | ✗ | wr->complete(err, 0); | |
| 559 | else | ||
| 560 | ✗ | wr->perform_io(); | |
| 561 | |||
| 562 | ✗ | if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK) | |
| 563 | { | ||
| 564 | ✗ | wr->errn = 0; | |
| 565 | } | ||
| 566 | else | ||
| 567 | { | ||
| 568 | ✗ | write_op = nullptr; | |
| 569 | ✗ | local_ops.push(wr); | |
| 570 | } | ||
| 571 | } | ||
| 572 | 36120 | if (!had_write_op) | |
| 573 | 31139 | write_ready = true; | |
| 574 | } | ||
| 575 | 41097 | if (err) | |
| 576 | { | ||
| 577 | 1 | if (read_op) | |
| 578 | { | ||
| 579 | ✗ | read_op->complete(err, 0); | |
| 580 | ✗ | local_ops.push(std::exchange(read_op, nullptr)); | |
| 581 | } | ||
| 582 | 1 | if (write_op) | |
| 583 | { | ||
| 584 | ✗ | write_op->complete(err, 0); | |
| 585 | ✗ | local_ops.push(std::exchange(write_op, nullptr)); | |
| 586 | } | ||
| 587 | 1 | if (connect_op) | |
| 588 | { | ||
| 589 | ✗ | connect_op->complete(err, 0); | |
| 590 | ✗ | local_ops.push(std::exchange(connect_op, nullptr)); | |
| 591 | } | ||
| 592 | } | ||
| 593 | 41097 | } | |
| 594 | |||
| 595 | // Execute first handler inline — the scheduler's work_cleanup | ||
| 596 | // accounts for this as the "consumed" work item | ||
| 597 | 41097 | scheduler_op* first = local_ops.pop(); | |
| 598 | 41097 | if (first) | |
| 599 | { | ||
| 600 | 9962 | scheduler_->post_deferred_completions(local_ops); | |
| 601 | 9962 | (*first)(); | |
| 602 | } | ||
| 603 | else | ||
| 604 | { | ||
| 605 | 31135 | scheduler_->compensating_work_started(); | |
| 606 | } | ||
| 607 | 41097 | } | |
| 608 | |||
| 609 | 205 | inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int) | |
| 610 | 205 | : epoll_fd_(-1) | |
| 611 | 205 | , event_fd_(-1) | |
| 612 | 205 | , timer_fd_(-1) | |
| 613 | 205 | , outstanding_work_(0) | |
| 614 | 205 | , stopped_(false) | |
| 615 | 205 | , shutdown_(false) | |
| 616 | 205 | , task_running_{false} | |
| 617 | 205 | , task_interrupted_(false) | |
| 618 | 410 | , state_(0) | |
| 619 | { | ||
| 620 | 205 | epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC); | |
| 621 | 205 | if (epoll_fd_ < 0) | |
| 622 | ✗ | detail::throw_system_error(make_err(errno), "epoll_create1"); | |
| 623 | |||
| 624 | 205 | event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); | |
| 625 | 205 | if (event_fd_ < 0) | |
| 626 | { | ||
| 627 | ✗ | int errn = errno; | |
| 628 | ✗ | ::close(epoll_fd_); | |
| 629 | ✗ | detail::throw_system_error(make_err(errn), "eventfd"); | |
| 630 | } | ||
| 631 | |||
| 632 | 205 | timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); | |
| 633 | 205 | if (timer_fd_ < 0) | |
| 634 | { | ||
| 635 | ✗ | int errn = errno; | |
| 636 | ✗ | ::close(event_fd_); | |
| 637 | ✗ | ::close(epoll_fd_); | |
| 638 | ✗ | detail::throw_system_error(make_err(errn), "timerfd_create"); | |
| 639 | } | ||
| 640 | |||
| 641 | 205 | epoll_event ev{}; | |
| 642 | 205 | ev.events = EPOLLIN | EPOLLET; | |
| 643 | 205 | ev.data.ptr = nullptr; | |
| 644 | 205 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0) | |
| 645 | { | ||
| 646 | ✗ | int errn = errno; | |
| 647 | ✗ | ::close(timer_fd_); | |
| 648 | ✗ | ::close(event_fd_); | |
| 649 | ✗ | ::close(epoll_fd_); | |
| 650 | ✗ | detail::throw_system_error(make_err(errn), "epoll_ctl"); | |
| 651 | } | ||
| 652 | |||
| 653 | 205 | epoll_event timer_ev{}; | |
| 654 | 205 | timer_ev.events = EPOLLIN | EPOLLERR; | |
| 655 | 205 | timer_ev.data.ptr = &timer_fd_; | |
| 656 | 205 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0) | |
| 657 | { | ||
| 658 | ✗ | int errn = errno; | |
| 659 | ✗ | ::close(timer_fd_); | |
| 660 | ✗ | ::close(event_fd_); | |
| 661 | ✗ | ::close(epoll_fd_); | |
| 662 | ✗ | detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)"); | |
| 663 | } | ||
| 664 | |||
| 665 | 205 | timer_svc_ = &get_timer_service(ctx, *this); | |
| 666 | 205 | timer_svc_->set_on_earliest_changed( | |
| 667 | 5397 | timer_service::callback(this, [](void* p) { | |
| 668 | 5192 | auto* self = static_cast<epoll_scheduler*>(p); | |
| 669 | 5192 | self->timerfd_stale_.store(true, std::memory_order_release); | |
| 670 | 5192 | if (self->task_running_.load(std::memory_order_acquire)) | |
| 671 | ✗ | self->interrupt_reactor(); | |
| 672 | 5192 | })); | |
| 673 | |||
| 674 | // Initialize resolver service | ||
| 675 | 205 | get_resolver_service(ctx, *this); | |
| 676 | |||
| 677 | // Initialize signal service | ||
| 678 | 205 | get_signal_service(ctx, *this); | |
| 679 | |||
| 680 | // Push task sentinel to interleave reactor runs with handler execution | ||
| 681 | 205 | completed_ops_.push(&task_op_); | |
| 682 | 205 | } | |
| 683 | |||
| 684 | 410 | inline epoll_scheduler::~epoll_scheduler() | |
| 685 | { | ||
| 686 | 205 | if (timer_fd_ >= 0) | |
| 687 | 205 | ::close(timer_fd_); | |
| 688 | 205 | if (event_fd_ >= 0) | |
| 689 | 205 | ::close(event_fd_); | |
| 690 | 205 | if (epoll_fd_ >= 0) | |
| 691 | 205 | ::close(epoll_fd_); | |
| 692 | 410 | } | |
| 693 | |||
| 694 | inline void | ||
| 695 | 205 | epoll_scheduler::shutdown() | |
| 696 | { | ||
| 697 | { | ||
| 698 | 205 | std::unique_lock lock(mutex_); | |
| 699 | 205 | shutdown_ = true; | |
| 700 | |||
| 701 | 439 | while (auto* h = completed_ops_.pop()) | |
| 702 | { | ||
| 703 | 234 | if (h == &task_op_) | |
| 704 | 205 | continue; | |
| 705 | 29 | lock.unlock(); | |
| 706 | 29 | h->destroy(); | |
| 707 | 29 | lock.lock(); | |
| 708 | 234 | } | |
| 709 | |||
| 710 | 205 | signal_all(lock); | |
| 711 | 205 | } | |
| 712 | |||
| 713 | 205 | outstanding_work_.store(0, std::memory_order_release); | |
| 714 | |||
| 715 | 205 | if (event_fd_ >= 0) | |
| 716 | 205 | interrupt_reactor(); | |
| 717 | 205 | } | |
| 718 | |||
| 719 | inline void | ||
| 720 | 7032 | epoll_scheduler::post(std::coroutine_handle<> h) const | |
| 721 | { | ||
| 722 | struct post_handler final : scheduler_op | ||
| 723 | { | ||
| 724 | std::coroutine_handle<> h_; | ||
| 725 | |||
| 726 | 7032 | explicit post_handler(std::coroutine_handle<> h) : h_(h) {} | |
| 727 | |||
| 728 | 14064 | ~post_handler() override = default; | |
| 729 | |||
| 730 | 7032 | void operator()() override | |
| 731 | { | ||
| 732 | 7032 | auto h = h_; | |
| 733 | 7032 | delete this; | |
| 734 | 7032 | h.resume(); | |
| 735 | 7032 | } | |
| 736 | |||
| 737 | ✗ | void destroy() override | |
| 738 | { | ||
| 739 | ✗ | delete this; | |
| 740 | ✗ | } | |
| 741 | }; | ||
| 742 | |||
| 743 | 7032 | auto ph = std::make_unique<post_handler>(h); | |
| 744 | |||
| 745 | // Fast path: same thread posts to private queue | ||
| 746 | // Only count locally; work_cleanup batches to global counter | ||
| 747 | 7032 | if (auto* ctx = epoll::find_context(this)) | |
| 748 | { | ||
| 749 | 5372 | ++ctx->private_outstanding_work; | |
| 750 | 5372 | ctx->private_queue.push(ph.release()); | |
| 751 | 5372 | return; | |
| 752 | } | ||
| 753 | |||
| 754 | // Slow path: cross-thread post requires mutex | ||
| 755 | 1660 | outstanding_work_.fetch_add(1, std::memory_order_relaxed); | |
| 756 | |||
| 757 | 1660 | std::unique_lock lock(mutex_); | |
| 758 | 1660 | completed_ops_.push(ph.release()); | |
| 759 | 1660 | wake_one_thread_and_unlock(lock); | |
| 760 | 7032 | } | |
| 761 | |||
| 762 | inline void | ||
| 763 | 51669 | epoll_scheduler::post(scheduler_op* h) const | |
| 764 | { | ||
| 765 | // Fast path: same thread posts to private queue | ||
| 766 | // Only count locally; work_cleanup batches to global counter | ||
| 767 | 51669 | if (auto* ctx = epoll::find_context(this)) | |
| 768 | { | ||
| 769 | 51643 | ++ctx->private_outstanding_work; | |
| 770 | 51643 | ctx->private_queue.push(h); | |
| 771 | 51643 | return; | |
| 772 | } | ||
| 773 | |||
| 774 | // Slow path: cross-thread post requires mutex | ||
| 775 | 26 | outstanding_work_.fetch_add(1, std::memory_order_relaxed); | |
| 776 | |||
| 777 | 26 | std::unique_lock lock(mutex_); | |
| 778 | 26 | completed_ops_.push(h); | |
| 779 | 26 | wake_one_thread_and_unlock(lock); | |
| 780 | 26 | } | |
| 781 | |||
| 782 | inline bool | ||
| 783 | 703 | epoll_scheduler::running_in_this_thread() const noexcept | |
| 784 | { | ||
| 785 | 703 | for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next) | |
| 786 | 456 | if (c->key == this) | |
| 787 | 456 | return true; | |
| 788 | 247 | return false; | |
| 789 | } | ||
| 790 | |||
| 791 | inline void | ||
| 792 | 201 | epoll_scheduler::stop() | |
| 793 | { | ||
| 794 | 201 | std::unique_lock lock(mutex_); | |
| 795 | 201 | if (!stopped_) | |
| 796 | { | ||
| 797 | 168 | stopped_ = true; | |
| 798 | 168 | signal_all(lock); | |
| 799 | 168 | interrupt_reactor(); | |
| 800 | } | ||
| 801 | 201 | } | |
| 802 | |||
| 803 | inline bool | ||
| 804 | 18 | epoll_scheduler::stopped() const noexcept | |
| 805 | { | ||
| 806 | 18 | std::unique_lock lock(mutex_); | |
| 807 | 36 | return stopped_; | |
| 808 | 18 | } | |
| 809 | |||
| 810 | inline void | ||
| 811 | 52 | epoll_scheduler::restart() | |
| 812 | { | ||
| 813 | 52 | std::unique_lock lock(mutex_); | |
| 814 | 52 | stopped_ = false; | |
| 815 | 52 | } | |
| 816 | |||
| 817 | inline std::size_t | ||
| 818 | 187 | epoll_scheduler::run() | |
| 819 | { | ||
| 820 | 374 | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 821 | { | ||
| 822 | 28 | stop(); | |
| 823 | 28 | return 0; | |
| 824 | } | ||
| 825 | |||
| 826 | 159 | epoll::thread_context_guard ctx(this); | |
| 827 | 159 | std::unique_lock lock(mutex_); | |
| 828 | |||
| 829 | 159 | std::size_t n = 0; | |
| 830 | for (;;) | ||
| 831 | { | ||
| 832 | 99924 | if (!do_one(lock, -1, &ctx.frame_)) | |
| 833 | 159 | break; | |
| 834 | 99765 | if (n != (std::numeric_limits<std::size_t>::max)()) | |
| 835 | 99765 | ++n; | |
| 836 | 99765 | if (!lock.owns_lock()) | |
| 837 | 47958 | lock.lock(); | |
| 838 | } | ||
| 839 | 159 | return n; | |
| 840 | 159 | } | |
| 841 | |||
| 842 | inline std::size_t | ||
| 843 | 2 | epoll_scheduler::run_one() | |
| 844 | { | ||
| 845 | 4 | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 846 | { | ||
| 847 | ✗ | stop(); | |
| 848 | ✗ | return 0; | |
| 849 | } | ||
| 850 | |||
| 851 | 2 | epoll::thread_context_guard ctx(this); | |
| 852 | 2 | std::unique_lock lock(mutex_); | |
| 853 | 2 | return do_one(lock, -1, &ctx.frame_); | |
| 854 | 2 | } | |
| 855 | |||
| 856 | inline std::size_t | ||
| 857 | 34 | epoll_scheduler::wait_one(long usec) | |
| 858 | { | ||
| 859 | 68 | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 860 | { | ||
| 861 | 7 | stop(); | |
| 862 | 7 | return 0; | |
| 863 | } | ||
| 864 | |||
| 865 | 27 | epoll::thread_context_guard ctx(this); | |
| 866 | 27 | std::unique_lock lock(mutex_); | |
| 867 | 27 | return do_one(lock, usec, &ctx.frame_); | |
| 868 | 27 | } | |
| 869 | |||
| 870 | inline std::size_t | ||
| 871 | 2 | epoll_scheduler::poll() | |
| 872 | { | ||
| 873 | 4 | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 874 | { | ||
| 875 | 1 | stop(); | |
| 876 | 1 | return 0; | |
| 877 | } | ||
| 878 | |||
| 879 | 1 | epoll::thread_context_guard ctx(this); | |
| 880 | 1 | std::unique_lock lock(mutex_); | |
| 881 | |||
| 882 | 1 | std::size_t n = 0; | |
| 883 | for (;;) | ||
| 884 | { | ||
| 885 | 3 | if (!do_one(lock, 0, &ctx.frame_)) | |
| 886 | 1 | break; | |
| 887 | 2 | if (n != (std::numeric_limits<std::size_t>::max)()) | |
| 888 | 2 | ++n; | |
| 889 | 2 | if (!lock.owns_lock()) | |
| 890 | 2 | lock.lock(); | |
| 891 | } | ||
| 892 | 1 | return n; | |
| 893 | 1 | } | |
| 894 | |||
| 895 | inline std::size_t | ||
| 896 | 4 | epoll_scheduler::poll_one() | |
| 897 | { | ||
| 898 | 8 | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 899 | { | ||
| 900 | 2 | stop(); | |
| 901 | 2 | return 0; | |
| 902 | } | ||
| 903 | |||
| 904 | 2 | epoll::thread_context_guard ctx(this); | |
| 905 | 2 | std::unique_lock lock(mutex_); | |
| 906 | 2 | return do_one(lock, 0, &ctx.frame_); | |
| 907 | 2 | } | |
| 908 | |||
| 909 | inline void | ||
| 910 | 10035 | epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const | |
| 911 | { | ||
| 912 | 10035 | epoll_event ev{}; | |
| 913 | 10035 | ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP; | |
| 914 | 10035 | ev.data.ptr = desc; | |
| 915 | |||
| 916 | 10035 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0) | |
| 917 | ✗ | detail::throw_system_error(make_err(errno), "epoll_ctl (register)"); | |
| 918 | |||
| 919 | 10035 | desc->registered_events = ev.events; | |
| 920 | 10035 | desc->fd = fd; | |
| 921 | 10035 | desc->scheduler_ = this; | |
| 922 | |||
| 923 | 10035 | std::lock_guard lock(desc->mutex); | |
| 924 | 10035 | desc->read_ready = false; | |
| 925 | 10035 | desc->write_ready = false; | |
| 926 | 10035 | } | |
| 927 | |||
| 928 | inline void | ||
| 929 | 10035 | epoll_scheduler::deregister_descriptor(int fd) const | |
| 930 | { | ||
| 931 | 10035 | ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr); | |
| 932 | 10035 | } | |
| 933 | |||
| 934 | inline void | ||
| 935 | 16090 | epoll_scheduler::work_started() noexcept | |
| 936 | { | ||
| 937 | 16090 | outstanding_work_.fetch_add(1, std::memory_order_relaxed); | |
| 938 | 16090 | } | |
| 939 | |||
| 940 | inline void | ||
| 941 | 22973 | epoll_scheduler::work_finished() noexcept | |
| 942 | { | ||
| 943 | 45946 | if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1) | |
| 944 | 162 | stop(); | |
| 945 | 22973 | } | |
| 946 | |||
| 947 | inline void | ||
| 948 | 31135 | epoll_scheduler::compensating_work_started() const noexcept | |
| 949 | { | ||
| 950 | 31135 | auto* ctx = epoll::find_context(this); | |
| 951 | 31135 | if (ctx) | |
| 952 | 31135 | ++ctx->private_outstanding_work; | |
| 953 | 31135 | } | |
| 954 | |||
| 955 | inline void | ||
| 956 | ✗ | epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const | |
| 957 | { | ||
| 958 | // Note: outstanding_work_ was already incremented when posting | ||
| 959 | ✗ | std::unique_lock lock(mutex_); | |
| 960 | ✗ | completed_ops_.splice(queue); | |
| 961 | ✗ | if (count > 0) | |
| 962 | ✗ | maybe_unlock_and_signal_one(lock); | |
| 963 | ✗ | } | |
| 964 | |||
| 965 | inline void | ||
| 966 | 9962 | epoll_scheduler::post_deferred_completions(op_queue& ops) const | |
| 967 | { | ||
| 968 | 9962 | if (ops.empty()) | |
| 969 | 9962 | return; | |
| 970 | |||
| 971 | // Fast path: if on scheduler thread, use private queue | ||
| 972 | ✗ | if (auto* ctx = epoll::find_context(this)) | |
| 973 | { | ||
| 974 | ✗ | ctx->private_queue.splice(ops); | |
| 975 | ✗ | return; | |
| 976 | } | ||
| 977 | |||
| 978 | // Slow path: add to global queue and wake a thread | ||
| 979 | ✗ | std::unique_lock lock(mutex_); | |
| 980 | ✗ | completed_ops_.splice(ops); | |
| 981 | ✗ | wake_one_thread_and_unlock(lock); | |
| 982 | ✗ | } | |
| 983 | |||
| 984 | inline void | ||
| 985 | 399 | epoll_scheduler::interrupt_reactor() const | |
| 986 | { | ||
| 987 | // Only write if not already armed to avoid redundant writes | ||
| 988 | 399 | bool expected = false; | |
| 989 | 399 | if (eventfd_armed_.compare_exchange_strong( | |
| 990 | expected, true, std::memory_order_release, | ||
| 991 | std::memory_order_relaxed)) | ||
| 992 | { | ||
| 993 | 274 | std::uint64_t val = 1; | |
| 994 | 274 | [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val)); | |
| 995 | } | ||
| 996 | 399 | } | |
| 997 | |||
| 998 | inline void | ||
| 999 | 373 | epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const | |
| 1000 | { | ||
| 1001 | 373 | state_ |= 1; | |
| 1002 | 373 | cond_.notify_all(); | |
| 1003 | 373 | } | |
| 1004 | |||
| 1005 | inline bool | ||
| 1006 | 1686 | epoll_scheduler::maybe_unlock_and_signal_one( | |
| 1007 | std::unique_lock<std::mutex>& lock) const | ||
| 1008 | { | ||
| 1009 | 1686 | state_ |= 1; | |
| 1010 | 1686 | if (state_ > 1) | |
| 1011 | { | ||
| 1012 | ✗ | lock.unlock(); | |
| 1013 | ✗ | cond_.notify_one(); | |
| 1014 | ✗ | return true; | |
| 1015 | } | ||
| 1016 | 1686 | return false; | |
| 1017 | } | ||
| 1018 | |||
| 1019 | inline bool | ||
| 1020 | 124875 | epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const | |
| 1021 | { | ||
| 1022 | 124875 | state_ |= 1; | |
| 1023 | 124875 | bool have_waiters = state_ > 1; | |
| 1024 | 124875 | lock.unlock(); | |
| 1025 | 124875 | if (have_waiters) | |
| 1026 | ✗ | cond_.notify_one(); | |
| 1027 | 124875 | return have_waiters; | |
| 1028 | } | ||
| 1029 | |||
| 1030 | inline void | ||
| 1031 | ✗ | epoll_scheduler::clear_signal() const | |
| 1032 | { | ||
| 1033 | ✗ | state_ &= ~std::size_t(1); | |
| 1034 | ✗ | } | |
| 1035 | |||
| 1036 | inline void | ||
| 1037 | ✗ | epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const | |
| 1038 | { | ||
| 1039 | ✗ | while ((state_ & 1) == 0) | |
| 1040 | { | ||
| 1041 | ✗ | state_ += 2; | |
| 1042 | ✗ | cond_.wait(lock); | |
| 1043 | ✗ | state_ -= 2; | |
| 1044 | } | ||
| 1045 | ✗ | } | |
| 1046 | |||
| 1047 | inline void | ||
| 1048 | ✗ | epoll_scheduler::wait_for_signal_for( | |
| 1049 | std::unique_lock<std::mutex>& lock, long timeout_us) const | ||
| 1050 | { | ||
| 1051 | ✗ | if ((state_ & 1) == 0) | |
| 1052 | { | ||
| 1053 | ✗ | state_ += 2; | |
| 1054 | ✗ | cond_.wait_for(lock, std::chrono::microseconds(timeout_us)); | |
| 1055 | ✗ | state_ -= 2; | |
| 1056 | } | ||
| 1057 | ✗ | } | |
| 1058 | |||
| 1059 | inline void | ||
| 1060 | 1686 | epoll_scheduler::wake_one_thread_and_unlock( | |
| 1061 | std::unique_lock<std::mutex>& lock) const | ||
| 1062 | { | ||
| 1063 | 1686 | if (maybe_unlock_and_signal_one(lock)) | |
| 1064 | ✗ | return; | |
| 1065 | |||
| 1066 | 1686 | if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_) | |
| 1067 | { | ||
| 1068 | 26 | task_interrupted_ = true; | |
| 1069 | 26 | lock.unlock(); | |
| 1070 | 26 | interrupt_reactor(); | |
| 1071 | } | ||
| 1072 | else | ||
| 1073 | { | ||
| 1074 | 1660 | lock.unlock(); | |
| 1075 | } | ||
| 1076 | } | ||
| 1077 | |||
| 1078 | 99798 | inline epoll_scheduler::work_cleanup::~work_cleanup() | |
| 1079 | { | ||
| 1080 | 99798 | if (ctx) | |
| 1081 | { | ||
| 1082 | 99798 | long produced = ctx->private_outstanding_work; | |
| 1083 | 99798 | if (produced > 1) | |
| 1084 | 7 | scheduler->outstanding_work_.fetch_add( | |
| 1085 | produced - 1, std::memory_order_relaxed); | ||
| 1086 | 99791 | else if (produced < 1) | |
| 1087 | 16845 | scheduler->work_finished(); | |
| 1088 | 99798 | ctx->private_outstanding_work = 0; | |
| 1089 | |||
| 1090 | 99798 | if (!ctx->private_queue.empty()) | |
| 1091 | { | ||
| 1092 | 51818 | lock->lock(); | |
| 1093 | 51818 | scheduler->completed_ops_.splice(ctx->private_queue); | |
| 1094 | } | ||
| 1095 | } | ||
| 1096 | else | ||
| 1097 | { | ||
| 1098 | ✗ | scheduler->work_finished(); | |
| 1099 | } | ||
| 1100 | 99798 | } | |
| 1101 | |||
| 1102 | 70536 | inline epoll_scheduler::task_cleanup::~task_cleanup() | |
| 1103 | { | ||
| 1104 | 35268 | if (!ctx) | |
| 1105 | ✗ | return; | |
| 1106 | |||
| 1107 | 35268 | if (ctx->private_outstanding_work > 0) | |
| 1108 | { | ||
| 1109 | 5184 | scheduler->outstanding_work_.fetch_add( | |
| 1110 | 5184 | ctx->private_outstanding_work, std::memory_order_relaxed); | |
| 1111 | 5184 | ctx->private_outstanding_work = 0; | |
| 1112 | } | ||
| 1113 | |||
| 1114 | 35268 | if (!ctx->private_queue.empty()) | |
| 1115 | { | ||
| 1116 | 5184 | if (!lock->owns_lock()) | |
| 1117 | ✗ | lock->lock(); | |
| 1118 | 5184 | scheduler->completed_ops_.splice(ctx->private_queue); | |
| 1119 | } | ||
| 1120 | 35268 | } | |
| 1121 | |||
| 1122 | inline void | ||
| 1123 | 10364 | epoll_scheduler::update_timerfd() const | |
| 1124 | { | ||
| 1125 | 10364 | auto nearest = timer_svc_->nearest_expiry(); | |
| 1126 | |||
| 1127 | 10364 | itimerspec ts{}; | |
| 1128 | 10364 | int flags = 0; | |
| 1129 | |||
| 1130 | 10364 | if (nearest == timer_service::time_point::max()) | |
| 1131 | { | ||
| 1132 | // No timers - disarm by setting to 0 (relative) | ||
| 1133 | } | ||
| 1134 | else | ||
| 1135 | { | ||
| 1136 | 10319 | auto now = std::chrono::steady_clock::now(); | |
| 1137 | 10319 | if (nearest <= now) | |
| 1138 | { | ||
| 1139 | // Use 1ns instead of 0 - zero disarms the timerfd | ||
| 1140 | 110 | ts.it_value.tv_nsec = 1; | |
| 1141 | } | ||
| 1142 | else | ||
| 1143 | { | ||
| 1144 | 10209 | auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>( | |
| 1145 | 10209 | nearest - now) | |
| 1146 | 10209 | .count(); | |
| 1147 | 10209 | ts.it_value.tv_sec = nsec / 1000000000; | |
| 1148 | 10209 | ts.it_value.tv_nsec = nsec % 1000000000; | |
| 1149 | // Ensure non-zero to avoid disarming if duration rounds to 0 | ||
| 1150 | 10209 | if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0) | |
| 1151 | ✗ | ts.it_value.tv_nsec = 1; | |
| 1152 | } | ||
| 1153 | } | ||
| 1154 | |||
| 1155 | 10364 | if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0) | |
| 1156 | ✗ | detail::throw_system_error(make_err(errno), "timerfd_settime"); | |
| 1157 | 10364 | } | |
| 1158 | |||
| 1159 | inline void | ||
| 1160 | 35268 | epoll_scheduler::run_task( | |
| 1161 | std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx) | ||
| 1162 | { | ||
| 1163 | 35268 | int timeout_ms = task_interrupted_ ? 0 : -1; | |
| 1164 | |||
| 1165 | 35268 | if (lock.owns_lock()) | |
| 1166 | 10191 | lock.unlock(); | |
| 1167 | |||
| 1168 | 35268 | task_cleanup on_exit{this, &lock, ctx}; | |
| 1169 | |||
| 1170 | // Flush deferred timerfd programming before blocking | ||
| 1171 | 35268 | if (timerfd_stale_.exchange(false, std::memory_order_acquire)) | |
| 1172 | 5180 | update_timerfd(); | |
| 1173 | |||
| 1174 | // Event loop runs without mutex held | ||
| 1175 | epoll_event events[128]; | ||
| 1176 | 35268 | int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms); | |
| 1177 | |||
| 1178 | 35268 | if (nfds < 0 && errno != EINTR) | |
| 1179 | ✗ | detail::throw_system_error(make_err(errno), "epoll_wait"); | |
| 1180 | |||
| 1181 | 35268 | bool check_timers = false; | |
| 1182 | 35268 | op_queue local_ops; | |
| 1183 | |||
| 1184 | // Process events without holding the mutex | ||
| 1185 | 81647 | for (int i = 0; i < nfds; ++i) | |
| 1186 | { | ||
| 1187 | 46379 | if (events[i].data.ptr == nullptr) | |
| 1188 | { | ||
| 1189 | std::uint64_t val; | ||
| 1190 | // Mutex released above; analyzer can't track unlock via ref | ||
| 1191 | // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection) | ||
| 1192 | 69 | [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val)); | |
| 1193 | 69 | eventfd_armed_.store(false, std::memory_order_relaxed); | |
| 1194 | 69 | continue; | |
| 1195 | 69 | } | |
| 1196 | |||
| 1197 | 46310 | if (events[i].data.ptr == &timer_fd_) | |
| 1198 | { | ||
| 1199 | std::uint64_t expirations; | ||
| 1200 | // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection) | ||
| 1201 | [[maybe_unused]] auto r = | ||
| 1202 | 5184 | ::read(timer_fd_, &expirations, sizeof(expirations)); | |
| 1203 | 5184 | check_timers = true; | |
| 1204 | 5184 | continue; | |
| 1205 | 5184 | } | |
| 1206 | |||
| 1207 | // Deferred I/O: just set ready events and enqueue descriptor | ||
| 1208 | // No per-descriptor mutex locking in reactor hot path! | ||
| 1209 | 41126 | auto* desc = static_cast<descriptor_state*>(events[i].data.ptr); | |
| 1210 | 41126 | desc->add_ready_events(events[i].events); | |
| 1211 | |||
| 1212 | // Only enqueue if not already enqueued | ||
| 1213 | 41126 | bool expected = false; | |
| 1214 | 41126 | if (desc->is_enqueued_.compare_exchange_strong( | |
| 1215 | expected, true, std::memory_order_release, | ||
| 1216 | std::memory_order_relaxed)) | ||
| 1217 | { | ||
| 1218 | 41126 | local_ops.push(desc); | |
| 1219 | } | ||
| 1220 | } | ||
| 1221 | |||
| 1222 | // Process timers only when timerfd fires | ||
| 1223 | 35268 | if (check_timers) | |
| 1224 | { | ||
| 1225 | 5184 | timer_svc_->process_expired(); | |
| 1226 | 5184 | update_timerfd(); | |
| 1227 | } | ||
| 1228 | |||
| 1229 | 35268 | lock.lock(); | |
| 1230 | |||
| 1231 | 35268 | if (!local_ops.empty()) | |
| 1232 | 24624 | completed_ops_.splice(local_ops); | |
| 1233 | 35268 | } | |
| 1234 | |||
| 1235 | inline std::size_t | ||
| 1236 | 99958 | epoll_scheduler::do_one( | |
| 1237 | std::unique_lock<std::mutex>& lock, | ||
| 1238 | long timeout_us, | ||
| 1239 | epoll::scheduler_context* ctx) | ||
| 1240 | { | ||
| 1241 | for (;;) | ||
| 1242 | { | ||
| 1243 | 135226 | if (stopped_) | |
| 1244 | 159 | return 0; | |
| 1245 | |||
| 1246 | 135067 | scheduler_op* op = completed_ops_.pop(); | |
| 1247 | |||
| 1248 | // Handle reactor sentinel - time to poll for I/O | ||
| 1249 | 135067 | if (op == &task_op_) | |
| 1250 | { | ||
| 1251 | 35269 | bool more_handlers = !completed_ops_.empty(); | |
| 1252 | |||
| 1253 | // Nothing to run the reactor for: no pending work to wait on, | ||
| 1254 | // or caller requested a non-blocking poll | ||
| 1255 | 45461 | if (!more_handlers && | |
| 1256 | 20384 | (outstanding_work_.load(std::memory_order_acquire) == 0 || | |
| 1257 | timeout_us == 0)) | ||
| 1258 | { | ||
| 1259 | 1 | completed_ops_.push(&task_op_); | |
| 1260 | 1 | return 0; | |
| 1261 | } | ||
| 1262 | |||
| 1263 | 35268 | task_interrupted_ = more_handlers || timeout_us == 0; | |
| 1264 | 35268 | task_running_.store(true, std::memory_order_release); | |
| 1265 | |||
| 1266 | 35268 | if (more_handlers) | |
| 1267 | 25077 | unlock_and_signal_one(lock); | |
| 1268 | |||
| 1269 | 35268 | run_task(lock, ctx); | |
| 1270 | |||
| 1271 | 35268 | task_running_.store(false, std::memory_order_relaxed); | |
| 1272 | 35268 | completed_ops_.push(&task_op_); | |
| 1273 | 35268 | continue; | |
| 1274 | 35268 | } | |
| 1275 | |||
| 1276 | // Handle operation | ||
| 1277 | 99798 | if (op != nullptr) | |
| 1278 | { | ||
| 1279 | 99798 | bool more = !completed_ops_.empty(); | |
| 1280 | |||
| 1281 | 99798 | if (more) | |
| 1282 | 99798 | ctx->unassisted = !unlock_and_signal_one(lock); | |
| 1283 | else | ||
| 1284 | { | ||
| 1285 | ✗ | ctx->unassisted = false; | |
| 1286 | ✗ | lock.unlock(); | |
| 1287 | } | ||
| 1288 | |||
| 1289 | 99798 | work_cleanup on_exit{this, &lock, ctx}; | |
| 1290 | |||
| 1291 | 99798 | (*op)(); | |
| 1292 | 99798 | return 1; | |
| 1293 | 99798 | } | |
| 1294 | |||
| 1295 | // No pending work to wait on, or caller requested non-blocking poll | ||
| 1296 | ✗ | if (outstanding_work_.load(std::memory_order_acquire) == 0 || | |
| 1297 | timeout_us == 0) | ||
| 1298 | ✗ | return 0; | |
| 1299 | |||
| 1300 | ✗ | clear_signal(); | |
| 1301 | ✗ | if (timeout_us < 0) | |
| 1302 | ✗ | wait_for_signal(lock); | |
| 1303 | else | ||
| 1304 | ✗ | wait_for_signal_for(lock, timeout_us); | |
| 1305 | 35268 | } | |
| 1306 | } | ||
| 1307 | |||
| 1308 | } // namespace boost::corosio::detail | ||
| 1309 | |||
| 1310 | #endif // BOOST_COROSIO_HAS_EPOLL | ||
| 1311 | |||
| 1312 | #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | ||
| 1313 |
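
The signaling protocol described in the header's "Signaling State (state_)" notes (bit 0 is the signaled flag, each waiter adds 2 before blocking) can be traced with a small standalone sketch. The `signal_state` type and `main` below are illustrative only and are not part of the corosio sources; they mirror the encoding used by `clear_signal`, `wait_for_signal`, and `maybe_unlock_and_signal_one`.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <thread>

// Standalone sketch of the state_ encoding: bit 0 is the "signaled" flag,
// and every blocked waiter adds 2 before waiting so that signalers can tell
// whether a notify is needed at all.
struct signal_state
{
    std::mutex m;
    std::condition_variable cv;
    std::size_t state = 0; // bit 0: signaled, upper bits: 2 * waiter count

    void clear_signal() { state &= ~std::size_t(1); } // before a fresh wait

    void wait(std::unique_lock<std::mutex>& lock)
    {
        while ((state & 1) == 0)
        {
            state += 2;      // register as a waiter
            cv.wait(lock);
            state -= 2;
        }
    }

    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock)
    {
        state |= 1;          // set the signaled flag (persists until cleared)
        if (state > 1)       // at least one waiter registered
        {
            lock.unlock();
            cv.notify_one();
            return true;
        }
        return false;        // caller may fall back to an eventfd interrupt
    }
};

int main()
{
    signal_state s;
    std::thread waiter([&] {
        std::unique_lock lock(s.m);
        s.wait(lock);        // returns immediately if already signaled
        std::cout << "waiter woke\n";
    });
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    {
        std::unique_lock lock(s.m);
        s.maybe_unlock_and_signal_one(lock);
    }
    waiter.join();
}
```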
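
`interrupt_reactor()` and `run_task()` coordinate through an edge-triggered eventfd: one 8-byte write wakes the thread blocked in `epoll_wait`, the reader drains the counter so the next write re-triggers, and an atomic flag (mirroring `eventfd_armed_`) suppresses redundant writes. A minimal Linux-only sketch with illustrative names; it is not the scheduler's code.

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

int main()
{
    int ep = ::epoll_create1(EPOLL_CLOEXEC);
    int efd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET; // edge-triggered, like the scheduler's eventfd
    ev.data.ptr = nullptr;         // nullptr marks "interrupt" events
    ::epoll_ctl(ep, EPOLL_CTL_ADD, efd, &ev);

    std::atomic<bool> armed{false}; // mirrors eventfd_armed_: skip redundant writes

    std::thread reactor([&] {
        epoll_event events[16];
        int n = ::epoll_wait(ep, events, 16, -1); // blocks until the write below
        for (int i = 0; i < n; ++i)
        {
            if (events[i].data.ptr == nullptr)
            {
                std::uint64_t val;
                (void)::read(efd, &val, sizeof(val)); // drain so the next write re-triggers
                armed.store(false, std::memory_order_relaxed);
                std::cout << "reactor interrupted\n";
            }
        }
    });

    // "interrupt_reactor": only write when not already armed.
    bool expected = false;
    if (armed.compare_exchange_strong(expected, true))
    {
        std::uint64_t one = 1;
        (void)::write(efd, &one, sizeof(one));
    }

    reactor.join();
    ::close(efd);
    ::close(ep);
}
```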
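
`update_timerfd()` programs a relative `itimerspec` from the nearest steady_clock expiry and floors the value at 1 ns, because a zero `it_value` disarms a timerfd instead of firing immediately. The `arm_timerfd` helper below is a hypothetical standalone restatement of that conversion, not the header's function.

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>
#include <sys/timerfd.h>
#include <unistd.h>

// Arm a timerfd for an absolute steady_clock deadline using a relative
// itimerspec, clamping to 1 ns so an already-expired deadline still fires.
void arm_timerfd(int tfd, std::chrono::steady_clock::time_point deadline)
{
    itimerspec ts{};
    auto now = std::chrono::steady_clock::now();
    if (deadline <= now)
    {
        ts.it_value.tv_nsec = 1; // 0 would disarm the timer
    }
    else
    {
        auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
            deadline - now).count();
        ts.it_value.tv_sec = ns / 1000000000;
        ts.it_value.tv_nsec = ns % 1000000000;
        if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
            ts.it_value.tv_nsec = 1; // guard against rounding to zero
    }
    ::timerfd_settime(tfd, 0, &ts, nullptr); // relative arming
}

int main()
{
    int tfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC); // blocking fd for the demo
    arm_timerfd(tfd, std::chrono::steady_clock::now() + std::chrono::milliseconds(20));

    std::uint64_t expirations = 0; // blocking read returns once the timer fires
    (void)::read(tfd, &expirations, sizeof(expirations));
    std::cout << "timer fired, expirations=" << expirations << "\n";
    ::close(tfd);
}
```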
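
The comments in `reset_inline_budget()` describe an adaptive policy: a fixed cap of 4 while the thread runs unassisted, doubling up to 16 after a fully consumed cycle, and a reset to 2 after partial consumption. The `budget_state` struct and free functions below restate that policy outside the class so the ramp can be observed in isolation; they are illustrative, not corosio types.

```cpp
#include <algorithm>
#include <iostream>

// Illustrative restatement of the budget policy in reset_inline_budget().
struct budget_state
{
    int inline_budget = 0;
    int inline_budget_max = 2;
    bool unassisted = false;
};

void reset_budget(budget_state& s)
{
    if (s.unassisted)
    {
        // No other thread absorbed queued work: moderate fixed cap.
        s.inline_budget_max = 4;
        s.inline_budget = 4;
        return;
    }
    if (s.inline_budget == 0)                        // previous cycle fully consumed
        s.inline_budget_max = std::min(s.inline_budget_max * 2, 16);
    else if (s.inline_budget < s.inline_budget_max)  // partial consumption
        s.inline_budget_max = 2;
    s.inline_budget = s.inline_budget_max;
}

bool try_consume(budget_state& s)
{
    if (s.inline_budget > 0) { --s.inline_budget; return true; }
    return false;
}

int main()
{
    budget_state s;
    for (int cycle = 0; cycle < 4; ++cycle)
    {
        reset_budget(s);
        while (try_consume(s)) {}                    // consume the whole budget
        std::cout << "cycle " << cycle << ": cap=" << s.inline_budget_max << "\n";
    }
    // Prints caps 4, 8, 16, 16: doubling after each fully consumed cycle,
    // saturating at 16.
}
```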
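
The `task_op_` sentinel keeps the reactor interleaved with handler execution: whenever `do_one()` pops the sentinel it runs `epoll_wait` once and then pushes the sentinel back, so continuously posted handlers cannot starve I/O and timers. A toy restatement of that queue discipline, using plain strings instead of `scheduler_op` objects.

```cpp
#include <deque>
#include <iostream>
#include <string>

// Toy sketch of the task-sentinel interleaving: a marker sits in the handler
// queue; popping it triggers one "reactor poll" and re-queues the marker, so
// polling alternates with handler execution instead of being starved.
int main()
{
    const std::string sentinel = "<task>";
    std::deque<std::string> queue{"h1", "h2", sentinel, "h3"};

    int polls = 0;
    while (!queue.empty() && polls < 3)
    {
        std::string op = queue.front();
        queue.pop_front();
        if (op == sentinel)
        {
            ++polls;
            std::cout << "reactor poll #" << polls << "\n";
            queue.push_back(sentinel);                         // re-arm for the next round
            queue.push_back("h" + std::to_string(3 + polls));  // pretend I/O completed
            continue;
        }
        std::cout << "run handler " << op << "\n";
    }
}
```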