Program Listing for File parallel_factorizer.hpp

Return to documentation for file (src/cpp/parallel_factorizer.hpp)

#pragma once
#include "factorizer.hpp"
#include <sdsl/suffix_trees.hpp>
#include <sdsl/rmq_succinct_sct.hpp>
#include <vector>
#include <thread>
#include <mutex>
#include <atomic>
#include <filesystem>
#include <fstream>
#include <optional>

namespace noLZSS {

// Define cst_t for use in parallel factorization
using cst_t = sdsl::cst_sada<>;

struct ThreadContext {
    size_t thread_id;               // Thread identifier
    size_t start_pos;               // Starting position in text
    size_t end_pos;                 // Position to stop (start of next thread)
    size_t text_length;             // Total text length
    std::string temp_file_path;     // Path to temporary file for this thread
    bool is_last_thread;            // Flag indicating if this is the last thread

    // Convergence tracking state
    size_t next_thread_factor_index = 0;  // Index of next factor to read from next thread
    std::optional<Factor> last_read_factor;  // Last factor read from next thread's file
};

class ParallelFactorizer {
public:
    static constexpr size_t MIN_CHARS_PER_THREAD = 100000;

    ParallelFactorizer() = default;

    ~ParallelFactorizer() = default;

    size_t parallel_factorize(std::string_view text, const std::string& output_path,
                             size_t num_threads = 0, size_t start_pos = 0);

    size_t parallel_factorize_file(const std::string& input_path, const std::string& output_path,
                                  size_t num_threads = 0, size_t start_pos = 0);

    size_t parallel_factorize_dna_w_rc(std::string_view text, const std::string& output_path,
                                      size_t num_threads = 0, size_t start_pos = 0);

    size_t parallel_factorize_multiple_dna_w_rc(const std::string& prepared_string,
                                                size_t original_length,
                                                const std::string& output_path,
                                                size_t num_threads = 0,
                                                size_t start_pos = 0);

    size_t parallel_factorize_file_dna_w_rc(const std::string& input_path, const std::string& output_path,
                                           size_t num_threads = 0);

    std::string create_temp_file_path(size_t thread_id);

    size_t merge_temp_files(const std::string& output_path,
                           std::vector<ThreadContext>& contexts);

    void cleanup_temp_files(const std::vector<ThreadContext>& contexts);

    void factorize_dna_w_rc_thread(const cst_t& cst,
                                   const sdsl::rmq_succinct_sct<>& rmqF,
                                   const sdsl::rmq_succinct_sct<>& rmqRcEnd,
                                   const sdsl::int_vector<64>& fwd_starts,
                                   const sdsl::int_vector<64>& rc_ends,
                                   uint64_t INF,
                                   size_t N,
                                   ThreadContext& ctx,
                                   std::vector<ThreadContext>& all_contexts,
                                   std::vector<std::mutex>& file_mutexes);

private:
    void factorize_thread(const cst_t& cst, const sdsl::rmq_succinct_sct<>& rmq,
                         ThreadContext& ctx,
                         std::vector<ThreadContext>& all_contexts,
                         std::vector<std::mutex>& file_mutexes);

    std::optional<Factor> read_factor_at_index(const std::string& file_path, size_t factor_index);

    bool check_convergence(size_t current_end, ThreadContext& next_ctx,
                          std::mutex& next_file_mutex);

    void write_factor(const Factor& factor, const std::string& file_path,
                      std::mutex& file_mutex);

    std::optional<Factor> read_factor_at(const std::string& file_path,
                                      size_t index,
                                      std::mutex& file_mutex);
};

} // namespace noLZSS