Rewrote Frontend
This commit is contained in:
		
							
								
								
									
										68
									
								
								backend/shl/msd/blocking_iterator.hpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										68
									
								
								backend/shl/msd/blocking_iterator.hpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,68 @@
 | 
			
		||||
// Copyright (C) 2022 Andrei Avram
 | 
			
		||||
 | 
			
		||||
#ifndef MSD_CHANNEL_BLOCKING_ITERATOR_HPP_
 | 
			
		||||
#define MSD_CHANNEL_BLOCKING_ITERATOR_HPP_
 | 
			
		||||
 | 
			
		||||
#include <iterator>
 | 
			
		||||
#include <mutex>
 | 
			
		||||
 | 
			
		||||
namespace msd {
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @brief An iterator that block the current thread,
 | 
			
		||||
 * waiting to fetch elements from the channel.
 | 
			
		||||
 *
 | 
			
		||||
 * Used to implement channel range-based for loop.
 | 
			
		||||
 *
 | 
			
		||||
 * @tparam Channel Instance of channel.
 | 
			
		||||
 */
 | 
			
		||||
template <typename channel>
 | 
			
		||||
class blocking_iterator {
 | 
			
		||||
   public:
 | 
			
		||||
    using value_type = typename channel::value_type;
 | 
			
		||||
 | 
			
		||||
    explicit blocking_iterator(channel& ch) : ch_{ch} {}
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Advances to next element in the channel.
 | 
			
		||||
     */
 | 
			
		||||
    blocking_iterator<channel> operator++() const noexcept { return *this; }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Returns an element from the channel.
 | 
			
		||||
     */
 | 
			
		||||
    value_type operator*() const
 | 
			
