9#include <botan/filters.h>
11#if defined(BOTAN_HAS_THREAD_UTILS)
13 #include <botan/internal/barrier.h>
14 #include <botan/internal/semaphore.h>
/*
* State reached through Threaded_Fork::m_thread_data and used both by
* thread_delegate_work() and by the worker function thread_entry().
* NOTE(review): the end of this struct is not visible in this view.
*/
18struct Threaded_Fork_Data {
// Workers acquire() this in thread_entry(); it is released once per port
// when work is published and once per thread in the destructor.
23 Semaphore m_input_ready_semaphore;
// Workers and the delegating thread both call sync() on this after a
// chunk of input has been handed out.
28 Barrier m_input_complete_barrier;
// Buffer currently published to the workers. thread_entry() tests this
// against nullptr before writing it to its filter.
35 const uint8_t* m_input =
nullptr;
// Number of bytes at m_input.
40 size_t m_input_length = 0;
// NOTE(review): the signature this initialiser list belongs to is not
// visible in this view; presumably the (f1, f2, f3, f4) constructor.
// Initialises the Fork base with (nullptr, 0) and allocates the thread data.
47 Fork(nullptr, static_cast<size_t>(0)), m_thread_data(new Threaded_Fork_Data) {
// Gather the four filter arguments into an array.
// NOTE(review): the statement that consumes `filters` is not visible here.
48 Filter* filters[4] = {f1, f2, f3, f4};
/*
* Construct from an array of filters.
* @param filters the filters to attach
* @param count number of entries in filters
*/
55Threaded_Fork::Threaded_Fork(Filter* filters[],
size_t count) :
// The Fork base is initialised with (nullptr, 0); the filters are attached
// by the set_next() call in the body instead.
56 Fork(nullptr, static_cast<size_t>(0)), m_thread_data(new Threaded_Fork_Data) {
57 set_next(filters, count);
/*
* Destructor: publishes a null input and then wakes every worker thread.
*/
60Threaded_Fork::~Threaded_Fork() {
// Clear the published input first, so a worker woken below sees nullptr
// in thread_entry().
61 m_thread_data->m_input =
nullptr;
62 m_thread_data->m_input_length = 0;
// One release per thread held in m_threads.
64 m_thread_data->m_input_ready_semaphore.release(m_threads.size());
// NOTE(review): the loop body is not visible in this view; presumably each
// thread is joined here - confirm.
66 for(
auto& thread : m_threads) {
/*
* @return the fixed string "Threaded Fork"
*/
71std::string Threaded_Fork::name()
const {
72 return "Threaded Fork";
/*
* Attach filters and start worker threads for them.
* @param f array of filters
* @param n number of entries in f
* NOTE(review): the statements between the signature and the size check
* are not visible in this view, so how f and n reach m_next cannot be
* confirmed from here.
*/
75void Threaded_Fork::set_next(Filter* f[],
size_t n) {
// Fewer outputs than existing threads.
// NOTE(review): the body of this branch is not visible. If it shrinks
// m_threads, confirm the dropped std::thread objects are joined first.
79 if(n < m_threads.size()) {
// Start one thread for each index from the current thread count up to n.
83 for(
size_t i = m_threads.size(); i != n; ++i) {
// The lambda captures `this` and a copy of the m_next[i] pointer, and
// runs thread_entry() on that filter in the new thread.
84 m_threads.push_back(std::make_shared<std::thread>([
this, next = m_next[i]] { thread_entry(next); }));
/*
* Hand input to the worker threads.
* @param input the bytes to send
* @param length number of bytes at input
*/
89void Threaded_Fork::send(
const uint8_t input[],
size_t length) {
// Data already sitting in m_write_queue is delegated before the new input.
90 if(!m_write_queue.empty()) {
91 thread_delegate_work(m_write_queue.data(), m_write_queue.size());
93 thread_delegate_work(input, length);
// Stays true only if every m_next[j] with j < total_ports() is nullptr.
95 bool nothing_attached =
true;
96 for(
size_t j = 0; j != total_ports(); ++j) {
97 if(m_next[j] !=
nullptr) {
98 nothing_attached =
false;
// With nothing attached, the input is appended to m_write_queue.
102 if(nothing_attached) {
103 m_write_queue += std::make_pair(input, length);
// NOTE(review): presumably the else-branch of the check above; the lines
// separating the two are not visible in this view.
105 m_write_queue.clear();
/*
* Publish one buffer to the worker threads and rendezvous with them on
* the barrier before clearing it again.
* @param input the bytes to process
* @param length number of bytes at input
*/
109void Threaded_Fork::thread_delegate_work(
const uint8_t input[],
size_t length) {
// Publish the buffer; this happens before the semaphore is released.
111 m_thread_data->m_input = input;
112 m_thread_data->m_input_length = length;
// NOTE(review): presumably wait(k) registers k participants (the workers
// plus this thread) - confirm against Barrier.
115 m_thread_data->m_input_complete_barrier.wait(total_ports() + 1);
// One release per port, matching the acquire() in thread_entry().
116 m_thread_data->m_input_ready_semaphore.release(total_ports());
// Workers call sync() on the same barrier after their filter->write().
119 m_thread_data->m_input_complete_barrier.sync();
// Clear the published buffer once the sync() above has returned.
122 m_thread_data->m_input =
nullptr;
123 m_thread_data->m_input_length = 0;
/*
* Function run by each std::thread created in set_next().
* @param filter the filter this thread writes to
* NOTE(review): any enclosing loop is not visible in this view.
* NOTE(review): filter is dereferenced without a null check, while send()
* tests m_next[j] against nullptr - confirm a null entry cannot reach here.
*/
126void Threaded_Fork::thread_entry(Filter* filter) {
// Paired with the release() calls in thread_delegate_work() and the
// destructor.
128 m_thread_data->m_input_ready_semaphore.acquire();
// The destructor sets m_input to nullptr before releasing the semaphore.
// NOTE(review): this branch's body is not visible; presumably it ends the
// worker - confirm.
130 if(m_thread_data->m_input ==
nullptr) {
// Write the published buffer to this thread's filter, then sync() on the
// barrier that thread_delegate_work() also syncs on.
134 filter->write(m_thread_data->m_input, m_thread_data->m_input_length);
135 m_thread_data->m_input_complete_barrier.sync();