Botan 3.11.0
Crypto and TLS for C++
threaded_fork.cpp
Go to the documentation of this file.
1/*
2* Threaded Fork
3* (C) 2013 Joel Low
4* 2013 Jack Lloyd
5*
6* Botan is released under the Simplified BSD License (see license.txt)
7*/
8
9#include <botan/filters.h>
10
11#if defined(BOTAN_HAS_THREAD_UTILS)
12
13 #include <botan/internal/barrier.h>
14 #include <botan/internal/semaphore.h>
15
16namespace Botan {
17
18struct Threaded_Fork_Data {
19 /*
20 * Semaphore for indicating that there is work to be done (or to
21 * quit)
22 */
23 Semaphore m_input_ready_semaphore;
24
25 /*
26 * Synchronises all threads to complete processing data in lock-step.
27 */
28 Barrier m_input_complete_barrier;
29
30 /*
31 * The work that needs to be done. This should be only when the threads
32 * are NOT running (i.e. before notifying the work condition, after
33 * the input_complete_barrier has reset.)
34 */
35 const uint8_t* m_input = nullptr;
36
37 /*
38 * The length of the work that needs to be done.
39 */
40 size_t m_input_length = 0;
41};
42
43/*
44* Threaded_Fork constructor
45*/
46Threaded_Fork::Threaded_Fork(Filter* f1, Filter* f2, Filter* f3, Filter* f4) :
47 Fork(nullptr, static_cast<size_t>(0)), m_thread_data(new Threaded_Fork_Data) {
48 Filter* filters[4] = {f1, f2, f3, f4};
49 set_next(filters, 4);
50}
51
52/*
53* Threaded_Fork constructor
54*/
55Threaded_Fork::Threaded_Fork(Filter* filters[], size_t count) :
56 Fork(nullptr, static_cast<size_t>(0)), m_thread_data(new Threaded_Fork_Data) {
57 set_next(filters, count);
58}
59
60Threaded_Fork::~Threaded_Fork() {
61 m_thread_data->m_input = nullptr;
62 m_thread_data->m_input_length = 0;
63
64 m_thread_data->m_input_ready_semaphore.release(m_threads.size());
65
66 for(auto& thread : m_threads) {
67 thread->join();
68 }
69}
70
71std::string Threaded_Fork::name() const {
72 return "Threaded Fork";
73}
74
75void Threaded_Fork::set_next(Filter* f[], size_t n) {
76 Fork::set_next(f, n);
77 n = m_next.size();
78
79 if(n < m_threads.size()) {
80 m_threads.resize(n);
81 } else {
82 m_threads.reserve(n);
83 for(size_t i = m_threads.size(); i != n; ++i) {
84 m_threads.push_back(std::make_shared<std::thread>([this, next = m_next[i]] { thread_entry(next); }));
85 }
86 }
87}
88
89void Threaded_Fork::send(const uint8_t input[], size_t length) {
90 if(!m_write_queue.empty()) {
91 thread_delegate_work(m_write_queue.data(), m_write_queue.size());
92 }
93 thread_delegate_work(input, length);
94
95 bool nothing_attached = true;
96 for(size_t j = 0; j != total_ports(); ++j) {
97 if(m_next[j] != nullptr) {
98 nothing_attached = false;
99 }
100 }
101
102 if(nothing_attached) {
103 m_write_queue += std::make_pair(input, length);
104 } else {
105 m_write_queue.clear();
106 }
107}
108
109void Threaded_Fork::thread_delegate_work(const uint8_t input[], size_t length) {
110 //Set the data to do.
111 m_thread_data->m_input = input;
112 m_thread_data->m_input_length = length;
113
114 //Let the workers start processing.
115 m_thread_data->m_input_complete_barrier.wait(total_ports() + 1);
116 m_thread_data->m_input_ready_semaphore.release(total_ports());
117
118 //Wait for all the filters to finish processing.
119 m_thread_data->m_input_complete_barrier.sync();
120
121 //Reset the thread data
122 m_thread_data->m_input = nullptr;
123 m_thread_data->m_input_length = 0;
124}
125
126void Threaded_Fork::thread_entry(Filter* filter) {
127 while(true) {
128 m_thread_data->m_input_ready_semaphore.acquire();
129
130 if(m_thread_data->m_input == nullptr) {
131 break;
132 }
133
134 filter->write(m_thread_data->m_input, m_thread_data->m_input_length);
135 m_thread_data->m_input_complete_barrier.sync();
136 }
137}
138
139} // namespace Botan
140
141#endif