// Copyright (C) 2022 Andrei Avram template constexpr channel::channel(const size_type capacity) : cap_{capacity} { } template void operator>>(T&& in, channel>& ch) { if (ch.closed()) { throw closed_channel{"cannot write on closed channel"}; } std::unique_lock lock{ch.mtx_}; if (ch.cap_ > 0 && ch.queue_.size() == ch.cap_) { ch.cnd_.wait(lock, [&ch]() { return ch.queue_.size() < ch.cap_; }); } ch.queue_.push(std::forward(in)); ch.cnd_.notify_one(); } template void operator<<(T& out, channel& ch) { if (ch.closed() && ch.empty()) { return; } { std::unique_lock lock{ch.mtx_}; ch.waitBeforeRead(lock); if (ch.queue_.size() > 0) { out = std::move(ch.queue_.front()); ch.queue_.pop(); } } ch.cnd_.notify_one(); } template constexpr typename channel::size_type channel::size() const noexcept { return queue_.size(); } template constexpr bool channel::empty() const noexcept { return queue_.empty(); } template void channel::close() noexcept { is_closed_.store(true); cnd_.notify_all(); } template bool channel::closed() const noexcept { return is_closed_.load(); } template blocking_iterator> channel::begin() noexcept { return blocking_iterator>{*this}; } template blocking_iterator> channel::end() noexcept { return blocking_iterator>{*this}; } template void channel::waitBeforeRead(std::unique_lock& lock) { cnd_.wait(lock, [this] { return queue_.size() > 0 || closed(); }); }