9#include <botan/filters.h>
11#if defined(BOTAN_HAS_THREAD_UTILS)
13 #include <botan/internal/barrier.h>
14 #include <botan/internal/semaphore.h>
// State shared between Threaded_Fork and the threads it starts in set_next().
// thread_delegate_work() fills in the buffer fields and signals the threads;
// thread_entry() reads them.
19struct Threaded_Fork_Data {
// Released by thread_delegate_work() (total_ports() times) after the buffer
// fields are set, and by the destructor (m_threads.size() times) after they
// are cleared. thread_entry() acquires it before reading m_input.
24 Semaphore m_input_ready_semaphore;
// thread_delegate_work() calls wait(total_ports() + 1) and then sync() on it;
// thread_entry() calls sync() after its filter->write() returns.
// NOTE(review): Barrier semantics are defined elsewhere - presumably this
// makes the producer wait until every worker has finished writing; confirm.
29 Barrier m_input_complete_barrier;
// Buffer currently handed to the threads. Null by default, set by
// thread_delegate_work() for one round and reset to null afterwards; the
// destructor also nulls it before releasing the semaphore.
36 const uint8_t* m_input =
nullptr;
// Number of bytes at m_input; reset to 0 whenever m_input is reset.
41 size_t m_input_length = 0;
// Constructs a Threaded_Fork from four filter pointers.
// The Fork base is initialised with (nullptr, 0) and a fresh
// Threaded_Fork_Data is allocated for m_thread_data.
47Threaded_Fork::Threaded_Fork(Filter* f1, Filter* f2, Filter* f3, Filter* f4) :
48 Fork(nullptr, static_cast<size_t>(0)), m_thread_data(new Threaded_Fork_Data) {
// Gather the four arguments into an array.
// NOTE(review): the statement that consumes `filters` is not visible in this
// excerpt - presumably set_next(filters, 4), mirroring the array constructor;
// confirm.
49 Filter* filters[4] = {f1, f2, f3, f4};
// Constructs a Threaded_Fork from an array of filter pointers.
// filters: array of filter pointers, forwarded to set_next().
// count:   number of entries passed along with `filters`.
// The Fork base is initialised with (nullptr, 0) and a fresh
// Threaded_Fork_Data is allocated for m_thread_data.
56Threaded_Fork::Threaded_Fork(Filter* filters[],
size_t count) :
57 Fork(nullptr, static_cast<size_t>(0)), m_thread_data(new Threaded_Fork_Data) {
// set_next() is what attaches the filters and starts their threads.
58 set_next(filters, count);
// Destructor: clears the shared buffer fields and then releases the
// input-ready semaphore once per entry in m_threads.
61Threaded_Fork::~Threaded_Fork() {
// Null input with zero length. thread_entry() tests m_input for null right
// after acquiring the semaphore.
// NOTE(review): that branch's body is not visible here - presumably a null
// input tells the thread to exit; confirm.
62 m_thread_data->m_input =
nullptr;
63 m_thread_data->m_input_length = 0;
// One release per thread so that every thread blocked in acquire() wakes up.
65 m_thread_data->m_input_ready_semaphore.release(m_threads.size());
// NOTE(review): the loop body is not visible in this excerpt - presumably
// each thread is joined here; confirm.
67 for(
auto& thread : m_threads) {
// Returns the fixed name string of this filter type, "Threaded Fork".
72std::string Threaded_Fork::name()
const {
73 return "Threaded Fork";
// Attaches filters and brings m_threads in line with the filter count.
// f: array of filter pointers.
// n: number of entries in `f`.
// NOTE(review): the embedded line numbers jump from 76 to 80, so statements
// before the `if` are missing from this excerpt (presumably a call into the
// Fork base that populates m_next); confirm against the full file.
76void Threaded_Fork::set_next(Filter* f[],
size_t n) {
// Fewer filters than existing threads.
// NOTE(review): the body of this branch is not visible in this excerpt.
80 if(n < m_threads.size()) {
// For every index from the current thread count up to n, start a std::thread
// that runs thread_entry(m_next[i]) on this object. The bound callable holds
// the raw `this` pointer.
// NOTE(review): presumably this loop sits in the else-branch of the check
// above; confirm.
84 for(
size_t i = m_threads.size(); i != n; ++i) {
85 m_threads.push_back(std::make_shared<std::thread>(std::bind(&Threaded_Fork::thread_entry,
this, m_next[i])));
// Hands `length` bytes at `input` to the threads via thread_delegate_work(),
// then either appends the input to m_write_queue or clears the queue.
// input:  bytes to pass on.
// length: number of bytes at `input`.
90void Threaded_Fork::send(
const uint8_t input[],
size_t length) {
// Anything already sitting in m_write_queue is delegated first.
91 if(!m_write_queue.empty()) {
92 thread_delegate_work(m_write_queue.data(), m_write_queue.size());
// Then the new input is delegated.
// NOTE(review): the closing brace of the `if` above is not visible -
// presumably this call is unconditional; confirm.
94 thread_delegate_work(input, length);
// Scan all total_ports() ports to decide whether anything is attached.
// NOTE(review): the condition guarding the assignment to false is not visible
// in this excerpt - presumably a null check on m_next[j]; confirm.
96 bool nothing_attached =
true;
97 for(
size_t j = 0; j != total_ports(); ++j) {
99 nothing_attached =
false;
// With nothing attached, the input is appended to m_write_queue.
103 if(nothing_attached) {
104 m_write_queue += std::make_pair(input, length);
// NOTE(review): presumably the else-branch of the check above - the queue is
// cleared once something is attached; confirm.
106 m_write_queue.clear();
// Runs one round of work: sets the shared buffer fields, signals the threads,
// synchronises on the barrier, and then clears the buffer fields again.
// input:  bytes the threads should process this round.
// length: number of bytes at `input`.
// The order of the statements below is the synchronisation protocol with
// thread_entry(); do not reorder.
110void Threaded_Fork::thread_delegate_work(
const uint8_t input[],
size_t length) {
// Set the buffer fields before any thread is signalled.
112 m_thread_data->m_input = input;
113 m_thread_data->m_input_length = length;
// NOTE(review): presumably sets the barrier up for total_ports() workers plus
// this calling thread - confirm against Barrier::wait.
116 m_thread_data->m_input_complete_barrier.wait(total_ports() + 1);
// Signal the threads: total_ports() releases, matching the single acquire()
// each thread_entry() performs.
117 m_thread_data->m_input_ready_semaphore.release(total_ports());
// Matches the sync() each thread calls after its filter->write().
// NOTE(review): presumably blocks until all of them have arrived; confirm.
120 m_thread_data->m_input_complete_barrier.sync();
// Clear the buffer fields so no stale pointer remains after this call.
123 m_thread_data->m_input =
nullptr;
124 m_thread_data->m_input_length = 0;
// Function run by each thread started in set_next().
// filter: the filter this thread writes the shared buffer into.
// NOTE(review): the embedded line numbers skip 128, 132-134 and everything
// after 136, so an enclosing loop and the null-input branch body are not
// visible in this excerpt; confirm against the full file.
127void Threaded_Fork::thread_entry(Filter* filter) {
// Wait until thread_delegate_work() or the destructor releases the semaphore.
129 m_thread_data->m_input_ready_semaphore.acquire();
// The destructor nulls m_input before releasing the semaphore.
// NOTE(review): presumably a null input means this thread should stop; confirm.
131 if(!m_thread_data->m_input) {
// Pass the shared buffer to this thread's filter.
135 filter->write(m_thread_data->m_input, m_thread_data->m_input_length);
// Counterpart of the sync() call in thread_delegate_work().
136 m_thread_data->m_input_complete_barrier.sync();