Program Listing for File parallel_factorizer.cpp
↰ Return to documentation for file (src/cpp/parallel_factorizer.cpp)
#include "parallel_factorizer.hpp"
#include "factorizer_helpers.hpp"
#include <algorithm>
#include <iostream>
#include <string>
#include <chrono>
namespace fs = std::filesystem;
namespace noLZSS {
// Helper functions lcp() and next_leaf() are now in factorizer_helpers.hpp
// Build a unique temp-file name of the form
//   <system-temp-dir>/noLZSS_temp_<epoch-ticks>_<thread-id>.bin
// Uniqueness comes from the system-clock tick count plus the thread id.
std::string ParallelFactorizer::create_temp_file_path(size_t thread_id) {
    const auto ticks = std::chrono::system_clock::now().time_since_epoch().count();
    std::string path = fs::temp_directory_path().string();
    path += "/noLZSS_temp_";
    path += std::to_string(ticks);
    path += '_';
    path += std::to_string(thread_id);
    path += ".bin";
    return path;
}
/**
 * Factorize `text` in parallel and write the resulting noLZSS factors
 * (plus a v2 footer) to `output_path`.
 *
 * @param text        Input text; empty input yields 0 factors and no file work.
 * @param output_path Final factor file; thread 0 writes to it directly.
 * @param num_threads 0 = auto-select from hardware concurrency and input size.
 * @param start_pos   First text position to factorize (must be < text length).
 * @return Total number of factors written.
 * @throws std::invalid_argument if start_pos is out of bounds.
 */
size_t ParallelFactorizer::parallel_factorize(std::string_view text, const std::string& output_path,
                                              size_t num_threads, size_t start_pos) {
    if (text.empty()) return 0;
    // Ensure start_pos is within bounds.
    if (start_pos >= text.length()) {
        throw std::invalid_argument("start_pos must be less than text length");
    }
    const size_t remaining_length = text.length() - start_pos;
    // Determine optimal thread count if not specified.
    if (num_threads == 0) {
        // Cap by input size so each thread gets at least MIN_CHARS_PER_THREAD.
        size_t max_useful_threads = remaining_length / MIN_CHARS_PER_THREAD;
        size_t hardware_threads = std::thread::hardware_concurrency(); // may report 0
        num_threads = std::min(hardware_threads, max_useful_threads);
        // FIX: use an explicit size_t template argument instead of 1UL.
        // On LLP64 targets (Windows) size_t is unsigned long long, and
        // std::max(1UL, size_t) fails template argument deduction.
        num_threads = std::max<size_t>(1, num_threads);
    }
    // FIX: never spawn more threads than there are characters to process;
    // otherwise chunk_size below would be 0 and all threads would start at
    // the same position, doing fully redundant work.
    num_threads = std::min(num_threads, remaining_length);
    num_threads = std::max<size_t>(1, num_threads);
    // Build the compressed suffix tree once; it is shared read-only by all threads.
    std::string tmp(text);
    cst_t cst;
    construct_im(cst, tmp, 1);
    // One RMQ over the CSA, shared by all threads, to locate the minimal
    // suffix number inside any suffix-tree leaf interval.
    sdsl::rmq_succinct_sct<> rmq(&cst.csa);
    // Divide [start_pos, text.length()) into contiguous per-thread chunks.
    std::vector<ThreadContext> contexts(num_threads);
    const size_t chunk_size = remaining_length / num_threads;
    for (size_t i = 0; i < num_threads; ++i) {
        contexts[i].thread_id = i;
        contexts[i].start_pos = start_pos + (i * chunk_size);
        contexts[i].end_pos = (i + 1 < num_threads) ? (start_pos + (i + 1) * chunk_size) : text.length();
        contexts[i].text_length = text.length();
        // Thread 0 writes directly to the output file; others use temp files.
        contexts[i].temp_file_path = (i == 0) ? output_path : create_temp_file_path(i);
        contexts[i].is_last_thread = (i == num_threads - 1);
    }
    // Per-thread file mutexes: the predecessor thread reads a worker's file
    // during convergence checks, so writes must be serialized.
    std::vector<std::mutex> file_mutexes(num_threads);
    // Capture worker exceptions so they can be rethrown on the main thread.
    std::vector<std::exception_ptr> thread_exceptions(num_threads, nullptr);
    std::mutex exception_mutex;
    // Create and start worker threads with exception capture.
    std::vector<std::thread> threads;
    threads.reserve(num_threads);
    for (size_t i = 0; i < num_threads; ++i) {
        threads.emplace_back([this, &cst, &rmq, &contexts, &file_mutexes, &thread_exceptions, &exception_mutex, i]() {
            try {
                factorize_thread(cst, rmq, contexts[i], contexts, file_mutexes);
            } catch (...) {
                std::lock_guard<std::mutex> lock(exception_mutex);
                thread_exceptions[i] = std::current_exception();
            }
        });
    }
    // Wait for all threads to complete.
    for (auto& t : threads) {
        if (t.joinable()) t.join();
    }
    // Rethrow the first captured worker exception (after temp-file cleanup).
    for (size_t i = 0; i < num_threads; ++i) {
        if (thread_exceptions[i]) {
            cleanup_temp_files(contexts);
            std::rethrow_exception(thread_exceptions[i]);
        }
    }
    // Stitch the per-thread factor streams into the final output + footer.
    size_t total_factors = merge_temp_files(output_path, contexts);
    cleanup_temp_files(contexts);
    return total_factors;
}
// Worker body for one thread of the generic parallel factorization.
// Starting from ctx.start_pos it repeatedly computes the next noLZSS factor,
// appends it to ctx.temp_file_path, and stops either at the end of the text
// or when its factor stream converges with the next thread's stream.
// cst: shared compressed suffix tree over the whole text (read-only).
// rmq: shared RMQ over cst.csa, used to find the minimal suffix number
//      (i.e. the leftmost occurrence) inside a node's leaf interval.
void ParallelFactorizer::factorize_thread(const cst_t& cst, const sdsl::rmq_succinct_sct<>& rmq,
ThreadContext& ctx,
std::vector<ThreadContext>& all_contexts,
std::vector<std::mutex>& file_mutexes) {
// Initialize factorization at our starting position
// (csa.isa maps a text position to its suffix rank; select_leaf is 1-based,
// hence the +1).
auto lambda = cst.select_leaf(cst.csa.isa[ctx.start_pos] + 1);
size_t lambda_node_depth = cst.node_depth(lambda);
size_t lambda_sufnum = ctx.start_pos;
// Create or truncate output file
{
std::lock_guard<std::mutex> lock(file_mutexes[ctx.thread_id]);
std::ofstream ofs(ctx.temp_file_path, std::ios::binary | std::ios::trunc);
}
// Track the next thread for convergence checking
ThreadContext* next_ctx = nullptr;
if (!ctx.is_last_thread && ctx.thread_id + 1 < all_contexts.size()) {
next_ctx = &all_contexts[ctx.thread_id + 1];
}
// Main factorization loop
while (lambda_sufnum < ctx.text_length) {
// Compute current factor
// d selects the candidate ancestor v of λ via level_anc; as long as a
// full-depth non-overlapping match exists at v, d is advanced to try a
// longer candidate. u_min_leaf_sufnum remembers the leftmost occurrence
// recorded for the previous (accepted) ancestor, used as the fallback
// reference when the deeper candidate is rejected.
size_t d = 1;
size_t l = 1;
cst_t::node_type v;
size_t v_min_leaf_sufnum = 0;
size_t u_min_leaf_sufnum = 0;
Factor current_factor;
// Factor computation logic (similar to original nolzss)
while (true) {
v = cst.bp_support.level_anc(lambda, lambda_node_depth - d);
// Leftmost (minimal) suffix number among the leaves below v.
v_min_leaf_sufnum = cst.csa[rmq(cst.lb(v), cst.rb(v))];
l = cst.depth(v);
if (v_min_leaf_sufnum + l - 1 < lambda_sufnum) {
// The leftmost occurrence ends strictly before the current position,
// so a non-overlapping match of the full depth l exists here.
// Remember it and try the next candidate for a longer match.
u_min_leaf_sufnum = v_min_leaf_sufnum;
++d;
continue;
}
auto u = cst.parent(v);
auto u_depth = cst.depth(u);
if (v_min_leaf_sufnum == lambda_sufnum) {
// The current suffix itself is the leftmost in v's subtree, so there
// is no earlier occurrence of depth(v); fall back to the parent.
if (u == cst.root()) {
// No previous occurrence at all: emit a literal of length 1
// that references itself.
l = 1;
current_factor = Factor{static_cast<uint64_t>(lambda_sufnum),
static_cast<uint64_t>(l),
static_cast<uint64_t>(lambda_sufnum)};
} else {
l = u_depth;
current_factor = Factor{static_cast<uint64_t>(lambda_sufnum),
static_cast<uint64_t>(l),
static_cast<uint64_t>(u_min_leaf_sufnum)};
}
} else {
// An earlier occurrence exists but overlaps the current position:
// the usable length is capped both by the lcp of the two suffixes
// and by the distance to the occurrence (non-overlap constraint).
l = std::min(lcp(cst, lambda_sufnum, v_min_leaf_sufnum),
(lambda_sufnum - v_min_leaf_sufnum));
if (l <= u_depth) {
// The capped overlapping match is no better than the parent's
// full-depth match, so use the parent's occurrence instead.
l = u_depth;
current_factor = Factor{static_cast<uint64_t>(lambda_sufnum),
static_cast<uint64_t>(l),
static_cast<uint64_t>(u_min_leaf_sufnum)};
} else {
current_factor = Factor{static_cast<uint64_t>(lambda_sufnum),
static_cast<uint64_t>(l),
static_cast<uint64_t>(v_min_leaf_sufnum)};
}
}
break;
}
// Write the factor to temporary file
write_factor(current_factor, ctx.temp_file_path, file_mutexes[ctx.thread_id]);
// Check for convergence when factor extends into next thread's region
// Use lambda_sufnum + l (exclusive end) to check against next thread's start
if (next_ctx && lambda_sufnum + l >= next_ctx->start_pos) {
size_t current_end = lambda_sufnum + l; // Exclusive end position
if (check_convergence(current_end, *next_ctx, file_mutexes[next_ctx->thread_id])) {
break; // Convergence detected
}
}
// Advance to next position
lambda = next_leaf(cst, lambda, l);
lambda_node_depth = cst.node_depth(lambda);
lambda_sufnum = cst.sn(lambda);
}
}
// Worker body for one thread of the DNA + reverse-complement factorization.
// Factorizes positions [ctx.start_pos, N) of the original sequence T using a
// CST built over the prepared string S (T, a sentinel, revcomp(T), a final
// sentinel). For each factor start i it walks the ancestors of the suffix
// leaf, tracking per depth the best forward candidate (minimal start in T,
// via rmqF over fwd_starts) and the best reverse-complement candidate
// (minimal mapped end in T, via rmqRcEnd over rc_ends), then emits the
// longer true match — forward wins ties, a length-1 literal is the fallback.
// INF marks entries of fwd_starts/rc_ends that belong to unusable suffixes
// (sentinels, or the wrong half of S).
void ParallelFactorizer::factorize_dna_w_rc_thread(const cst_t& cst,
const sdsl::rmq_succinct_sct<>& rmqF,
const sdsl::rmq_succinct_sct<>& rmqRcEnd,
const sdsl::int_vector<64>& fwd_starts,
const sdsl::int_vector<64>& rc_ends,
uint64_t INF,
size_t N,
ThreadContext& ctx,
std::vector<ThreadContext>& all_contexts,
std::vector<std::mutex>& file_mutexes) {
// Initialize to the leaf of suffix starting at position ctx.start_pos
// (csa.isa maps position -> rank; select_leaf is 1-based, hence the +1).
auto lambda = cst.select_leaf(cst.csa.isa[ctx.start_pos] + 1);
size_t lambda_node_depth = cst.node_depth(lambda);
size_t i = cst.sn(lambda); // Current position in text
// Create or truncate output file
{
std::lock_guard<std::mutex> lock(file_mutexes[ctx.thread_id]);
std::ofstream ofs(ctx.temp_file_path, std::ios::binary | std::ios::trunc);
}
// Track the next thread for convergence checking
ThreadContext* next_ctx = nullptr;
if (!ctx.is_last_thread && ctx.thread_id + 1 < all_contexts.size()) {
next_ctx = &all_contexts[ctx.thread_id + 1];
}
// Main factorization loop - only process the original sequence (0 to N)
while (i < N) {
// At factor start i (0-based in T), walk up ancestors and track best FWD and RC candidates independently
// Track best FWD candidate
bool have_fwd = false;
size_t best_fwd_start = 0; // start in T (for FWD)
size_t best_fwd_depth = 0; // tree depth of best FWD candidate
// Track best RC candidate
bool have_rc = false;
size_t best_rc_end = 0; // end in T (for RC)
size_t best_rc_posS = 0; // pos in S where RC candidate suffix starts (for LCP)
size_t best_rc_depth = 0; // tree depth of best RC candidate
// Walk from leaf to root via level_anc
for (size_t step = 1; step <= lambda_node_depth; ++step) {
auto v = cst.bp_support.level_anc(lambda, lambda_node_depth - step);
size_t ell = cst.depth(v);
if (ell == 0) break; // reached root
auto lb = cst.lb(v), rb = cst.rb(v);
// Forward candidate (min start in T within v's interval)
size_t kF = rmqF(lb, rb);
uint64_t jF = fwd_starts[kF];
bool okF = (jF != INF) && (jF + ell - 1 < i); // non-overlap: endF <= i-1
// RC candidate (min END in T within v's interval; monotone with depth)
size_t kR = rmqRcEnd(lb, rb);
uint64_t endRC = rc_ends[kR];
bool okR = (endRC != INF) && (endRC < i); // endRC <= i-1
if (!okF && !okR) {
// deeper nodes can only increase jF and the minimal RC end
// -> non-overlap won't become true again for either; stop
break;
}
// Update best FWD candidate independently
// (prefer the deeper node; at equal depth, the earlier-ending occurrence)
if (okF) {
if (ell > best_fwd_depth ||
(ell == best_fwd_depth && (jF + ell - 1) < (best_fwd_start + best_fwd_depth - 1))) {
have_fwd = true;
best_fwd_start = jF;
best_fwd_depth = ell;
}
}
// Update best RC candidate independently
if (okR) {
size_t posS_R = cst.csa[kR]; // suffix position in S for LCP
if (ell > best_rc_depth ||
(ell == best_rc_depth && (endRC < best_rc_end))) {
have_rc = true;
best_rc_end = endRC;
best_rc_posS = posS_R;
best_rc_depth = ell;
}
}
}
size_t emit_len = 1;
uint64_t emit_ref = i; // default for literal
if (!have_fwd && !have_rc) {
// No previous occurrence (FWD nor RC) — literal of length 1
Factor f{static_cast<uint64_t>(i), static_cast<uint64_t>(emit_len), static_cast<uint64_t>(emit_ref)};
write_factor(f, ctx.temp_file_path, file_mutexes[ctx.thread_id]);
} else {
// Compute true lengths for both candidates
// (the tree depth is only an upper bound; the lcp gives the real match).
size_t fwd_true_len = 0;
size_t rc_true_len = 0;
if (have_fwd) {
size_t cap = i - best_fwd_start; // non-overlap cap
size_t L = lcp(cst, i, best_fwd_start);
fwd_true_len = std::min(L, cap);
}
if (have_rc) {
rc_true_len = lcp(cst, i, best_rc_posS);
}
// Choose based on TRUE length, with FWD preference at equal length.
// A literal (self-reference, length 1) is always available as a fallback.
// Only use RC if it provides strictly longer match than both FWD and literal.
bool use_fwd = false;
bool use_literal = false;
if (have_fwd && fwd_true_len >= 1) {
// Have a real forward match
if (have_rc && rc_true_len > fwd_true_len) {
use_fwd = false; // RC is strictly longer
} else {
use_fwd = true; // FWD wins (including ties)
}
} else {
// No real forward match; compare RC against literal (length 1)
if (have_rc && rc_true_len > 1) {
use_fwd = false; // RC is strictly longer than literal
} else {
use_literal = true; // Use literal (self-reference)
}
}
if (use_literal) {
// Emit literal
emit_len = 1;
emit_ref = static_cast<uint64_t>(i);
} else if (use_fwd) {
emit_len = fwd_true_len;
emit_ref = static_cast<uint64_t>(best_fwd_start);
} else {
emit_len = rc_true_len;
size_t start_pos_val = best_rc_end - emit_len + 1; // start = end - len + 1
emit_ref = RC_MASK | static_cast<uint64_t>(start_pos_val); // start-anchored + RC flag
}
// Safety: ensure progress
if (emit_len == 0) {
throw std::runtime_error("emit_len must be positive to ensure factorization progress");
}
Factor f{static_cast<uint64_t>(i), static_cast<uint64_t>(emit_len), emit_ref};
write_factor(f, ctx.temp_file_path, file_mutexes[ctx.thread_id]);
}
// Check for convergence
// (i + emit_len is the exclusive end of the factor just written).
if (next_ctx && i + emit_len >= next_ctx->start_pos) {
size_t current_end = i + emit_len;
if (check_convergence(current_end, *next_ctx, file_mutexes[next_ctx->thread_id])) {
break; // Convergence detected
}
}
// Advance to next position
lambda = next_leaf(cst, lambda, emit_len);
lambda_node_depth = cst.node_depth(lambda);
i = cst.sn(lambda);
}
}
std::optional<Factor> ParallelFactorizer::read_factor_at_index(const std::string& file_path,
size_t factor_index) {
std::ifstream ifs(file_path, std::ios::binary);
if (!ifs) {
return std::nullopt;
}
// Seek to the position of the requested factor
ifs.seekg(factor_index * sizeof(Factor), std::ios::beg);
if (!ifs) {
return std::nullopt; // Index out of bounds
}
// Read the factor
Factor factor;
ifs.read(reinterpret_cast<char*>(&factor), sizeof(Factor));
if (!ifs || ifs.gcount() != sizeof(Factor)) {
return std::nullopt; // Failed to read complete factor
}
return factor;
}
/**
 * Decide whether this thread's factor stream has converged with the next
 * thread's stream. Convergence means the next thread produced a factor
 * starting exactly at `current_end` (the exclusive end of the factor this
 * thread just wrote); from that point on both streams are identical, so the
 * caller can stop factorizing.
 *
 * The cache fields (`last_read_factor`, `next_thread_factor_index`) are only
 * ever touched by the predecessor thread (us), so reading them needs no
 * lock; only the file read below is guarded by `next_file_mutex`.
 */
bool ParallelFactorizer::check_convergence(size_t current_end, ThreadContext& next_ctx,
                                           std::mutex& next_file_mutex) {
    // If we have a cached factor, check it first.
    if (next_ctx.last_read_factor.has_value()) {
        const auto& cached_factor = next_ctx.last_read_factor.value();
        // BUG FIX: test for convergence BEFORE the "not there yet" test.
        // The previous order checked `cached_end > current_end` first, which
        // is always true when cached_factor.start == current_end (factors
        // have length >= 1), making the convergence check unreachable — the
        // cached convergence point was never detected and this thread kept
        // factorizing longer than necessary.
        if (cached_factor.start == current_end) {
            // Next thread's factor starts exactly where we end.
            return true;
        }
        size_t cached_end = cached_factor.start + cached_factor.length;
        if (cached_end > current_end) {
            // The cached factor still lies beyond our current position,
            // so we haven't reached convergence yet.
            return false;
        }
        // cached_end <= current_end and cached_factor.start != current_end:
        // the cached factor is stale — fall through and read further.
    }
    // Read factors one at a time until we find convergence or go past current_end.
    while (true) {
        std::lock_guard<std::mutex> lock(next_file_mutex);
        auto factor_opt = read_factor_at_index(next_ctx.temp_file_path,
                                               next_ctx.next_thread_factor_index);
        if (!factor_opt.has_value()) {
            // No more factors in next thread's file — convergence not found yet.
            return false;
        }
        Factor factor = factor_opt.value();
        next_ctx.last_read_factor = factor;
        next_ctx.next_thread_factor_index++;
        if (factor.start == current_end) {
            // Found convergence.
            return true;
        }
        if (factor.start > current_end) {
            // Next thread's stream starts beyond our current position.
            return false;
        }
        // factor.start < current_end — keep reading next factors.
    }
}
/**
 * Append one binary Factor record to `file_path` under `file_mutex`.
 * The stream is opened (and implicitly flushed/closed) per call so that a
 * concurrent reader performing a convergence check only ever observes fully
 * written records once the lock is released.
 *
 * @throws std::runtime_error if the file cannot be opened OR the write fails.
 */
void ParallelFactorizer::write_factor(const Factor& factor, const std::string& file_path,
                                      std::mutex& file_mutex) {
    std::lock_guard<std::mutex> lock(file_mutex);
    std::ofstream ofs(file_path, std::ios::binary | std::ios::app);
    if (!ofs) {
        throw std::runtime_error("Cannot open temporary file for writing: " + file_path);
    }
    // FIX: the original ignored write failures. A short or failed write
    // silently corrupts the fixed-size factor stream, so fail loudly instead.
    if (!ofs.write(reinterpret_cast<const char*>(&factor), sizeof(Factor))) {
        throw std::runtime_error("Failed to write factor to temporary file: " + file_path);
    }
}
std::optional<Factor> ParallelFactorizer::read_factor_at(const std::string& file_path,
size_t index,
std::mutex& file_mutex) {
std::lock_guard<std::mutex> lock(file_mutex);
std::ifstream ifs(file_path, std::ios::binary);
if (!ifs) {
return std::nullopt;
}
// Seek to the position of the requested factor
ifs.seekg(index * sizeof(Factor));
Factor factor;
if (ifs.read(reinterpret_cast<char*>(&factor), sizeof(Factor))) {
return factor;
}
return std::nullopt;
}
// Stitch the per-thread factor files into the final output and append the
// v2 footer. Thread 0 wrote its factors directly to output_path; for each
// later thread we skip factors up to the convergence point — the first
// factor whose start equals the exclusive end of the last factor already
// written — and copy from there on, stopping once the whole text is covered.
// Returns the total number of factors in the merged file.
size_t ParallelFactorizer::merge_temp_files(const std::string& output_path,
std::vector<ThreadContext>& contexts) {
// Thread 0 already wrote to output_path, so we open in append mode
// to add factors from other threads
size_t total_factors = 0;
uint64_t total_length = 0;
size_t current_position = 0; // Track the current end position in the merged output
size_t text_length = contexts.empty() ? 0 : contexts[0].text_length;
std::optional<Factor> last_written_factor;
// First, read thread 0's factors to find the last one and count them
{
std::ifstream ifs(output_path, std::ios::binary);
if (!ifs) {
throw std::runtime_error("Cannot open output file for reading: " + output_path);
}
Factor factor;
while (ifs.read(reinterpret_cast<char*>(&factor), sizeof(Factor))) {
total_factors++;
total_length += factor.length;
last_written_factor = factor;
size_t factor_end = factor.start + factor.length;
if (factor_end > current_position) {
current_position = factor_end;
}
}
}
// Process remaining threads if there are multiple threads and sequence isn't complete
if (contexts.size() > 1 && current_position < text_length) {
// Open output file in append mode for adding factors from other threads
std::ofstream ofs(output_path, std::ios::binary | std::ios::app);
if (!ofs) {
throw std::runtime_error("Cannot open output file for appending: " + output_path);
}
// Process remaining threads (i >= 1)
for (size_t i = 1; i < contexts.size(); i++) {
// Check if we've already covered the entire sequence
if (current_position >= text_length) {
break; // Stop merging - sequence is complete
}
// A missing/unreadable temp file means this thread's output was fully
// superseded by its predecessor (or it never started); just skip it.
std::ifstream ifs(contexts[i].temp_file_path, std::ios::binary);
if (!ifs) continue;
Factor factor;
bool found_convergence = false;
// For subsequent threads, skip factors until we find convergence point
// Convergence: last_written_factor.end == current_factor.start
if (!last_written_factor.has_value()) {
// No last written factor - shouldn't happen, but handle gracefully
continue;
}
size_t last_end = last_written_factor->start + last_written_factor->length;
// Read factors and look for convergence
while (ifs.read(reinterpret_cast<char*>(&factor), sizeof(Factor))) {
if (!found_convergence) {
// Still looking for convergence point
if (factor.start == last_end) {
// Found convergence! Start copying from this factor onwards
found_convergence = true;
} else {
// Skip this factor - it's before convergence
continue;
}
}
// Write factor to output (either we found convergence or we're continuing)
ofs.write(reinterpret_cast<const char*>(&factor), sizeof(Factor));
total_factors++;
total_length += factor.length;
last_written_factor = factor;
size_t factor_end = factor.start + factor.length;
if (factor_end > current_position) {
current_position = factor_end;
}
last_end = factor_end; // Update for next iteration
// Stop if we've covered the entire sequence
if (current_position >= text_length) {
break;
}
}
// If we've covered the entire sequence, stop processing more threads
if (current_position >= text_length) {
break;
}
}
}
// Write footer at the end (v2 format) - single location for all cases
std::ofstream ofs(output_path, std::ios::binary | std::ios::app);
if (!ofs) {
throw std::runtime_error("Cannot open output file for appending footer: " + output_path);
}
FactorFileFooter footer;
footer.num_factors = total_factors;
footer.num_sequences = 0; // Unknown for general (non-FASTA) factorization
footer.num_sentinels = 0; // No sentinels for general factorization
footer.footer_size = sizeof(FactorFileFooter);
footer.total_length = total_length;
ofs.write(reinterpret_cast<const char*>(&footer), sizeof(footer));
return total_factors;
}
/**
 * Best-effort removal of per-thread temporary factor files.
 * Thread 0 wrote directly to the final output file, so its path is never
 * deleted here. Failures are reported to stderr but never propagate:
 * cleanup runs on both the success and the exception path.
 */
void ParallelFactorizer::cleanup_temp_files(const std::vector<ThreadContext>& contexts) {
    for (const auto& ctx : contexts) {
        // Skip thread 0 - it wrote directly to the output file
        if (ctx.thread_id == 0) continue;
        // FIX: use the non-throwing error_code overload instead of wrapping
        // the throwing overload in try/catch — a failed removal is an
        // expected condition here, not an exceptional one, and this
        // guarantees cleanup itself can never throw.
        std::error_code ec;
        fs::remove(ctx.temp_file_path, ec);
        if (ec) {
            std::cerr << "Warning: Failed to remove temporary file: " << ctx.temp_file_path
                      << " - " << ec.message() << std::endl;
        }
    }
}
// Convenience wrapper: slurp the whole input file into memory, then run the
// in-memory parallel factorizer over its contents.
// Throws std::runtime_error if the input file cannot be opened.
size_t ParallelFactorizer::parallel_factorize_file(const std::string& input_path,
                                                   const std::string& output_path,
                                                   size_t num_threads,
                                                   size_t start_pos) {
    std::ifstream in(input_path, std::ios::binary);
    if (!in) {
        throw std::runtime_error("Cannot open input file: " + input_path);
    }
    std::istreambuf_iterator<char> first(in), last;
    std::string contents(first, last);
    return parallel_factorize(contents, output_path, num_threads, start_pos);
}
/**
 * Core parallel factorization for DNA with reverse complements.
 *
 * `prepared_string` S has the layout T1!T2@...Tn$rt(Tn)%...rt(T1)& — the
 * concatenated sequences, a sentinel, their reverse complements, and a final
 * sentinel — so N = (|S| / 2) - 1 is the length of the original part.
 * Only positions [start_pos, N) of the original part are factorized; the RC
 * half exists so the suffix tree can also find reverse-complement matches.
 *
 * @return Number of factors written to output_path (0 for degenerate input).
 * @throws std::invalid_argument if start_pos >= N.
 */
size_t ParallelFactorizer::parallel_factorize_multiple_dna_w_rc(const std::string& prepared_string,
                                                                size_t original_length,
                                                                const std::string& output_path,
                                                                size_t num_threads,
                                                                size_t start_pos) {
    if (prepared_string.empty() || original_length == 0) return 0;
    const std::string& S = prepared_string;
    // Validate minimum string length to avoid underflow and SDSL errors.
    // Minimum valid input is "A$A&" (4 chars): one nucleotide, the sentinel
    // separating original from RC, the reverse complement, and the final
    // sentinel.
    if (S.size() < 4) {
        std::cerr << "Warning: Input string too short for factorization with reverse complement (size="
                  << S.size() << "). Returning 0 factors." << std::endl;
        return 0;
    }
    const size_t N = (S.size() / 2) - 1;
    if (N == 0) {
        std::cerr << "Warning: Computed N=0 from input size=" << S.size()
                  << ". Returning 0 factors." << std::endl;
        return 0;
    }
    // Validate start_pos.
    if (start_pos >= N) {
        throw std::invalid_argument("start_pos must be less than the original sequence length");
    }
    // Only the original-sequence half [start_pos, N) is divided among the
    // threads; every thread still needs the CST over the full prepared string.
    const size_t remaining_length = N - start_pos;
    // Determine optimal thread count if not specified.
    if (num_threads == 0) {
        size_t max_useful_threads = remaining_length / MIN_CHARS_PER_THREAD;
        size_t hardware_threads = std::thread::hardware_concurrency(); // may report 0
        num_threads = std::min(hardware_threads, max_useful_threads);
        // FIX: std::max<size_t> instead of std::max(1UL, ...) — on LLP64
        // targets (Windows) size_t is unsigned long long and the 1UL form
        // fails template argument deduction.
        num_threads = std::max<size_t>(1, num_threads);
    }
    // FIX: never use more threads than characters, or chunk_size would be 0
    // and all threads would start at the same position.
    num_threads = std::min(num_threads, remaining_length);
    num_threads = std::max<size_t>(1, num_threads);
    // Build CST over the prepared string (T + sentinel + RC(T) + sentinel).
    cst_t cst;
    construct_im(cst, S, 1);
    // Per CSA rank k, record either the forward start in T or the mapped RC
    // end in T, so range-minimum queries can locate the best candidate of
    // each kind inside any suffix-tree interval. INF marks "not applicable".
    const uint64_t INF = std::numeric_limits<uint64_t>::max() / 2ULL;
    sdsl::int_vector<64> fwd_starts(cst.csa.size(), INF);
    sdsl::int_vector<64> rc_ends(cst.csa.size(), INF);
    const size_t T_end = N;            // end of original part (exclusive)
    const size_t R_beg = N + 1;        // first char of RC part (skip sentinel at N)
    const size_t R_end = S.size() - 1; // exclude final sentinel
    for (size_t k = 0; k < cst.csa.size(); ++k) {
        size_t posS = cst.csa[k];
        if (posS < T_end) {
            // Suffix starts in T.
            fwd_starts[k] = posS; // 0-based start in T
        } else if (posS >= R_beg && posS < R_end) {
            // Suffix starts in the RC part.
            size_t jR0 = posS - R_beg;  // 0-based start in R
            size_t endT0 = N - jR0 - 1; // mapped end in T (0-based)
            rc_ends[k] = endT0;
        }
    }
    sdsl::rmq_succinct_sct<> rmqF(&fwd_starts);
    sdsl::rmq_succinct_sct<> rmqRcEnd(&rc_ends);
    // Create contexts for each thread - divide only [start_pos, N).
    std::vector<ThreadContext> contexts(num_threads);
    const size_t chunk_size = remaining_length / num_threads;
    for (size_t i = 0; i < num_threads; ++i) {
        contexts[i].thread_id = i;
        contexts[i].start_pos = start_pos + (i * chunk_size);
        contexts[i].end_pos = (i + 1 < num_threads) ? (start_pos + (i + 1) * chunk_size) : N;
        contexts[i].text_length = N; // Only the original sequence length
        contexts[i].temp_file_path = (i == 0) ? output_path : create_temp_file_path(i);
        contexts[i].is_last_thread = (i == num_threads - 1);
    }
    // Create mutexes for file access.
    std::vector<std::mutex> file_mutexes(num_threads);
    // Track exceptions from threads.
    std::vector<std::exception_ptr> thread_exceptions(num_threads, nullptr);
    std::mutex exception_mutex;
    // Worker wrapper: forwards the shared structures and captures any
    // exception for the main thread to rethrow after joining.
    auto factorize_dna_thread = [&](ThreadContext& ctx) {
        try {
            factorize_dna_w_rc_thread(cst, rmqF, rmqRcEnd, fwd_starts, rc_ends, INF, N,
                                      ctx, contexts, file_mutexes);
        } catch (...) {
            std::lock_guard<std::mutex> lock(exception_mutex);
            thread_exceptions[ctx.thread_id] = std::current_exception();
        }
    };
    // Create and start worker threads.
    std::vector<std::thread> threads;
    threads.reserve(num_threads);
    for (size_t i = 0; i < num_threads; ++i) {
        threads.emplace_back(factorize_dna_thread, std::ref(contexts[i]));
    }
    // Wait for all threads to complete.
    for (auto& t : threads) {
        if (t.joinable()) t.join();
    }
    // Rethrow the first captured worker exception (after temp-file cleanup).
    for (size_t i = 0; i < num_threads; ++i) {
        if (thread_exceptions[i]) {
            cleanup_temp_files(contexts);
            std::rethrow_exception(thread_exceptions[i]);
        }
    }
    // Merge temporary files and create final output.
    size_t total_factors = merge_temp_files(output_path, contexts);
    cleanup_temp_files(contexts);
    return total_factors;
}
size_t ParallelFactorizer::parallel_factorize_dna_w_rc(std::string_view text,
const std::string& output_path,
size_t num_threads,
size_t start_pos) {
if (text.empty()) return 0;
// Prepare the DNA sequence with reverse complement
std::string text_str(text);
PreparedSequenceResult prep_result = prepare_multiple_dna_sequences_w_rc({text_str});
// Call the core function with the prepared string
return parallel_factorize_multiple_dna_w_rc(prep_result.prepared_string,
prep_result.original_length,
output_path,
num_threads,
start_pos);
}
// Read the whole input file into memory and run the DNA-with-reverse-
// complement parallel factorization on its contents.
// Throws std::runtime_error if the input file cannot be opened.
size_t ParallelFactorizer::parallel_factorize_file_dna_w_rc(const std::string& input_path,
                                                            const std::string& output_path,
                                                            size_t num_threads) {
    std::ifstream in(input_path, std::ios::binary);
    if (!in) {
        throw std::runtime_error("Cannot open input file: " + input_path);
    }
    std::istreambuf_iterator<char> first(in), last;
    std::string contents(first, last);
    return parallel_factorize_dna_w_rc(contents, output_path, num_threads);
}
} // namespace noLZSS