Program Listing for File parallel_factorizer.cpp

Return to documentation for file (src/cpp/parallel_factorizer.cpp)

#include "parallel_factorizer.hpp"
#include "factorizer_helpers.hpp"
#include <algorithm>
#include <iostream>
#include <string>
#include <chrono>

namespace fs = std::filesystem;
namespace noLZSS {
// Helper functions lcp() and next_leaf() are now in factorizer_helpers.hpp

std::string ParallelFactorizer::create_temp_file_path(size_t thread_id) {
    auto timestamp = std::chrono::system_clock::now().time_since_epoch().count();
    std::string temp_dir = fs::temp_directory_path().string();
    return temp_dir + "/noLZSS_temp_" + std::to_string(timestamp) + "_" +
           std::to_string(thread_id) + ".bin";
}

size_t ParallelFactorizer::parallel_factorize(std::string_view text, const std::string& output_path,
                                           size_t num_threads, size_t start_pos) {
    if (text.empty()) return 0;

    // Ensure start_pos is within bounds
    if (start_pos >= text.length()) {
        throw std::invalid_argument("start_pos must be less than text length");
    }

    // Determine optimal thread count if not specified
    if (num_threads == 0) {
        // Calculate maximum useful threads based on remaining text size
        size_t remaining_length = text.length() - start_pos;
        size_t max_useful_threads = remaining_length / MIN_CHARS_PER_THREAD;

        // Use available hardware threads, but don't exceed what's useful for this input size
        size_t hardware_threads = std::thread::hardware_concurrency();
        num_threads = std::min(hardware_threads, max_useful_threads);

        // Ensure at least one thread
        num_threads = std::max(1UL, num_threads);
    }

    // Create suffix tree for all threads to use
    std::string tmp(text);
    cst_t cst;
    construct_im(cst, tmp, 1);

    // Create RMQ support once for all threads
    sdsl::rmq_succinct_sct<> rmq(&cst.csa);

    // Create contexts for each thread
    std::vector<ThreadContext> contexts(num_threads);

    // Divide work among threads, starting from start_pos
    const size_t remaining_length = text.length() - start_pos;
    const size_t chunk_size = remaining_length / num_threads;
    for (size_t i = 0; i < num_threads; ++i) {
        contexts[i].thread_id = i;
        contexts[i].start_pos = start_pos + (i * chunk_size);
        contexts[i].end_pos = (i + 1 < num_threads) ? (start_pos + (i + 1) * chunk_size) : text.length();
        contexts[i].text_length = text.length();
        // Thread 0 writes directly to output file; others use temp files
        contexts[i].temp_file_path = (i == 0) ? output_path : create_temp_file_path(i);
        contexts[i].is_last_thread = (i == num_threads - 1);
    }

    // Create mutexes for file access
    std::vector<std::mutex> file_mutexes(num_threads);

    // Track exceptions from threads
    std::vector<std::exception_ptr> thread_exceptions(num_threads, nullptr);
    std::mutex exception_mutex;

    // Create and start worker threads with exception handling
    std::vector<std::thread> threads;
    for (size_t i = 0; i < num_threads; ++i) {
        threads.emplace_back([this, &cst, &rmq, &contexts, &file_mutexes, &thread_exceptions, &exception_mutex, i]() {
            try {
                factorize_thread(cst, rmq, contexts[i], contexts, file_mutexes);
            } catch (...) {
                // Capture exception for the main thread to handle
                std::lock_guard<std::mutex> lock(exception_mutex);
                thread_exceptions[i] = std::current_exception();
            }
        });
    }

    // Wait for all threads to complete
    for (auto& t : threads) {
        if (t.joinable()) t.join();
    }

    // Check for exceptions from threads
    for (size_t i = 0; i < num_threads; ++i) {
        if (thread_exceptions[i]) {
            // Cleanup temporary files before rethrowing
            cleanup_temp_files(contexts);
            std::rethrow_exception(thread_exceptions[i]);
        }
    }

    // Merge temporary files and create final output
    size_t total_factors = merge_temp_files(output_path, contexts);

    // Cleanup temporary files
    cleanup_temp_files(contexts);

    return total_factors;
}

void ParallelFactorizer::factorize_thread(const cst_t& cst, const sdsl::rmq_succinct_sct<>& rmq,
                                        ThreadContext& ctx,
                                        std::vector<ThreadContext>& all_contexts,
                                        std::vector<std::mutex>& file_mutexes) {
    // Initialize factorization at our starting position
    auto lambda = cst.select_leaf(cst.csa.isa[ctx.start_pos] + 1);
    size_t lambda_node_depth = cst.node_depth(lambda);
    size_t lambda_sufnum = ctx.start_pos;

    // Create or truncate output file
    {
        std::lock_guard<std::mutex> lock(file_mutexes[ctx.thread_id]);
        std::ofstream ofs(ctx.temp_file_path, std::ios::binary | std::ios::trunc);
    }

    // Track the next thread for convergence checking
    ThreadContext* next_ctx = nullptr;
    if (!ctx.is_last_thread && ctx.thread_id + 1 < all_contexts.size()) {
        next_ctx = &all_contexts[ctx.thread_id + 1];
    }

    // Main factorization loop
    while (lambda_sufnum < ctx.text_length) {
        // Compute current factor
        size_t d = 1;
        size_t l = 1;
        cst_t::node_type v;
        size_t v_min_leaf_sufnum = 0;
        size_t u_min_leaf_sufnum = 0;
        Factor current_factor;

        // Factor computation logic (similar to original nolzss)
        while (true) {
            v = cst.bp_support.level_anc(lambda, lambda_node_depth - d);
            v_min_leaf_sufnum = cst.csa[rmq(cst.lb(v), cst.rb(v))];
            l = cst.depth(v);

            if (v_min_leaf_sufnum + l - 1 < lambda_sufnum) {
                u_min_leaf_sufnum = v_min_leaf_sufnum;
                ++d;
                continue;
            }

            auto u = cst.parent(v);
            auto u_depth = cst.depth(u);

            if (v_min_leaf_sufnum == lambda_sufnum) {
                if (u == cst.root()) {
                    l = 1;
                    current_factor = Factor{static_cast<uint64_t>(lambda_sufnum),
                                         static_cast<uint64_t>(l),
                                         static_cast<uint64_t>(lambda_sufnum)};
                } else {
                    l = u_depth;
                    current_factor = Factor{static_cast<uint64_t>(lambda_sufnum),
                                         static_cast<uint64_t>(l),
                                         static_cast<uint64_t>(u_min_leaf_sufnum)};
                }
            } else {
                l = std::min(lcp(cst, lambda_sufnum, v_min_leaf_sufnum),
                           (lambda_sufnum - v_min_leaf_sufnum));
                if (l <= u_depth) {
                    l = u_depth;
                    current_factor = Factor{static_cast<uint64_t>(lambda_sufnum),
                                         static_cast<uint64_t>(l),
                                         static_cast<uint64_t>(u_min_leaf_sufnum)};
                } else {
                    current_factor = Factor{static_cast<uint64_t>(lambda_sufnum),
                                         static_cast<uint64_t>(l),
                                         static_cast<uint64_t>(v_min_leaf_sufnum)};
                }
            }
            break;
        }

        // Write the factor to temporary file
        write_factor(current_factor, ctx.temp_file_path, file_mutexes[ctx.thread_id]);

        // Check for convergence when factor extends into next thread's region
        // Use lambda_sufnum + l (exclusive end) to check against next thread's start
        if (next_ctx && lambda_sufnum + l >= next_ctx->start_pos) {
            size_t current_end = lambda_sufnum + l;  // Exclusive end position
            if (check_convergence(current_end, *next_ctx, file_mutexes[next_ctx->thread_id])) {
                break;  // Convergence detected
            }
        }

        // Advance to next position
        lambda = next_leaf(cst, lambda, l);
        lambda_node_depth = cst.node_depth(lambda);
        lambda_sufnum = cst.sn(lambda);
    }
}

void ParallelFactorizer::factorize_dna_w_rc_thread(const cst_t& cst,
                                                  const sdsl::rmq_succinct_sct<>& rmqF,
                                                  const sdsl::rmq_succinct_sct<>& rmqRcEnd,
                                                  const sdsl::int_vector<64>& fwd_starts,
                                                  const sdsl::int_vector<64>& rc_ends,
                                                  uint64_t INF,
                                                  size_t N,
                                                  ThreadContext& ctx,
                                                  std::vector<ThreadContext>& all_contexts,
                                                  std::vector<std::mutex>& file_mutexes) {
    // Initialize to the leaf of suffix starting at position ctx.start_pos
    auto lambda = cst.select_leaf(cst.csa.isa[ctx.start_pos] + 1);
    size_t lambda_node_depth = cst.node_depth(lambda);
    size_t i = cst.sn(lambda); // Current position in text

    // Create or truncate output file
    {
        std::lock_guard<std::mutex> lock(file_mutexes[ctx.thread_id]);
        std::ofstream ofs(ctx.temp_file_path, std::ios::binary | std::ios::trunc);
    }

    // Track the next thread for convergence checking
    ThreadContext* next_ctx = nullptr;
    if (!ctx.is_last_thread && ctx.thread_id + 1 < all_contexts.size()) {
        next_ctx = &all_contexts[ctx.thread_id + 1];
    }

    // Main factorization loop - only process the original sequence (0 to N)
    while (i < N) {
        // At factor start i (0-based in T), walk up ancestors and track best FWD and RC candidates independently

        // Track best FWD candidate
        bool   have_fwd       = false;
        size_t best_fwd_start = 0;  // start in T (for FWD)
        size_t best_fwd_depth = 0;  // tree depth of best FWD candidate

        // Track best RC candidate
        bool   have_rc        = false;
        size_t best_rc_end    = 0;  // end in T (for RC)
        size_t best_rc_posS   = 0;  // pos in S where RC candidate suffix starts (for LCP)
        size_t best_rc_depth  = 0;  // tree depth of best RC candidate

        // Walk from leaf to root via level_anc
        for (size_t step = 1; step <= lambda_node_depth; ++step) {
            auto v = cst.bp_support.level_anc(lambda, lambda_node_depth - step);
            size_t ell = cst.depth(v);
            if (ell == 0) break; // reached root

            auto lb = cst.lb(v), rb = cst.rb(v);

            // Forward candidate (min start in T within v's interval)
            size_t kF = rmqF(lb, rb);
            uint64_t jF = fwd_starts[kF];
            bool okF = (jF != INF) && (jF + ell - 1 < i); // non-overlap: endF <= i-1

            // RC candidate (min END in T within v's interval; monotone with depth)
            size_t kR = rmqRcEnd(lb, rb);
            uint64_t endRC = rc_ends[kR];
            bool okR = (endRC != INF) && (endRC < i); // endRC <= i-1

            if (!okF && !okR) {
                // deeper nodes can only increase jF and the minimal RC end
                // -> non-overlap won't become true again for either; stop
                break;
            }

            // Update best FWD candidate independently
            if (okF) {
                if (ell > best_fwd_depth ||
                    (ell == best_fwd_depth && (jF + ell - 1) < (best_fwd_start + best_fwd_depth - 1))) {
                    have_fwd = true;
                    best_fwd_start = jF;
                    best_fwd_depth = ell;
                }
            }

            // Update best RC candidate independently
            if (okR) {
                size_t posS_R = cst.csa[kR]; // suffix position in S for LCP
                if (ell > best_rc_depth ||
                    (ell == best_rc_depth && (endRC < best_rc_end))) {
                    have_rc = true;
                    best_rc_end = endRC;
                    best_rc_posS = posS_R;
                    best_rc_depth = ell;
                }
            }
        }

        size_t emit_len = 1;
        uint64_t emit_ref = i; // default for literal

        if (!have_fwd && !have_rc) {
            // No previous occurrence (FWD nor RC) — literal of length 1
            Factor f{static_cast<uint64_t>(i), static_cast<uint64_t>(emit_len), static_cast<uint64_t>(emit_ref)};
            write_factor(f, ctx.temp_file_path, file_mutexes[ctx.thread_id]);
        } else {
            // Compute true lengths for both candidates
            size_t fwd_true_len = 0;
            size_t rc_true_len = 0;

            if (have_fwd) {
                size_t cap = i - best_fwd_start; // non-overlap cap
                size_t L = lcp(cst, i, best_fwd_start);
                fwd_true_len = std::min(L, cap);
            }

            if (have_rc) {
                rc_true_len = lcp(cst, i, best_rc_posS);
            }

            // Choose based on TRUE length, with FWD preference at equal length.
            // A literal (self-reference, length 1) is always available as a fallback.
            // Only use RC if it provides strictly longer match than both FWD and literal.
            bool use_fwd = false;
            bool use_literal = false;

            if (have_fwd && fwd_true_len >= 1) {
                // Have a real forward match
                if (have_rc && rc_true_len > fwd_true_len) {
                    use_fwd = false;  // RC is strictly longer
                } else {
                    use_fwd = true;   // FWD wins (including ties)
                }
            } else {
                // No real forward match; compare RC against literal (length 1)
                if (have_rc && rc_true_len > 1) {
                    use_fwd = false;  // RC is strictly longer than literal
                } else {
                    use_literal = true;  // Use literal (self-reference)
                }
            }

            if (use_literal) {
                // Emit literal
                emit_len = 1;
                emit_ref = static_cast<uint64_t>(i);
            } else if (use_fwd) {
                emit_len = fwd_true_len;
                emit_ref = static_cast<uint64_t>(best_fwd_start);
            } else {
                emit_len = rc_true_len;
                size_t start_pos_val = best_rc_end - emit_len + 1;  // start = end - len + 1
                emit_ref = RC_MASK | static_cast<uint64_t>(start_pos_val); // start-anchored + RC flag
            }

            // Safety: ensure progress
            if (emit_len == 0) {
                throw std::runtime_error("emit_len must be positive to ensure factorization progress");
            }

            Factor f{static_cast<uint64_t>(i), static_cast<uint64_t>(emit_len), emit_ref};
            write_factor(f, ctx.temp_file_path, file_mutexes[ctx.thread_id]);
        }

        // Check for convergence
        if (next_ctx && i + emit_len >= next_ctx->start_pos) {
            size_t current_end = i + emit_len;
            if (check_convergence(current_end, *next_ctx, file_mutexes[next_ctx->thread_id])) {
                break; // Convergence detected
            }
        }

        // Advance to next position
        lambda = next_leaf(cst, lambda, emit_len);
        lambda_node_depth = cst.node_depth(lambda);
        i = cst.sn(lambda);
    }
}

std::optional<Factor> ParallelFactorizer::read_factor_at_index(const std::string& file_path,
                                                                size_t factor_index) {
    std::ifstream ifs(file_path, std::ios::binary);
    if (!ifs) {
        return std::nullopt;
    }

    // Seek to the position of the requested factor
    ifs.seekg(factor_index * sizeof(Factor), std::ios::beg);
    if (!ifs) {
        return std::nullopt; // Index out of bounds
    }

    // Read the factor
    Factor factor;
    ifs.read(reinterpret_cast<char*>(&factor), sizeof(Factor));
    if (!ifs || ifs.gcount() != sizeof(Factor)) {
        return std::nullopt; // Failed to read complete factor
    }

    return factor;
}

bool ParallelFactorizer::check_convergence(size_t current_end, ThreadContext& next_ctx,
                                        std::mutex& next_file_mutex) {
    // If we have a cached factor, check it first
    if (next_ctx.last_read_factor.has_value()) {
        const auto& cached_factor = next_ctx.last_read_factor.value();
        size_t cached_end = cached_factor.start + cached_factor.length;

        if (cached_end > current_end) {
            // The cached factor ends beyond our current position,
            // so we haven't reached convergence yet
            return false;
        }

        if (cached_factor.start == current_end) {
            // Found convergence: next thread's factor starts exactly where we end
            return true;
        }

        // cached_end <= current_end but cached_factor.start < current_end
        // Need to read the next factor
    }

    // Read factors one at a time until we find convergence or go past current_end
    while (true) {
        std::lock_guard<std::mutex> lock(next_file_mutex);

        auto factor_opt = read_factor_at_index(next_ctx.temp_file_path,
                                               next_ctx.next_thread_factor_index);

        if (!factor_opt.has_value()) {
            // No more factors in next thread's file - convergence not found yet
            return false;
        }

        Factor factor = factor_opt.value();
        next_ctx.last_read_factor = factor;
        next_ctx.next_thread_factor_index++;

        if (factor.start == current_end) {
            // Found convergence
            return true;
        }

        if (factor.start > current_end) {
            // Next thread's factor starts beyond our current position
            // We haven't reached convergence yet
            return false;
        }

        // factor.start < current_end, keep reading next factors
    }
}

void ParallelFactorizer::write_factor(const Factor& factor, const std::string& file_path,
                                   std::mutex& file_mutex) {
    std::lock_guard<std::mutex> lock(file_mutex);

    std::ofstream ofs(file_path, std::ios::binary | std::ios::app);
    if (!ofs) {
        throw std::runtime_error("Cannot open temporary file for writing: " + file_path);
    }

    ofs.write(reinterpret_cast<const char*>(&factor), sizeof(Factor));
}

std::optional<Factor> ParallelFactorizer::read_factor_at(const std::string& file_path,
                                                     size_t index,
                                                     std::mutex& file_mutex) {
    std::lock_guard<std::mutex> lock(file_mutex);

    std::ifstream ifs(file_path, std::ios::binary);
    if (!ifs) {
        return std::nullopt;
    }

    // Seek to the position of the requested factor
    ifs.seekg(index * sizeof(Factor));

    Factor factor;
    if (ifs.read(reinterpret_cast<char*>(&factor), sizeof(Factor))) {
        return factor;
    }

    return std::nullopt;
}

size_t ParallelFactorizer::merge_temp_files(const std::string& output_path,
                                         std::vector<ThreadContext>& contexts) {
    // Thread 0 already wrote to output_path, so we open in append mode
    // to add factors from other threads

    size_t total_factors = 0;
    uint64_t total_length = 0;
    size_t current_position = 0;  // Track the current end position in the merged output
    size_t text_length = contexts.empty() ? 0 : contexts[0].text_length;
    std::optional<Factor> last_written_factor;

    // First, read thread 0's factors to find the last one and count them
    {
        std::ifstream ifs(output_path, std::ios::binary);
        if (!ifs) {
            throw std::runtime_error("Cannot open output file for reading: " + output_path);
        }

        Factor factor;
        while (ifs.read(reinterpret_cast<char*>(&factor), sizeof(Factor))) {
            total_factors++;
            total_length += factor.length;
            last_written_factor = factor;

            size_t factor_end = factor.start + factor.length;
            if (factor_end > current_position) {
                current_position = factor_end;
            }
        }
    }

    // Process remaining threads if there are multiple threads and sequence isn't complete
    if (contexts.size() > 1 && current_position < text_length) {
        // Open output file in append mode for adding factors from other threads
        std::ofstream ofs(output_path, std::ios::binary | std::ios::app);
        if (!ofs) {
            throw std::runtime_error("Cannot open output file for appending: " + output_path);
        }

        // Process remaining threads (i >= 1)
        for (size_t i = 1; i < contexts.size(); i++) {
            // Check if we've already covered the entire sequence
            if (current_position >= text_length) {
                break;  // Stop merging - sequence is complete
            }

            std::ifstream ifs(contexts[i].temp_file_path, std::ios::binary);
            if (!ifs) continue;

            Factor factor;
            bool found_convergence = false;

            // For subsequent threads, skip factors until we find convergence point
            // Convergence: last_written_factor.end == current_factor.start

            if (!last_written_factor.has_value()) {
                // No last written factor - shouldn't happen, but handle gracefully
                continue;
            }

            size_t last_end = last_written_factor->start + last_written_factor->length;

            // Read factors and look for convergence
            while (ifs.read(reinterpret_cast<char*>(&factor), sizeof(Factor))) {
                if (!found_convergence) {
                    // Still looking for convergence point
                    if (factor.start == last_end) {
                        // Found convergence! Start copying from this factor onwards
                        found_convergence = true;
                    } else {
                        // Skip this factor - it's before convergence
                        continue;
                    }
                }

                // Write factor to output (either we found convergence or we're continuing)
                ofs.write(reinterpret_cast<const char*>(&factor), sizeof(Factor));
                total_factors++;
                total_length += factor.length;
                last_written_factor = factor;

                size_t factor_end = factor.start + factor.length;
                if (factor_end > current_position) {
                    current_position = factor_end;
                }
                last_end = factor_end;  // Update for next iteration

                // Stop if we've covered the entire sequence
                if (current_position >= text_length) {
                    break;
                }
            }

            // If we've covered the entire sequence, stop processing more threads
            if (current_position >= text_length) {
                break;
            }
        }
    }

    // Write footer at the end (v2 format) - single location for all cases
    std::ofstream ofs(output_path, std::ios::binary | std::ios::app);
    if (!ofs) {
        throw std::runtime_error("Cannot open output file for appending footer: " + output_path);
    }

    FactorFileFooter footer;
    footer.num_factors = total_factors;
    footer.num_sequences = 0;  // Unknown for general (non-FASTA) factorization
    footer.num_sentinels = 0;  // No sentinels for general factorization
    footer.footer_size = sizeof(FactorFileFooter);
    footer.total_length = total_length;

    ofs.write(reinterpret_cast<const char*>(&footer), sizeof(footer));

    return total_factors;
}

void ParallelFactorizer::cleanup_temp_files(const std::vector<ThreadContext>& contexts) {
    for (const auto& ctx : contexts) {
        // Skip thread 0 - it wrote directly to the output file
        if (ctx.thread_id == 0) continue;

        try {
            fs::remove(ctx.temp_file_path);
        } catch (const std::exception& e) {
            std::cerr << "Warning: Failed to remove temporary file: " << ctx.temp_file_path
                     << " - " << e.what() << std::endl;
        }
    }
}

size_t ParallelFactorizer::parallel_factorize_file(const std::string& input_path,
                                                const std::string& output_path,
                                                size_t num_threads,
                                                size_t start_pos) {
    // Read the file content
    std::ifstream is(input_path, std::ios::binary);
    if (!is) {
        throw std::runtime_error("Cannot open input file: " + input_path);
    }

    std::string data((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
    return parallel_factorize(data, output_path, num_threads, start_pos);
}

size_t ParallelFactorizer::parallel_factorize_multiple_dna_w_rc(const std::string& prepared_string,
                                                                size_t original_length,
                                                                const std::string& output_path,
                                                                size_t num_threads,
                                                                size_t start_pos) {
    if (prepared_string.empty() || original_length == 0) return 0;

    const std::string& S = prepared_string;

    // Validate minimum string length to avoid underflow and SDSL errors
    // Format for w/ RC: T1!T2@...Tn$rt(Tn)%...rt(T1)&
    // Minimum valid input: "A$A&" (4 chars: sequence + sentinel + reverse_complement + sentinel)
    // - Position 0: Single nucleotide (e.g., 'A')
    // - Position 1: Sentinel separating original from RC (e.g., '$')
    // - Position 2: Reverse complement of the nucleotide (e.g., 'A' is revcomp of 'T')
    // - Position 3: Final sentinel (e.g., '&')
    if (S.size() < 4) {
        std::cerr << "Warning: Input string too short for factorization with reverse complement (size="
                  << S.size() << "). Returning 0 factors." << std::endl;
        return 0;
    }

    const size_t N = (S.size() / 2) - 1;
    if (N == 0) {
        std::cerr << "Warning: Computed N=0 from input size=" << S.size()
                  << ". Returning 0 factors." << std::endl;
        return 0;
    }

    // Validate start_pos
    if (start_pos >= N) {
        throw std::invalid_argument("start_pos must be less than the original sequence length");
    }

    // For DNA w/ RC, we can only parallelize the original sequence part (start_pos to N)
    // The algorithm needs the full prepared string with RC for suffix tree

    // Determine optimal thread count if not specified
    if (num_threads == 0) {
        size_t remaining_length = N - start_pos;
        size_t max_useful_threads = remaining_length / MIN_CHARS_PER_THREAD;
        size_t hardware_threads = std::thread::hardware_concurrency();
        num_threads = std::min(hardware_threads, max_useful_threads);
        num_threads = std::max(1UL, num_threads);
    }

    // Build CST over the prepared string (T + sentinel + RC(T) + sentinel)
    cst_t cst;
    construct_im(cst, S, 1);

    // Build RMQ structures for DNA w/ RC algorithm
    const uint64_t INF = std::numeric_limits<uint64_t>::max() / 2ULL;
    sdsl::int_vector<64> fwd_starts(cst.csa.size(), INF);
    sdsl::int_vector<64> rc_ends(cst.csa.size(), INF);

    const size_t T_end = N;           // end of original (exclusive)
    const size_t R_beg = N + 1;       // first char of rc (skip sentinel at position N)
    const size_t R_end = S.size() - 1;   // exclude final sentinel

    for (size_t k = 0; k < cst.csa.size(); ++k) {
        size_t posS = cst.csa[k];
        if (posS < T_end) {
            // suffix starts in T
            fwd_starts[k] = posS;     // 0-based start in T
        } else if (posS >= R_beg && posS < R_end) {
            // suffix starts in R
            size_t jR0   = posS - R_beg;         // 0-based start in R
            size_t endT0 = N - jR0 - 1;          // mapped end in T (0-based)
            rc_ends[k] = endT0;
        }
    }

    sdsl::rmq_succinct_sct<> rmqF(&fwd_starts);
    sdsl::rmq_succinct_sct<> rmqRcEnd(&rc_ends);

    // Create contexts for each thread - divide only from start_pos to N
    std::vector<ThreadContext> contexts(num_threads);
    const size_t remaining_length = N - start_pos;
    const size_t chunk_size = remaining_length / num_threads;

    for (size_t i = 0; i < num_threads; ++i) {
        contexts[i].thread_id = i;
        contexts[i].start_pos = start_pos + (i * chunk_size);
        contexts[i].end_pos = (i + 1 < num_threads) ? (start_pos + (i + 1) * chunk_size) : N;
        contexts[i].text_length = N; // Only the original sequence length
        contexts[i].temp_file_path = (i == 0) ? output_path : create_temp_file_path(i);
        contexts[i].is_last_thread = (i == num_threads - 1);
    }

    // Create mutexes for file access
    std::vector<std::mutex> file_mutexes(num_threads);

    // Track exceptions from threads
    std::vector<std::exception_ptr> thread_exceptions(num_threads, nullptr);
    std::mutex exception_mutex;

    // Lambda to pass RMQ structures to threads with exception handling
    auto factorize_dna_thread = [&](ThreadContext& ctx) {
        try {
            factorize_dna_w_rc_thread(cst, rmqF, rmqRcEnd, fwd_starts, rc_ends, INF, N,
                                     ctx, contexts, file_mutexes);
        } catch (...) {
            // Capture exception for the main thread to handle
            std::lock_guard<std::mutex> lock(exception_mutex);
            thread_exceptions[ctx.thread_id] = std::current_exception();
        }
    };

    // Create and start worker threads
    std::vector<std::thread> threads;
    for (size_t i = 0; i < num_threads; ++i) {
        threads.emplace_back(factorize_dna_thread, std::ref(contexts[i]));
    }

    // Wait for all threads to complete
    for (auto& t : threads) {
        if (t.joinable()) t.join();
    }

    // Check for exceptions from threads
    for (size_t i = 0; i < num_threads; ++i) {
        if (thread_exceptions[i]) {
            // Cleanup temporary files before rethrowing
            cleanup_temp_files(contexts);
            std::rethrow_exception(thread_exceptions[i]);
        }
    }

    // Merge temporary files and create final output
    size_t total_factors = merge_temp_files(output_path, contexts);

    // Cleanup temporary files
    cleanup_temp_files(contexts);

    return total_factors;
}

size_t ParallelFactorizer::parallel_factorize_dna_w_rc(std::string_view text,
                                                    const std::string& output_path,
                                                    size_t num_threads,
                                                    size_t start_pos) {
    if (text.empty()) return 0;

    // Prepare the DNA sequence with reverse complement
    std::string text_str(text);
    PreparedSequenceResult prep_result = prepare_multiple_dna_sequences_w_rc({text_str});

    // Call the core function with the prepared string
    return parallel_factorize_multiple_dna_w_rc(prep_result.prepared_string,
                                               prep_result.original_length,
                                               output_path,
                                               num_threads,
                                               start_pos);
}

size_t ParallelFactorizer::parallel_factorize_file_dna_w_rc(const std::string& input_path,
                                                         const std::string& output_path,
                                                         size_t num_threads) {
    std::ifstream is(input_path, std::ios::binary);
    if (!is) {
        throw std::runtime_error("Cannot open input file: " + input_path);
    }

    std::string data((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
    return parallel_factorize_dna_w_rc(data, output_path, num_threads);
}

} // namespace noLZSS