Botan 3.3.0
Crypto and TLS for C&
threaded_fork.cpp
Go to the documentation of this file.
1/*
2* Threaded Fork
3* (C) 2013 Joel Low
4* 2013 Jack Lloyd
5*
6* Botan is released under the Simplified BSD License (see license.txt)
7*/
8
9#include <botan/filters.h>
10
11#if defined(BOTAN_HAS_THREAD_UTILS)
12
13 #include <botan/internal/barrier.h>
14 #include <botan/internal/semaphore.h>
15 #include <functional>
16
17namespace Botan {
18
19struct Threaded_Fork_Data {
20 /*
21 * Semaphore for indicating that there is work to be done (or to
22 * quit)
23 */
24 Semaphore m_input_ready_semaphore;
25
26 /*
27 * Synchronises all threads to complete processing data in lock-step.
28 */
29 Barrier m_input_complete_barrier;
30
31 /*
32 * The work that needs to be done. This should be only when the threads
33 * are NOT running (i.e. before notifying the work condition, after
34 * the input_complete_barrier has reset.)
35 */
36 const uint8_t* m_input = nullptr;
37
38 /*
39 * The length of the work that needs to be done.
40 */
41 size_t m_input_length = 0;
42};
43
44/*
45* Threaded_Fork constructor
46*/
47Threaded_Fork::Threaded_Fork(Filter* f1, Filter* f2, Filter* f3, Filter* f4) :
48 Fork(nullptr, static_cast<size_t>(0)), m_thread_data(new Threaded_Fork_Data) {
49 Filter* filters[4] = {f1, f2, f3, f4};
50 set_next(filters, 4);
51}
52
53/*
54* Threaded_Fork constructor
55*/
56Threaded_Fork::Threaded_Fork(Filter* filters[], size_t count) :
57 Fork(nullptr, static_cast<size_t>(0)), m_thread_data(new Threaded_Fork_Data) {
58 set_next(filters, count);
59}
60
61Threaded_Fork::~Threaded_Fork() {
62 m_thread_data->m_input = nullptr;
63 m_thread_data->m_input_length = 0;
64
65 m_thread_data->m_input_ready_semaphore.release(m_threads.size());
66
67 for(auto& thread : m_threads) {
68 thread->join();
69 }
70}
71
72std::string Threaded_Fork::name() const {
73 return "Threaded Fork";
74}
75
76void Threaded_Fork::set_next(Filter* f[], size_t n) {
77 Fork::set_next(f, n);
78 n = m_next.size();
79
80 if(n < m_threads.size()) {
81 m_threads.resize(n);
82 } else {
83 m_threads.reserve(n);
84 for(size_t i = m_threads.size(); i != n; ++i) {
85 m_threads.push_back(std::make_shared<std::thread>(std::bind(&Threaded_Fork::thread_entry, this, m_next[i])));
86 }
87 }
88}
89
90void Threaded_Fork::send(const uint8_t input[], size_t length) {
91 if(!m_write_queue.empty()) {
92 thread_delegate_work(m_write_queue.data(), m_write_queue.size());
93 }
94 thread_delegate_work(input, length);
95
96 bool nothing_attached = true;
97 for(size_t j = 0; j != total_ports(); ++j) {
98 if(m_next[j]) {
99 nothing_attached = false;
100 }
101 }
102
103 if(nothing_attached) {
104 m_write_queue += std::make_pair(input, length);
105 } else {
106 m_write_queue.clear();
107 }
108}
109
110void Threaded_Fork::thread_delegate_work(const uint8_t input[], size_t length) {
111 //Set the data to do.
112 m_thread_data->m_input = input;
113 m_thread_data->m_input_length = length;
114
115 //Let the workers start processing.
116 m_thread_data->m_input_complete_barrier.wait(total_ports() + 1);
117 m_thread_data->m_input_ready_semaphore.release(total_ports());
118
119 //Wait for all the filters to finish processing.
120 m_thread_data->m_input_complete_barrier.sync();
121
122 //Reset the thread data
123 m_thread_data->m_input = nullptr;
124 m_thread_data->m_input_length = 0;
125}
126
127void Threaded_Fork::thread_entry(Filter* filter) {
128 while(true) {
129 m_thread_data->m_input_ready_semaphore.acquire();
130
131 if(!m_thread_data->m_input) {
132 break;
133 }
134
135 filter->write(m_thread_data->m_input, m_thread_data->m_input_length);
136 m_thread_data->m_input_complete_barrier.sync();
137 }
138}
139
140} // namespace Botan
141
142#endif