Botan 2.19.2
Crypto and TLS for C++11
threaded_fork.cpp
Go to the documentation of this file.
1/*
2* Threaded Fork
3* (C) 2013 Joel Low
4* 2013 Jack Lloyd
5*
6* Botan is released under the Simplified BSD License (see license.txt)
7*/
8
9#include <botan/filters.h>
10
11#if defined(BOTAN_HAS_THREAD_UTILS)
12
13#include <botan/internal/semaphore.h>
14#include <botan/internal/barrier.h>
15#include <functional>
16
17namespace Botan {
18
19struct Threaded_Fork_Data
20 {
21 /*
22 * Semaphore for indicating that there is work to be done (or to
23 * quit)
24 */
25 Semaphore m_input_ready_semaphore;
26
27 /*
28 * Synchronises all threads to complete processing data in lock-step.
29 */
30 Barrier m_input_complete_barrier;
31
32 /*
33 * The work that needs to be done. This should be only when the threads
34 * are NOT running (i.e. before notifying the work condition, after
35 * the input_complete_barrier has reset.)
36 */
37 const uint8_t* m_input = nullptr;
38
39 /*
40 * The length of the work that needs to be done.
41 */
42 size_t m_input_length = 0;
43 };
44
45/*
46* Threaded_Fork constructor
47*/
48Threaded_Fork::Threaded_Fork(Filter* f1, Filter* f2, Filter* f3, Filter* f4) :
49 Fork(nullptr, static_cast<size_t>(0)),
50 m_thread_data(new Threaded_Fork_Data)
51 {
52 Filter* filters[4] = { f1, f2, f3, f4 };
53 set_next(filters, 4);
54 }
55
56/*
57* Threaded_Fork constructor
58*/
59Threaded_Fork::Threaded_Fork(Filter* filters[], size_t count) :
60 Fork(nullptr, static_cast<size_t>(0)),
61 m_thread_data(new Threaded_Fork_Data)
62 {
63 set_next(filters, count);
64 }
65
66Threaded_Fork::~Threaded_Fork()
67 {
68 m_thread_data->m_input = nullptr;
69 m_thread_data->m_input_length = 0;
70
71 m_thread_data->m_input_ready_semaphore.release(m_threads.size());
72
73 for(auto& thread : m_threads)
74 thread->join();
75 }
76
77std::string Threaded_Fork::name() const
78 {
79 return "Threaded Fork";
80 }
81
82void Threaded_Fork::set_next(Filter* f[], size_t n)
83 {
84 Fork::set_next(f, n);
85 n = m_next.size();
86
87 if(n < m_threads.size())
88 m_threads.resize(n);
89 else
90 {
91 m_threads.reserve(n);
92 for(size_t i = m_threads.size(); i != n; ++i)
93 {
94 m_threads.push_back(
95 std::shared_ptr<std::thread>(
96 new std::thread(
97 std::bind(&Threaded_Fork::thread_entry, this, m_next[i]))));
98 }
99 }
100 }
101
102void Threaded_Fork::send(const uint8_t input[], size_t length)
103 {
104 if(m_write_queue.size())
105 thread_delegate_work(m_write_queue.data(), m_write_queue.size());
106 thread_delegate_work(input, length);
107
108 bool nothing_attached = true;
109 for(size_t j = 0; j != total_ports(); ++j)
110 if(m_next[j])
111 nothing_attached = false;
112
113 if(nothing_attached)
114 m_write_queue += std::make_pair(input, length);
115 else
116 m_write_queue.clear();
117 }
118
119void Threaded_Fork::thread_delegate_work(const uint8_t input[], size_t length)
120 {
121 //Set the data to do.
122 m_thread_data->m_input = input;
123 m_thread_data->m_input_length = length;
124
125 //Let the workers start processing.
126 m_thread_data->m_input_complete_barrier.wait(total_ports() + 1);
127 m_thread_data->m_input_ready_semaphore.release(total_ports());
128
129 //Wait for all the filters to finish processing.
130 m_thread_data->m_input_complete_barrier.sync();
131
132 //Reset the thread data
133 m_thread_data->m_input = nullptr;
134 m_thread_data->m_input_length = 0;
135 }
136
137void Threaded_Fork::thread_entry(Filter* filter)
138 {
139 while(true)
140 {
141 m_thread_data->m_input_ready_semaphore.acquire();
142
143 if(!m_thread_data->m_input)
144 break;
145
146 filter->write(m_thread_data->m_input, m_thread_data->m_input_length);
147 m_thread_data->m_input_complete_barrier.sync();
148 }
149 }
150
151}
152
153#endif
std::string name
Definition: alg_id.cpp:13