		||||
    {
 | 
			
		||||
        value_type value;
 | 
			
		||||
        value << ch_;
 | 
			
		||||
 | 
			
		||||
        return value;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Makes iteration continue until the channel is closed and empty.
 | 
			
		||||
     */
 | 
			
		||||
    bool operator!=(blocking_iterator<channel>) const
 | 
			
		||||
    {
 | 
			
		||||
        std::unique_lock<std::mutex> lock{ch_.mtx_};
 | 
			
		||||
        ch_.waitBeforeRead(lock);
 | 
			
		||||
 | 
			
		||||
        return !(ch_.closed() && ch_.empty());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
   private:
 | 
			
		||||
    channel& ch_;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
}  // namespace msd
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @brief Output iterator specialization
 | 
			
		||||
 */
 | 
			
		||||
template <typename T>
 | 
			
		||||
struct std::iterator_traits<msd::blocking_iterator<T>> {
 | 
			
		||||
    using value_type = typename msd::blocking_iterator<T>::value_type;
 | 
			
		||||
    using iterator_category = std::output_iterator_tag;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
#endif  // MSD_CHANNEL_BLOCKING_ITERATOR_HPP_
 | 
			
		||||
							
								
								
									
										130
									
								
								backend/shl/msd/channel.hpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										130
									
								
								backend/shl/msd/channel.hpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,130 @@
 | 
			
		||||
// Copyright (C) 2022 Andrei Avram
 | 
			
		||||
 | 
			
		||||
#ifndef MSD_CHANNEL_HPP_
 | 
			
		||||
#define MSD_CHANNEL_HPP_
 | 
			
		||||
 | 
			
		||||
#include <atomic>
 | 
			
		||||
#include <condition_variable>
 | 
			
		||||
#include <cstdlib>
 | 
			
		||||
#include <mutex>
 | 
			
		||||
#include <queue>
 | 
			
		||||
#include <stdexcept>
 | 
			
		||||
#include <type_traits>
 | 
			
		||||
#include <utility>
 | 
			
		||||
 | 
			
		||||
#include "blocking_iterator.hpp"
 | 
			
		||||
 | 
			
		||||
namespace msd {
 | 
			
		||||
 | 
			
		||||
#if (__cplusplus >= 201703L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L))
 | 
			
		||||
#define NODISCARD [[nodiscard]]
 | 
			
		||||
#else
 | 
			
		||||
#define NODISCARD
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
namespace detail {
 | 
			
		||||
template <typename T>
 | 
			
		||||
struct remove_cvref {
 | 
			
		||||
    using type = typename std::remove_cv<typename std::remove_reference<T>::type>::type;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
template <typename T>
 | 
			
		||||
using remove_cvref_t = typename remove_cvref<T>::type;
 | 
			
		||||
}  // namespace detail
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @brief Exception thrown if trying to write on closed channel.
 | 
			
		||||
 */
 | 
			
		||||
class closed_channel : public std::runtime_error {
 | 
			
		||||
   public:
 | 
			
		||||
    explicit closed_channel(const char* msg) : std::runtime_error{msg} {}
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @brief Thread-safe container for sharing data between threads.
 | 
			
		||||
 *
 | 
			
		||||
 * Implements a blocking input iterator.
 | 
			
		||||
 *
 | 
			
		||||
 * @tparam T The type of the elements.
 | 
			
		||||
 */
 | 
			
		||||
template <typename T>
 | 
			
		||||
class channel {
 | 
			
		||||
   public:
 | 
			
		||||
    using value_type = T;
 | 
			
		||||
    using iterator = blocking_iterator<channel<T>>;
 | 
			
		||||
    using size_type = std::size_t;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Creates a new channel.
 | 
			
		||||
     *
 | 
			
		||||
     * @param capacity Number of elements the channel can store before blocking.
 | 
			
		||||
     */
 | 
			
		||||
    explicit constexpr channel(size_type capacity = 0);
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Pushes an element into the channel.
 | 
			
		||||
     *
 | 
			
		||||
     * @throws closed_channel if channel is closed.
 | 
			
		||||
     */
 | 
			
		||||
    template <typename Type>
 | 
			
		||||
    friend void operator>>(Type&&, channel<detail::remove_cvref_t<Type>>&);
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Pops an element from the channel.
 | 
			
		||||
     *
 | 
			
		||||
     * @tparam Type The type of the elements
 | 
			
		||||
     */
 | 
			
		||||
    template <typename Type>
 | 
			
		||||
    friend void operator<<(Type&, channel<Type>&);
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Returns the number of elements in the channel.
 | 
			
		||||
     */
 | 
			
		||||
    NODISCARD inline size_type constexpr size() const noexcept;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Returns true if there are no elements in channel.
 | 
			
		||||
     */
 | 
			
		||||
    NODISCARD inline bool constexpr empty() const noexcept;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Closes the channel.
 | 
			
		||||
     */
 | 
			
		||||
    inline void close() noexcept;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Returns true if the channel is closed.
 | 
			
		||||
     */
 | 
			
		||||
    NODISCARD inline bool closed() const noexcept;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Iterator
 | 
			
		||||
     */
 | 
			
		||||
    iterator begin() noexcept;
 | 
			
		||||
    iterator end() noexcept;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Channel cannot be copied or moved.
 | 
			
		||||
     */
 | 
			
		||||
    channel(const channel&) = delete;
 | 
			
		||||
    channel& operator=(const channel&) = delete;
 | 
			
		||||
    channel(channel&&) = delete;
 | 
			
		||||
    channel& operator=(channel&&) = delete;
 | 
			
		||||
    virtual ~channel() = default;
 | 
			
		||||
 | 
			
		||||
   private:
 | 
			
		||||
    const size_type cap_;
 | 
			
		||||
    std::queue<T> queue_;
 | 
			
		||||
    std::mutex mtx_;
 | 
			
		||||
    std::condition_variable cnd_;
 | 
			
		||||
    std::atomic<bool> is_closed_{false};
 | 
			
		||||
 | 
			
		||||
    inline void waitBeforeRead(std::unique_lock<std::mutex>&);
 | 
			
		||||
    friend class blocking_iterator<channel>;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
#include "channel_impl.hpp"
 | 
			
		||||
 | 
			
		||||
}  // namespace msd
 | 
			
		||||
 | 
			
		||||
#endif  // MSD_CHANNEL_HPP_
 | 
			
		||||
							
								
								
									
										87
									
								
								backend/shl/msd/channel_impl.hpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										87
									
								
								backend/shl/msd/channel_impl.hpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,87 @@
 | 
			
		||||
// Copyright (C) 2022 Andrei Avram
 | 
			
		||||
 | 
			
		||||
template <typename T>
 | 
			
		||||
constexpr channel<T>::channel(const size_type capacity) : cap_{capacity}
 | 
			
		||||
{
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename T>
 | 
			
		||||
void operator>>(T&& in, channel<detail::remove_cvref_t<T>>& ch)
 | 
			
		||||
{
 | 
			
		||||
    if (ch.closed()) {
 | 
			
		||||
        throw closed_channel{"cannot write on closed channel"};
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    std::unique_lock<std::mutex> lock{ch.mtx_};
 | 
			
		||||
 | 
			
		||||
    if (ch.cap_ > 0 && ch.queue_.size() == ch.cap_) {
 | 
			
		||||
        ch.cnd_.wait(lock, [&ch]() { return ch.queue_.size() < ch.cap_; });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    ch.queue_.push(std::forward<T>(in));
 | 
			
		||||
 | 
			
		||||
    ch.cnd_.notify_one();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename T>
 | 
			
		||||
void operator<<(T& out, channel<T>& ch)
 | 
			
		||||
{
 | 
			
		||||
    if (ch.closed() && ch.empty()) {
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    {
 | 
			
		||||
        std::unique_lock<std::mutex> lock{ch.mtx_};
 | 
			
		||||
        ch.waitBeforeRead(lock);
 | 
			
		||||
 | 
			
		||||
        if (ch.queue_.size() > 0) {
 | 
			
		||||
            out = std::move(ch.queue_.front());
 | 
			
		||||
            ch.queue_.pop();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    ch.cnd_.notify_one();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename T>
 | 
			
		||||
constexpr typename channel<T>::size_type channel<T>::size() const noexcept
 | 
			
		||||
{
 | 
			
		||||
    return queue_.size();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename T>
 | 
			
		||||
constexpr bool channel<T>::empty() const noexcept
 | 
			
		||||
{
 | 
			
		||||
    return queue_.empty();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename T>
 | 
			
		||||
void channel<T>::close() noexcept
 | 
			
		||||
{
 | 
			
		||||
    is_closed_.store(true);
 | 
			
		||||
    cnd_.notify_all();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename T>
 | 
			
		||||
bool channel<T>::closed() const noexcept
 | 
			
		||||
{
 | 
			
		||||
    return is_closed_.load();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename T>
 | 
			
		||||
blocking_iterator<channel<T>> channel<T>::begin() noexcept
 | 
			
		||||
{
 | 
			
		||||
    return blocking_iterator<channel<T>>{*this};
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename T>
 | 
			
		||||
blocking_iterator<channel<T>> channel<T>::end() noexcept
 | 
			
		||||
{
 | 
			
		||||
    return blocking_iterator<channel<T>>{*this};
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename T>
 | 
			
		||||
void channel<T>::waitBeforeRead(std::unique_lock<std::mutex>& lock)
 | 
			
		||||
{
 | 
			
		||||
    cnd_.wait(lock, [this] { return queue_.size() > 0 || closed(); });
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user