.. _program_listing_file_src_cpp_parallel_factorizer.cpp:

Program Listing for File parallel_factorizer.cpp
================================================

|exhale_lsh| :ref:`Return to documentation for file <file_src_cpp_parallel_factorizer.cpp>` (``src/cpp/parallel_factorizer.cpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   #include "parallel_factorizer.hpp"
   #include "factorizer_helpers.hpp"

   #include <chrono>
   #include <filesystem>
   #include <fstream>
   #include <iostream>

   namespace fs = std::filesystem;

   namespace noLZSS {

   // Helper functions lcp() and next_leaf() are now in factorizer_helpers.hpp

   std::string ParallelFactorizer::create_temp_file_path(size_t thread_id) {
       auto timestamp = std::chrono::system_clock::now().time_since_epoch().count();
       std::string temp_dir = fs::temp_directory_path().string();
       return temp_dir + "/noLZSS_temp_" + std::to_string(timestamp) + "_" + std::to_string(thread_id) + ".bin";
   }

   size_t ParallelFactorizer::parallel_factorize(std::string_view text, const std::string& output_path,
                                                 size_t num_threads, size_t start_pos) {
       if (text.empty()) return 0;

       // Ensure start_pos is within bounds
       if (start_pos >= text.length()) {
           throw std::invalid_argument("start_pos must be less than text length");
       }

       // Determine optimal thread count if not specified
       if (num_threads == 0) {
           // Calculate maximum useful threads based on remaining text size
           size_t remaining_length = text.length() - start_pos;
           size_t max_useful_threads = remaining_length / MIN_CHARS_PER_THREAD;

           // Use available hardware threads, but don't exceed what's useful for this input size
           size_t hardware_threads = std::thread::hardware_concurrency();
           num_threads = std::min(hardware_threads, max_useful_threads);

           // Ensure at least one thread
           num_threads = std::max(1UL, num_threads);
       }

       // Create suffix tree for all threads to use
       std::string tmp(text);
       cst_t cst;
       construct_im(cst, tmp, 1);

       // Create RMQ support once for all threads
       sdsl::rmq_succinct_sct<> rmq(&cst.csa);

       // Create contexts for each thread
       std::vector<ThreadContext> contexts(num_threads);

       // Divide work among threads, starting from start_pos
       const size_t remaining_length = text.length() - start_pos;
       const size_t chunk_size = remaining_length / num_threads;
       for (size_t i = 0; i < num_threads; ++i) {
           contexts[i].thread_id = i;
           contexts[i].start_pos = start_pos + (i * chunk_size);
           contexts[i].end_pos = (i + 1 < num_threads) ? (start_pos + (i + 1) * chunk_size) : text.length();
           contexts[i].text_length = text.length();
           // Thread 0 writes directly to output file; others use temp files
           contexts[i].temp_file_path = (i == 0) ? output_path : create_temp_file_path(i);
           contexts[i].is_last_thread = (i == num_threads - 1);
       }

       // Create mutexes for file access
       std::vector<std::mutex> file_mutexes(num_threads);

       // Track exceptions from threads
       std::vector<std::exception_ptr> thread_exceptions(num_threads, nullptr);
       std::mutex exception_mutex;

       // Create and start worker threads with exception handling
       std::vector<std::thread> threads;
       for (size_t i = 0; i < num_threads; ++i) {
           threads.emplace_back([this, &cst, &rmq, &contexts, &file_mutexes, &thread_exceptions, &exception_mutex, i]() {
               try {
                   factorize_thread(cst, rmq, contexts[i], contexts, file_mutexes);
               } catch (...) {
                   // Capture exception for the main thread to handle
                   std::lock_guard<std::mutex> lock(exception_mutex);
                   thread_exceptions[i] = std::current_exception();
               }
           });
       }

       // Wait for all threads to complete
       for (auto& t : threads) {
           if (t.joinable()) t.join();
       }

       // Check for exceptions from threads
       for (size_t i = 0; i < num_threads; ++i) {
           if (thread_exceptions[i]) {
               // Cleanup temporary files before rethrowing
               cleanup_temp_files(contexts);
               std::rethrow_exception(thread_exceptions[i]);
           }
       }

       // Merge temporary files and create final output
       size_t total_factors = merge_temp_files(output_path, contexts);

       // Cleanup temporary files
       cleanup_temp_files(contexts);

       return total_factors;
   }

   void ParallelFactorizer::factorize_thread(const cst_t& cst, const sdsl::rmq_succinct_sct<>& rmq,
                                             ThreadContext& ctx, std::vector<ThreadContext>& all_contexts,
                                             std::vector<std::mutex>& file_mutexes) {
       // Initialize factorization at our starting position
       auto lambda = cst.select_leaf(cst.csa.isa[ctx.start_pos] + 1);
       size_t lambda_node_depth = cst.node_depth(lambda);
       size_t lambda_sufnum = ctx.start_pos;

       // Create or truncate output file
       {
           std::lock_guard<std::mutex> lock(file_mutexes[ctx.thread_id]);
           std::ofstream ofs(ctx.temp_file_path, std::ios::binary | std::ios::trunc);
       }

       // Track the next thread for convergence checking
       ThreadContext* next_ctx = nullptr;
       if (!ctx.is_last_thread && ctx.thread_id + 1 < all_contexts.size()) {
           next_ctx = &all_contexts[ctx.thread_id + 1];
       }

       // Main factorization loop
       while (lambda_sufnum < ctx.text_length) {
           // Compute current factor
           size_t d = 1;
           size_t l = 1;
           cst_t::node_type v;
           size_t v_min_leaf_sufnum = 0;
           size_t u_min_leaf_sufnum = 0;
           Factor current_factor;

           // Factor computation logic (similar to original nolzss)
           while (true) {
               v = cst.bp_support.level_anc(lambda, lambda_node_depth - d);
               v_min_leaf_sufnum = cst.csa[rmq(cst.lb(v), cst.rb(v))];
               l = cst.depth(v);
               if (v_min_leaf_sufnum + l - 1 < lambda_sufnum) {
                   u_min_leaf_sufnum = v_min_leaf_sufnum;
                   ++d;
                   continue;
               }
               auto u = cst.parent(v);
               auto u_depth = cst.depth(u);
               if (v_min_leaf_sufnum == lambda_sufnum) {
                   if (u == cst.root()) {
                       l = 1;
                       current_factor = Factor{static_cast<uint64_t>(lambda_sufnum), static_cast<uint64_t>(l), static_cast<uint64_t>(lambda_sufnum)};
                   } else {
                       l = u_depth;
                       current_factor = Factor{static_cast<uint64_t>(lambda_sufnum), static_cast<uint64_t>(l), static_cast<uint64_t>(u_min_leaf_sufnum)};
                   }
               } else {
                   l = std::min(lcp(cst, lambda_sufnum, v_min_leaf_sufnum), (lambda_sufnum - v_min_leaf_sufnum));
                   if (l <= u_depth) {
                       l = u_depth;
                       current_factor = Factor{static_cast<uint64_t>(lambda_sufnum), static_cast<uint64_t>(l), static_cast<uint64_t>(u_min_leaf_sufnum)};
                   } else {
                       current_factor = Factor{static_cast<uint64_t>(lambda_sufnum), static_cast<uint64_t>(l), static_cast<uint64_t>(v_min_leaf_sufnum)};
                   }
               }
               break;
           }

           // Write the factor to temporary file
           write_factor(current_factor, ctx.temp_file_path, file_mutexes[ctx.thread_id]);

           // Check for convergence when factor extends into next thread's region
           // Use lambda_sufnum + l (exclusive end) to check against next thread's start
           if (next_ctx && lambda_sufnum + l >= next_ctx->start_pos) {
               size_t current_end = lambda_sufnum + l;  // Exclusive end position
               if (check_convergence(current_end, *next_ctx, file_mutexes[next_ctx->thread_id])) {
                   break;  // Convergence detected
               }
           }

           // Advance to next position
           lambda = next_leaf(cst, lambda, l);
           lambda_node_depth = cst.node_depth(lambda);
           lambda_sufnum = cst.sn(lambda);
       }
   }

   void ParallelFactorizer::factorize_dna_w_rc_thread(const cst_t& cst,
                                                      const sdsl::rmq_succinct_sct<>& rmqF,
                                                      const sdsl::rmq_succinct_sct<>& rmqRcEnd,
                                                      const sdsl::int_vector<64>& fwd_starts,
                                                      const sdsl::int_vector<64>& rc_ends,
                                                      uint64_t INF, size_t N,
                                                      ThreadContext& ctx,
                                                      std::vector<ThreadContext>& all_contexts,
                                                      std::vector<std::mutex>& file_mutexes) {
       // Initialize to the leaf of suffix starting at position ctx.start_pos
       auto lambda = cst.select_leaf(cst.csa.isa[ctx.start_pos] + 1);
       size_t lambda_node_depth = cst.node_depth(lambda);
       size_t i = cst.sn(lambda);  // Current position in text

       // Create or truncate output file
       {
           std::lock_guard<std::mutex> lock(file_mutexes[ctx.thread_id]);
           std::ofstream ofs(ctx.temp_file_path, std::ios::binary | std::ios::trunc);
       }

       // Track the next thread for convergence checking
       ThreadContext* next_ctx = nullptr;
       if (!ctx.is_last_thread && ctx.thread_id + 1 < all_contexts.size()) {
           next_ctx = &all_contexts[ctx.thread_id + 1];
       }

       // Main factorization loop - only process the original sequence (0 to N)
       while (i < N) {
           // At factor start i (0-based in T), walk up ancestors and track best FWD and RC candidates independently

           // Track best FWD candidate
           bool have_fwd = false;
           size_t best_fwd_start = 0;  // start in T (for FWD)
           size_t best_fwd_depth = 0;  // tree depth of best FWD candidate

           // Track best RC candidate
           bool have_rc = false;
           size_t best_rc_end = 0;     // end in T (for RC)
           size_t best_rc_posS = 0;    // pos in S where RC candidate suffix starts (for LCP)
           size_t best_rc_depth = 0;   // tree depth of best RC candidate

           // Walk from leaf to root via level_anc
           for (size_t step = 1; step <= lambda_node_depth; ++step) {
               auto v = cst.bp_support.level_anc(lambda, lambda_node_depth - step);
               size_t ell = cst.depth(v);
               if (ell == 0) break;  // reached root

               auto lb = cst.lb(v), rb = cst.rb(v);

               // Forward candidate (min start in T within v's interval)
               size_t kF = rmqF(lb, rb);
               uint64_t jF = fwd_starts[kF];
               bool okF = (jF != INF) && (jF + ell - 1 < i);  // non-overlap: endF <= i-1

               // RC candidate (min END in T within v's interval; monotone with depth)
               size_t kR = rmqRcEnd(lb, rb);
               uint64_t endRC = rc_ends[kR];
               bool okR = (endRC != INF) && (endRC < i);  // endRC <= i-1

               if (!okF && !okR) {
                   // deeper nodes can only increase jF and the minimal RC end
                   // -> non-overlap won't become true again for either; stop
                   break;
               }

               // Update best FWD candidate independently
               if (okF) {
                   if (ell > best_fwd_depth || (ell == best_fwd_depth && (jF + ell - 1) < (best_fwd_start + best_fwd_depth - 1))) {
                       have_fwd = true;
                       best_fwd_start = jF;
                       best_fwd_depth = ell;
                   }
               }

               // Update best RC candidate independently
               if (okR) {
                   size_t posS_R = cst.csa[kR];  // suffix position in S for LCP
                   if (ell > best_rc_depth || (ell == best_rc_depth && (endRC < best_rc_end))) {
                       have_rc = true;
                       best_rc_end = endRC;
                       best_rc_posS = posS_R;
                       best_rc_depth = ell;
                   }
               }
           }

           size_t emit_len = 1;
           uint64_t emit_ref = i;  // default for literal

           if (!have_fwd && !have_rc) {
               // No previous occurrence (FWD nor RC) — literal of length 1
               Factor f{static_cast<uint64_t>(i), static_cast<uint64_t>(emit_len), static_cast<uint64_t>(emit_ref)};
               write_factor(f, ctx.temp_file_path, file_mutexes[ctx.thread_id]);
           } else {
               // Compute true lengths for both candidates
               size_t fwd_true_len = 0;
               size_t rc_true_len = 0;

               if (have_fwd) {
                   size_t cap = i - best_fwd_start;  // non-overlap cap
                   size_t L = lcp(cst, i, best_fwd_start);
                   fwd_true_len = std::min(L, cap);
               }

               if (have_rc) {
                   rc_true_len = lcp(cst, i, best_rc_posS);
               }

               // Choose based on TRUE length, with FWD preference at equal length.
               // A literal (self-reference, length 1) is always available as a fallback.
               // Only use RC if it provides strictly longer match than both FWD and literal.
               bool use_fwd = false;
               bool use_literal = false;

               if (have_fwd && fwd_true_len >= 1) {
                   // Have a real forward match
                   if (have_rc && rc_true_len > fwd_true_len) {
                       use_fwd = false;  // RC is strictly longer
                   } else {
                       use_fwd = true;   // FWD wins (including ties)
                   }
               } else {
                   // No real forward match; compare RC against literal (length 1)
                   if (have_rc && rc_true_len > 1) {
                       use_fwd = false;      // RC is strictly longer than literal
                   } else {
                       use_literal = true;   // Use literal (self-reference)
                   }
               }

               if (use_literal) {
                   // Emit literal
                   emit_len = 1;
                   emit_ref = static_cast<uint64_t>(i);
               } else if (use_fwd) {
                   emit_len = fwd_true_len;
                   emit_ref = static_cast<uint64_t>(best_fwd_start);
               } else {
                   emit_len = rc_true_len;
                   size_t start_pos_val = best_rc_end - emit_len + 1;  // start = end - len + 1
                   emit_ref = RC_MASK | static_cast<uint64_t>(start_pos_val);  // start-anchored + RC flag
               }

               // Safety: ensure progress
               if (emit_len == 0) {
                   throw std::runtime_error("emit_len must be positive to ensure factorization progress");
               }

               Factor f{static_cast<uint64_t>(i), static_cast<uint64_t>(emit_len), emit_ref};
               write_factor(f, ctx.temp_file_path, file_mutexes[ctx.thread_id]);
           }

           // Check for convergence
           if (next_ctx && i + emit_len >= next_ctx->start_pos) {
               size_t current_end = i + emit_len;
               if (check_convergence(current_end, *next_ctx, file_mutexes[next_ctx->thread_id])) {
                   break;  // Convergence detected
               }
           }

           // Advance to next position
           lambda = next_leaf(cst, lambda, emit_len);
           lambda_node_depth = cst.node_depth(lambda);
           i = cst.sn(lambda);
       }
   }

   std::optional<Factor> ParallelFactorizer::read_factor_at_index(const std::string& file_path, size_t factor_index) {
       std::ifstream ifs(file_path, std::ios::binary);
       if (!ifs) {
           return std::nullopt;
       }

       // Seek to the position of the requested factor
       ifs.seekg(factor_index * sizeof(Factor), std::ios::beg);
       if (!ifs) {
           return std::nullopt;  // Index out of bounds
       }

       // Read the factor
       Factor factor;
       ifs.read(reinterpret_cast<char*>(&factor), sizeof(Factor));
       if (!ifs || ifs.gcount() != sizeof(Factor)) {
           return std::nullopt;  // Failed to read complete factor
       }

       return factor;
   }

   bool ParallelFactorizer::check_convergence(size_t current_end, ThreadContext& next_ctx, std::mutex& next_file_mutex) {
       // If we have a cached factor, check it first
       if (next_ctx.last_read_factor.has_value()) {
           const auto& cached_factor = next_ctx.last_read_factor.value();
           size_t cached_end = cached_factor.start + cached_factor.length;

           if (cached_end > current_end) {
               // The cached factor ends beyond our current position,
               // so we haven't reached convergence yet
               return false;
           }

           if (cached_factor.start == current_end) {
               // Found convergence: next thread's factor starts exactly where we end
               return true;
           }

           // cached_end <= current_end but cached_factor.start < current_end
           // Need to read the next factor
       }

       // Read factors one at a time until we find convergence or go past current_end
       while (true) {
           std::lock_guard<std::mutex> lock(next_file_mutex);
           auto factor_opt = read_factor_at_index(next_ctx.temp_file_path, next_ctx.next_thread_factor_index);

           if (!factor_opt.has_value()) {
               // No more factors in next thread's file - convergence not found yet
               return false;
           }

           Factor factor = factor_opt.value();
           next_ctx.last_read_factor = factor;
           next_ctx.next_thread_factor_index++;

           if (factor.start == current_end) {
               // Found convergence
               return true;
           }

           if (factor.start > current_end) {
               // Next thread's factor starts beyond our current position
               // We haven't reached convergence yet
               return false;
           }

           // factor.start < current_end, keep reading next factors
       }
   }

   void ParallelFactorizer::write_factor(const Factor& factor, const std::string& file_path, std::mutex& file_mutex) {
       std::lock_guard<std::mutex> lock(file_mutex);

       std::ofstream ofs(file_path, std::ios::binary | std::ios::app);
       if (!ofs) {
           throw std::runtime_error("Cannot open temporary file for writing: " + file_path);
       }

       ofs.write(reinterpret_cast<const char*>(&factor), sizeof(Factor));
   }

   std::optional<Factor> ParallelFactorizer::read_factor_at(const std::string& file_path, size_t index, std::mutex& file_mutex) {
       std::lock_guard<std::mutex> lock(file_mutex);

       std::ifstream ifs(file_path, std::ios::binary);
       if (!ifs) {
           return std::nullopt;
       }

       // Seek to the position of the requested factor
       ifs.seekg(index * sizeof(Factor));

       Factor factor;
       if (ifs.read(reinterpret_cast<char*>(&factor), sizeof(Factor))) {
           return factor;
       }

       return std::nullopt;
   }

   size_t ParallelFactorizer::merge_temp_files(const std::string& output_path, std::vector<ThreadContext>& contexts) {
       // Thread 0 already wrote to output_path, so we open in append mode
       // to add factors from other threads
       size_t total_factors = 0;
       uint64_t total_length = 0;
       size_t current_position = 0;  // Track the current end position in the merged output
       size_t text_length = contexts.empty() ? 0 : contexts[0].text_length;
       std::optional<Factor> last_written_factor;

       // First, read thread 0's factors to find the last one and count them
       {
           std::ifstream ifs(output_path, std::ios::binary);
           if (!ifs) {
               throw std::runtime_error("Cannot open output file for reading: " + output_path);
           }

           Factor factor;
           while (ifs.read(reinterpret_cast<char*>(&factor), sizeof(Factor))) {
               total_factors++;
               total_length += factor.length;
               last_written_factor = factor;
               size_t factor_end = factor.start + factor.length;
               if (factor_end > current_position) {
                   current_position = factor_end;
               }
           }
       }

       // Process remaining threads if there are multiple threads and sequence isn't complete
       if (contexts.size() > 1 && current_position < text_length) {
           // Open output file in append mode for adding factors from other threads
           std::ofstream ofs(output_path, std::ios::binary | std::ios::app);
           if (!ofs) {
               throw std::runtime_error("Cannot open output file for appending: " + output_path);
           }

           // Process remaining threads (i >= 1)
           for (size_t i = 1; i < contexts.size(); i++) {
               // Check if we've already covered the entire sequence
               if (current_position >= text_length) {
                   break;  // Stop merging - sequence is complete
               }

               std::ifstream ifs(contexts[i].temp_file_path, std::ios::binary);
               if (!ifs) continue;

               Factor factor;
               bool found_convergence = false;

               // For subsequent threads, skip factors until we find convergence point
               // Convergence: last_written_factor.end == current_factor.start
               if (!last_written_factor.has_value()) {
                   // No last written factor - shouldn't happen, but handle gracefully
                   continue;
               }

               size_t last_end = last_written_factor->start + last_written_factor->length;

               // Read factors and look for convergence
               while (ifs.read(reinterpret_cast<char*>(&factor), sizeof(Factor))) {
                   if (!found_convergence) {
                       // Still looking for convergence point
                       if (factor.start == last_end) {
                           // Found convergence! Start copying from this factor onwards
                           found_convergence = true;
                       } else {
                           // Skip this factor - it's before convergence
                           continue;
                       }
                   }

                   // Write factor to output (either we found convergence or we're continuing)
                   ofs.write(reinterpret_cast<const char*>(&factor), sizeof(Factor));
                   total_factors++;
                   total_length += factor.length;
                   last_written_factor = factor;
                   size_t factor_end = factor.start + factor.length;
                   if (factor_end > current_position) {
                       current_position = factor_end;
                   }
                   last_end = factor_end;  // Update for next iteration

                   // Stop if we've covered the entire sequence
                   if (current_position >= text_length) {
                       break;
                   }
               }

               // If we've covered the entire sequence, stop processing more threads
               if (current_position >= text_length) {
                   break;
               }
           }
       }

       // Write footer at the end (v2 format) - single location for all cases
       std::ofstream ofs(output_path, std::ios::binary | std::ios::app);
       if (!ofs) {
           throw std::runtime_error("Cannot open output file for appending footer: " + output_path);
       }
       FactorFileFooter footer;
       footer.num_factors = total_factors;
       footer.num_sequences = 0;  // Unknown for general (non-FASTA) factorization
       footer.num_sentinels = 0;  // No sentinels for general factorization
       footer.footer_size = sizeof(FactorFileFooter);
       footer.total_length = total_length;
       ofs.write(reinterpret_cast<const char*>(&footer), sizeof(footer));

       return total_factors;
   }

   void ParallelFactorizer::cleanup_temp_files(const std::vector<ThreadContext>& contexts) {
       for (const auto& ctx : contexts) {
           // Skip thread 0 - it wrote directly to the output file
           if (ctx.thread_id == 0) continue;

           try {
               fs::remove(ctx.temp_file_path);
           } catch (const std::exception& e) {
               std::cerr << "Warning: Failed to remove temporary file: " << ctx.temp_file_path
                         << " - " << e.what() << std::endl;
           }
       }
   }

   size_t ParallelFactorizer::parallel_factorize_file(const std::string& input_path, const std::string& output_path,
                                                      size_t num_threads, size_t start_pos) {
       // Read the file content
       std::ifstream is(input_path, std::ios::binary);
       if (!is) {
           throw std::runtime_error("Cannot open input file: " + input_path);
       }

       std::string data((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
       return parallel_factorize(data, output_path, num_threads, start_pos);
   }

   size_t ParallelFactorizer::parallel_factorize_multiple_dna_w_rc(const std::string& prepared_string,
                                                                   size_t original_length,
                                                                   const std::string& output_path,
                                                                   size_t num_threads, size_t start_pos) {
       if (prepared_string.empty() || original_length == 0) return 0;

       const std::string& S = prepared_string;

       // Validate minimum string length to avoid underflow and SDSL errors
       // Format for w/ RC: T1!T2@...Tn$rt(Tn)%...rt(T1)&
       // Minimum valid input: "A$A&" (4 chars: sequence + sentinel + reverse_complement + sentinel)
       // - Position 0: Single nucleotide (e.g., 'A')
       // - Position 1: Sentinel separating original from RC (e.g., '$')
       // - Position 2: Reverse complement of the nucleotide (e.g., 'A' is revcomp of 'T')
       // - Position 3: Final sentinel (e.g., '&')
       if (S.size() < 4) {
           std::cerr << "Warning: Input string too short for factorization with reverse complement (size="
                     << S.size() << "). Returning 0 factors." << std::endl;
           return 0;
       }

       const size_t N = (S.size() / 2) - 1;
       if (N == 0) {
           std::cerr << "Warning: Computed N=0 from input size=" << S.size() << ". Returning 0 factors." << std::endl;
           return 0;
       }

       // Validate start_pos
       if (start_pos >= N) {
           throw std::invalid_argument("start_pos must be less than the original sequence length");
       }

       // For DNA w/ RC, we can only parallelize the original sequence part (start_pos to N)
       // The algorithm needs the full prepared string with RC for suffix tree

       // Determine optimal thread count if not specified
       if (num_threads == 0) {
           size_t remaining_length = N - start_pos;
           size_t max_useful_threads = remaining_length / MIN_CHARS_PER_THREAD;
           size_t hardware_threads = std::thread::hardware_concurrency();
           num_threads = std::min(hardware_threads, max_useful_threads);
           num_threads = std::max(1UL, num_threads);
       }

       // Build CST over the prepared string (T + sentinel + RC(T) + sentinel)
       cst_t cst;
       construct_im(cst, S, 1);

       // Build RMQ structures for DNA w/ RC algorithm
       const uint64_t INF = std::numeric_limits<uint64_t>::max() / 2ULL;
       sdsl::int_vector<64> fwd_starts(cst.csa.size(), INF);
       sdsl::int_vector<64> rc_ends(cst.csa.size(), INF);

       const size_t T_end = N;              // end of original (exclusive)
       const size_t R_beg = N + 1;          // first char of rc (skip sentinel at position N)
       const size_t R_end = S.size() - 1;   // exclude final sentinel

       for (size_t k = 0; k < cst.csa.size(); ++k) {
           size_t posS = cst.csa[k];
           if (posS < T_end) {
               // suffix starts in T
               fwd_starts[k] = posS;  // 0-based start in T
           } else if (posS >= R_beg && posS < R_end) {
               // suffix starts in R
               size_t jR0 = posS - R_beg;     // 0-based start in R
               size_t endT0 = N - jR0 - 1;    // mapped end in T (0-based)
               rc_ends[k] = endT0;
           }
       }

       sdsl::rmq_succinct_sct<> rmqF(&fwd_starts);
       sdsl::rmq_succinct_sct<> rmqRcEnd(&rc_ends);

       // Create contexts for each thread - divide only from start_pos to N
       std::vector<ThreadContext> contexts(num_threads);
       const size_t remaining_length = N - start_pos;
       const size_t chunk_size = remaining_length / num_threads;

       for (size_t i = 0; i < num_threads; ++i) {
           contexts[i].thread_id = i;
           contexts[i].start_pos = start_pos + (i * chunk_size);
           contexts[i].end_pos = (i + 1 < num_threads) ? (start_pos + (i + 1) * chunk_size) : N;
           contexts[i].text_length = N;  // Only the original sequence length
           contexts[i].temp_file_path = (i == 0) ? output_path : create_temp_file_path(i);
           contexts[i].is_last_thread = (i == num_threads - 1);
       }

       // Create mutexes for file access
       std::vector<std::mutex> file_mutexes(num_threads);

       // Track exceptions from threads
       std::vector<std::exception_ptr> thread_exceptions(num_threads, nullptr);
       std::mutex exception_mutex;

       // Lambda to pass RMQ structures to threads with exception handling
       auto factorize_dna_thread = [&](ThreadContext& ctx) {
           try {
               factorize_dna_w_rc_thread(cst, rmqF, rmqRcEnd, fwd_starts, rc_ends, INF, N, ctx, contexts, file_mutexes);
           } catch (...) {
               // Capture exception for the main thread to handle
               std::lock_guard<std::mutex> lock(exception_mutex);
               thread_exceptions[ctx.thread_id] = std::current_exception();
           }
       };

       // Create and start worker threads
       std::vector<std::thread> threads;
       for (size_t i = 0; i < num_threads; ++i) {
           threads.emplace_back(factorize_dna_thread, std::ref(contexts[i]));
       }

       // Wait for all threads to complete
       for (auto& t : threads) {
           if (t.joinable()) t.join();
       }

       // Check for exceptions from threads
       for (size_t i = 0; i < num_threads; ++i) {
           if (thread_exceptions[i]) {
               // Cleanup temporary files before rethrowing
               cleanup_temp_files(contexts);
               std::rethrow_exception(thread_exceptions[i]);
           }
       }

       // Merge temporary files and create final output
       size_t total_factors = merge_temp_files(output_path, contexts);

       // Cleanup temporary files
       cleanup_temp_files(contexts);

       return total_factors;
   }

   size_t ParallelFactorizer::parallel_factorize_dna_w_rc(std::string_view text, const std::string& output_path,
                                                          size_t num_threads, size_t start_pos) {
       if (text.empty()) return 0;

       // Prepare the DNA sequence with reverse complement
       std::string text_str(text);
       PreparedSequenceResult prep_result = prepare_multiple_dna_sequences_w_rc({text_str});

       // Call the core function with the prepared string
       return parallel_factorize_multiple_dna_w_rc(prep_result.prepared_string, prep_result.original_length,
                                                   output_path, num_threads, start_pos);
   }

   size_t ParallelFactorizer::parallel_factorize_file_dna_w_rc(const std::string& input_path,
                                                               const std::string& output_path,
                                                               size_t num_threads) {
       std::ifstream is(input_path, std::ios::binary);
       if (!is) {
           throw std::runtime_error("Cannot open input file: " + input_path);
       }

       std::string data((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
       return parallel_factorize_dna_w_rc(data, output_path, num_threads);
   }

   } // namespace noLZSS