9#include <botan/filters.h> 
   11#if defined(BOTAN_HAS_THREAD_UTILS) 
   13   #include <botan/internal/barrier.h> 
   14   #include <botan/internal/semaphore.h> 
   // State shared between a Threaded_Fork and its worker threads: one input
   // buffer (pointer + length) plus the two primitives used to hand it over.
   // NOTE(review): the closing "};" is not present in this listing.
   19struct Threaded_Fork_Data {
 
   // Released with a count of total_ports() by thread_delegate_work() and with
   // m_threads.size() by the destructor; acquired by each worker in
   // thread_entry() before it looks at m_input.
   24      Semaphore m_input_ready_semaphore;
 
   // thread_delegate_work() calls wait(total_ports() + 1) and then sync();
   // each worker calls sync() after writing the buffer to its filter.
   // Presumably this is the rendezvous that tells the owner all workers are
   // done - TODO confirm against the Barrier implementation.
   29      Barrier m_input_complete_barrier;
 
   // Buffer currently offered to the workers. nullptr outside a delegation;
   // workers test it against nullptr after acquiring the semaphore.
   36      const uint8_t* m_input = 
nullptr;
 
   // Number of bytes at m_input; reset to 0 together with the pointer.
   41      size_t m_input_length = 0;
 
   // NOTE(review): fragment - the signature line that owns this initializer
   // list is not present in this listing, and neither is whatever follows
   // the array below.
   // Initializes Fork with (nullptr, 0) and m_thread_data with a freshly
   // allocated Threaded_Fork_Data.
   48      Fork(nullptr, static_cast<size_t>(0)), m_thread_data(new Threaded_Fork_Data) {
 
   // Packs the four filter arguments f1..f4 into a local array.
   49   Filter* filters[4] = {f1, f2, f3, f4};
 
   // Constructs a Threaded_Fork from an array of filters.
   // @param filters array of filter pointers forwarded to set_next()
   // @param count   number of entries in filters
   // NOTE(review): the closing brace is not present in this listing.
   56Threaded_Fork::Threaded_Fork(Filter* filters[], 
size_t count) :
 
   // Fork is initialized with (nullptr, 0); the filters are attached only
   // by the set_next() call in the body. m_thread_data gets a freshly
   // allocated Threaded_Fork_Data.
   57      Fork(nullptr, static_cast<size_t>(0)), m_thread_data(new Threaded_Fork_Data) {
 
   // set_next() is also where the worker threads are created.
   58   set_next(filters, count);
 
   // Destructor: clears the shared input, then releases the input-ready
   // semaphore so that waiting workers wake up and see m_input == nullptr.
   // NOTE(review): the body of the loop over m_threads and the closing
   // braces are not present in this listing.
   61Threaded_Fork::~Threaded_Fork() {
 
   // Clear pointer and length before waking the workers; thread_entry()
   // tests m_input against nullptr right after acquiring the semaphore.
   62   m_thread_data->m_input = 
nullptr;
 
   63   m_thread_data->m_input_length = 0;
 
   // Released with a count of m_threads.size(), i.e. the number of thread
   // objects held, rather than total_ports().
   65   m_thread_data->m_input_ready_semaphore.release(m_threads.size());
 
   // Visits every thread object; presumably joins it - TODO confirm, the
   // loop body is missing here.
   67   for(
auto& thread : m_threads) {
 
   // Returns the fixed name of this filter.
   // @return the string "Threaded Fork"
   72std::string Threaded_Fork::name()
 const {
 
   73   return "Threaded Fork";
 
   // Attaches filters and brings the list of worker threads in line with n.
   // @param f array of filter pointers
   // @param n number of entries in f
   // NOTE(review): several lines are missing from this listing - anything
   // between the signature and the size check, the body of the
   // "n < m_threads.size()" branch, and the closing braces. In particular
   // the code that fills m_next from f is not visible here.
   76void Threaded_Fork::set_next(Filter* f[], 
size_t n) {
 
   // Fewer ports than existing threads; handling of this case is not
   // visible in this listing.
   80   if(n < m_threads.size()) {
 
   // One new thread for every index from the current thread count up to n.
   84      for(
size_t i = m_threads.size(); i != n; ++i) {
 
   // The lambda captures this and a copy of the filter pointer m_next[i];
   // the new std::thread runs thread_entry() for that filter.
   85         m_threads.push_back(std::make_shared<std::thread>([
this, next = m_next[i]] { thread_entry(next); }));
 
   // Hands input to the worker threads, first flushing any data held in
   // m_write_queue, then updates m_write_queue depending on whether any
   // port has a filter attached.
   // @param input  bytes to forward
   // @param length number of bytes at input
   // NOTE(review): closing braces and the line between the append and the
   // clear() below are not present in this listing.
   90void Threaded_Fork::send(
const uint8_t input[], 
size_t length) {
 
   // Previously queued bytes go out before the new input.
   91   if(!m_write_queue.empty()) {
 
   92      thread_delegate_work(m_write_queue.data(), m_write_queue.size());
 
   // The new input is delegated unconditionally, even if no filter turns
   // out to be attached.
   94   thread_delegate_work(input, length);
 
   // Stays true only if every entry of m_next below is nullptr.
   96   bool nothing_attached = 
true;
 
   // Scans all ports; there is no early exit visible in this listing.
   97   for(
size_t j = 0; j != total_ports(); ++j) {
 
   98      if(m_next[j] != 
nullptr) {
 
   99         nothing_attached = 
false;
 
   // No filter attached anywhere: append the input to m_write_queue.
  103   if(nothing_attached) {
 
  104      m_write_queue += std::make_pair(input, length);
 
   // Presumably the else-branch (the queue is dropped once something is
   // attached) - TODO confirm, the line separating it from the append is
   // missing here.
  106      m_write_queue.clear();
 
   // Publishes one buffer to the worker threads and clears it again after
   // the barrier sync. The order of the statements below is significant.
   // @param input  bytes the workers should write to their filters
   // @param length number of bytes at input
   // NOTE(review): the closing brace is not present in this listing.
  110void Threaded_Fork::thread_delegate_work(
const uint8_t input[], 
size_t length) {
 
   // Store the buffer before any worker is released; workers read both
   // fields in thread_entry() after acquiring the semaphore.
  112   m_thread_data->m_input = input;
 
  113   m_thread_data->m_input_length = length;
 
   // total_ports() + 1: presumably one participant per worker plus this
   // calling thread, which calls sync() itself below - TODO confirm
   // against the Barrier implementation.
  116   m_thread_data->m_input_complete_barrier.wait(total_ports() + 1);
 
   // Wake the workers: released with a count of total_ports().
  117   m_thread_data->m_input_ready_semaphore.release(total_ports());
 
   // Workers call sync() after filter->write(); presumably this call
   // returns once all of them have done so - TODO confirm.
  120   m_thread_data->m_input_complete_barrier.sync();
 
   // Reset so that a later semaphore release without new data (as done by
   // the destructor) shows the workers a null input.
  123   m_thread_data->m_input = 
nullptr;
 
  124   m_thread_data->m_input_length = 0;
 
   // Body of a worker thread; set_next() starts one std::thread per filter
   // with this function.
   // @param filter the filter this worker writes the shared input to
   // NOTE(review): lines are missing from this listing - whatever encloses
   // the statements below (presumably a loop), the body of the nullptr
   // branch, and the closing braces.
  127void Threaded_Fork::thread_entry(Filter* filter) {
 
   // Acquire the semaphore that thread_delegate_work() and the destructor
   // release.
  129      m_thread_data->m_input_ready_semaphore.acquire();
 
   // Null input is what the destructor leaves behind before releasing the
   // semaphore; presumably the worker exits here - TODO confirm, the branch
   // body is not visible.
  131      if(m_thread_data->m_input == 
nullptr) {
 
   // Feed the shared buffer to this worker's filter.
  135      filter->write(m_thread_data->m_input, m_thread_data->m_input_length);
 
   // Matches the sync() made by thread_delegate_work() on the owning thread.
  136      m_thread_data->m_input_complete_barrier.sync();