Botan  2.4.0
Crypto and TLS for C++11
threaded_fork.cpp
Go to the documentation of this file.
1 /*
2 * Threaded Fork
3 * (C) 2013 Joel Low
4 * 2013 Jack Lloyd
5 *
6 * Botan is released under the Simplified BSD License (see license.txt)
7 */
8 
9 #include <botan/basefilt.h>
10 
11 #if defined(BOTAN_TARGET_OS_HAS_THREADS)
12 
13 #include <botan/internal/semaphore.h>
14 #include <botan/internal/barrier.h>
15 #include <functional>
16 
17 namespace Botan {
18 
19 struct Threaded_Fork_Data
20  {
21  /*
22  * Semaphore for indicating that there is work to be done (or to
23  * quit)
24  */
25  Semaphore m_input_ready_semaphore;
26 
27  /*
28  * Synchronises all threads to complete processing data in lock-step.
29  */
30  Barrier m_input_complete_barrier;
31 
32  /*
33  * The work that needs to be done. This should be only when the threads
34  * are NOT running (i.e. before notifying the work condition, after
35  * the input_complete_barrier has reset.)
36  */
37  const uint8_t* m_input = nullptr;
38 
39  /*
40  * The length of the work that needs to be done.
41  */
42  size_t m_input_length = 0;
43  };
44 
45 /*
46 * Threaded_Fork constructor
47 */
48 Threaded_Fork::Threaded_Fork(Filter* f1, Filter* f2, Filter* f3, Filter* f4) :
49  Fork(nullptr, static_cast<size_t>(0)),
50  m_thread_data(new Threaded_Fork_Data)
51  {
52  Filter* filters[4] = { f1, f2, f3, f4 };
53  set_next(filters, 4);
54  }
55 
56 /*
57 * Threaded_Fork constructor
58 */
59 Threaded_Fork::Threaded_Fork(Filter* filters[], size_t count) :
60  Fork(nullptr, static_cast<size_t>(0)),
61  m_thread_data(new Threaded_Fork_Data)
62  {
63  set_next(filters, count);
64  }
65 
66 Threaded_Fork::~Threaded_Fork()
67  {
68  m_thread_data->m_input = nullptr;
69  m_thread_data->m_input_length = 0;
70 
71  m_thread_data->m_input_ready_semaphore.release(m_threads.size());
72 
73  for(auto& thread : m_threads)
74  thread->join();
75  }
76 
77 std::string Threaded_Fork::name() const
78  {
79  return "Threaded Fork";
80  }
81 
82 void Threaded_Fork::set_next(Filter* f[], size_t n)
83  {
84  Fork::set_next(f, n);
85  n = m_next.size();
86 
87  if(n < m_threads.size())
88  m_threads.resize(n);
89  else
90  {
91  m_threads.reserve(n);
92  for(size_t i = m_threads.size(); i != n; ++i)
93  {
94  m_threads.push_back(
95  std::shared_ptr<std::thread>(
96  new std::thread(
97  std::bind(&Threaded_Fork::thread_entry, this, m_next[i]))));
98  }
99  }
100  }
101 
102 void Threaded_Fork::send(const uint8_t input[], size_t length)
103  {
104  if(m_write_queue.size())
105  thread_delegate_work(m_write_queue.data(), m_write_queue.size());
106  thread_delegate_work(input, length);
107 
108  bool nothing_attached = true;
109  for(size_t j = 0; j != total_ports(); ++j)
110  if(m_next[j])
111  nothing_attached = false;
112 
113  if(nothing_attached)
114  m_write_queue += std::make_pair(input, length);
115  else
116  m_write_queue.clear();
117  }
118 
119 void Threaded_Fork::thread_delegate_work(const uint8_t input[], size_t length)
120  {
121  //Set the data to do.
122  m_thread_data->m_input = input;
123  m_thread_data->m_input_length = length;
124 
125  //Let the workers start processing.
126  m_thread_data->m_input_complete_barrier.wait(total_ports() + 1);
127  m_thread_data->m_input_ready_semaphore.release(total_ports());
128 
129  //Wait for all the filters to finish processing.
130  m_thread_data->m_input_complete_barrier.sync();
131 
132  //Reset the thread data
133  m_thread_data->m_input = nullptr;
134  m_thread_data->m_input_length = 0;
135  }
136 
137 void Threaded_Fork::thread_entry(Filter* filter)
138  {
139  while(true)
140  {
141  m_thread_data->m_input_ready_semaphore.acquire();
142 
143  if(!m_thread_data->m_input)
144  break;
145 
146  filter->write(m_thread_data->m_input, m_thread_data->m_input_length);
147  m_thread_data->m_input_complete_barrier.sync();
148  }
149  }
150 
151 }
152 
153 #endif
void set_next(Filter *f[], size_t n)
Definition: filter.h:162
Definition: alg_id.cpp:13