Program Listing for File parallel_fasta_processor.cpp

Return to documentation for file (src/cpp/parallel_fasta_processor.cpp)

#include "parallel_fasta_processor.hpp"
#include "parallel_factorizer.hpp"
#include "factorizer_core.hpp"
#include "factorizer_helpers.hpp"
#include <fstream>
#include <atomic>
#include <sdsl/rmq_succinct_sct.hpp>
#include <sdsl/construct.hpp>
#include <thread>
#include <mutex>
#include <vector>
#include <filesystem>

namespace fs = std::filesystem;
namespace noLZSS {

static void write_fasta_metadata(std::ofstream& os,
                                 const std::vector<std::string>& sequence_ids,
                                 const std::vector<uint64_t>& sentinel_factor_indices,
                                 size_t factor_count,
                                 uint64_t total_length) {
    // Calculate footer size
    size_t names_size = 0;
    for (const auto& name : sequence_ids) {
        names_size += name.length() + 1;  // +1 for null terminator
    }

    size_t footer_size = sizeof(FactorFileFooter) + names_size +
                        sentinel_factor_indices.size() * sizeof(uint64_t);

    // Write sequence names
    for (const auto& name : sequence_ids) {
        os.write(name.c_str(), name.length() + 1);  // Include null terminator
    }

    // Write sentinel factor indices
    for (uint64_t idx : sentinel_factor_indices) {
        os.write(reinterpret_cast<const char*>(&idx), sizeof(idx));
    }

    // Write footer at the end
    FactorFileFooter footer;
    footer.num_factors = factor_count;
    footer.num_sequences = sequence_ids.size();
    footer.num_sentinels = sentinel_factor_indices.size();
    footer.footer_size = footer_size;
    footer.total_length = total_length;

    os.write(reinterpret_cast<const char*>(&footer), sizeof(footer));
}

size_t parallel_write_factors_binary_file_fasta_multiple_dna_w_rc(
    const std::string& fasta_path,
    const std::string& out_path,
    size_t num_threads,
    FastaDnaSanitizationMode sanitization_mode) {

    // Parse FASTA file into individual sequences with IDs
    FastaParseResult parse_result = parse_fasta_sequences_and_ids(fasta_path, sanitization_mode);

    // Prepare sequences for factorization (this will validate nucleotides)
    PreparedSequenceResult prep_result = prepare_multiple_dna_sequences_w_rc(parse_result.sequences);

    // Use the core parallel function with the prepared string
    ParallelFactorizer factorizer;
    size_t factor_count = factorizer.parallel_factorize_multiple_dna_w_rc(
        prep_result.prepared_string,
        prep_result.original_length,
        out_path,
        num_threads);

    // Now identify sentinel factors by reading back the factors
    std::vector<uint64_t> sentinel_factor_indices;
    uint64_t total_length = 0;

    std::ifstream temp_is(out_path, std::ios::binary);
    if (!temp_is) {
        throw std::runtime_error("Cannot read output file to identify sentinels: " + out_path);
    }

    size_t sentinel_idx = 0;
    for (size_t i = 0; i < factor_count; ++i) {
        Factor f = {0, 0, 0};
        if (!temp_is.read(reinterpret_cast<char*>(&f), sizeof(Factor))) {
            throw std::runtime_error("Error reading factor from output file at index " + std::to_string(i));
        }
        total_length += f.length;

        // Check if this is a sentinel factor
        while (sentinel_idx < prep_result.sentinel_positions.size() &&
               prep_result.sentinel_positions[sentinel_idx] < f.start) {
            sentinel_idx++;
        }

        if (sentinel_idx < prep_result.sentinel_positions.size() &&
            f.start == prep_result.sentinel_positions[sentinel_idx]) {
            sentinel_factor_indices.push_back(i);
            sentinel_idx++;
        }
    }
    temp_is.close();

    // Truncate file to remove the basic footer written by merge_temp_files
    // We need to replace it with the metadata-enriched footer
    fs::resize_file(out_path, factor_count * sizeof(Factor));

    // Append metadata footer to the file
    std::ofstream os(out_path, std::ios::binary | std::ios::app);
    if (!os) {
        throw std::runtime_error("Cannot append metadata to output file: " + out_path);
    }

    write_fasta_metadata(os, parse_result.sequence_ids, sentinel_factor_indices, factor_count, total_length);

    return factor_count;
}

size_t parallel_write_factors_binary_file_fasta_multiple_dna_no_rc(
    const std::string& fasta_path,
    const std::string& out_path,
    size_t num_threads,
    FastaDnaSanitizationMode sanitization_mode) {

    // Parse FASTA file into individual sequences with IDs
    FastaParseResult parse_result = parse_fasta_sequences_and_ids(fasta_path, sanitization_mode);

    // Prepare sequences for factorization (this will validate nucleotides)
    PreparedSequenceResult prep_result = prepare_multiple_dna_sequences_no_rc(parse_result.sequences);

    // Use ParallelFactorizer for the actual work
    ParallelFactorizer factorizer;
    size_t factor_count = factorizer.parallel_factorize(prep_result.prepared_string, out_path, num_threads);

    // Now we need to identify sentinel factors by reading back the factors
    std::vector<uint64_t> sentinel_factor_indices;
    uint64_t total_length = 0;

    // Read factors back to identify sentinels
    std::ifstream temp_is(out_path, std::ios::binary);
    if (!temp_is) {
        throw std::runtime_error("Cannot read output file to identify sentinels: " + out_path);
    }

    size_t sentinel_idx = 0;
    for (size_t i = 0; i < factor_count; ++i) {
        Factor f = {0, 0, 0};
        if (!temp_is.read(reinterpret_cast<char*>(&f), sizeof(Factor))) {
            throw std::runtime_error("Error reading factor from output file at index " + std::to_string(i));
        }
        total_length += f.length;

        // Check if this is a sentinel factor
        while (sentinel_idx < prep_result.sentinel_positions.size() &&
               prep_result.sentinel_positions[sentinel_idx] < f.start) {
            sentinel_idx++;
        }

        if (sentinel_idx < prep_result.sentinel_positions.size() &&
            f.start == prep_result.sentinel_positions[sentinel_idx]) {
            sentinel_factor_indices.push_back(i);
            sentinel_idx++;
        }
    }
    temp_is.close();

    // Truncate file to remove the basic footer written by merge_temp_files
    // We need to replace it with the metadata-enriched footer
    fs::resize_file(out_path, factor_count * sizeof(Factor));

    // Append metadata footer to the file
    std::ofstream os(out_path, std::ios::binary | std::ios::app);
    if (!os) {
        throw std::runtime_error("Cannot append metadata to output file: " + out_path);
    }

    write_fasta_metadata(os, parse_result.sequence_ids, sentinel_factor_indices, factor_count, total_length);

    return factor_count;
}

size_t parallel_write_factors_dna_w_reference_fasta_files_to_binary(
    const std::string& reference_fasta_path,
    const std::string& target_fasta_path,
    const std::string& out_path,
    size_t num_threads,
    FastaDnaSanitizationMode sanitization_mode) {

    // Process both FASTA files and get prepared sequence with reverse complement
    FastaReferenceTargetResult ref_target_concat_w_rc =
        prepare_ref_target_dna_w_rc_from_fasta(reference_fasta_path, target_fasta_path, sanitization_mode);

    // Use the core parallel function with the prepared string and start position
    ParallelFactorizer factorizer;
    size_t factor_count = factorizer.parallel_factorize_multiple_dna_w_rc(
        ref_target_concat_w_rc.concatinated_sequences.prepared_string,
        ref_target_concat_w_rc.concatinated_sequences.original_length,
        out_path,
        num_threads,
        ref_target_concat_w_rc.target_start_index);

    // Now identify sentinel factors by reading back the factors
    std::vector<uint64_t> sentinel_factor_indices;
    uint64_t total_length = 0;

    std::ifstream temp_is(out_path, std::ios::binary);
    if (!temp_is) {
        throw std::runtime_error("Cannot read output file to identify sentinels: " + out_path);
    }

    size_t sentinel_idx = 0;
    for (size_t i = 0; i < factor_count; ++i) {
        Factor f = {0, 0, 0};
        if (!temp_is.read(reinterpret_cast<char*>(&f), sizeof(Factor))) {
            throw std::runtime_error("Error reading factor from output file at index " + std::to_string(i));
        }
        total_length += f.length;

        // Check if this is a sentinel factor
        while (sentinel_idx < ref_target_concat_w_rc.concatinated_sequences.sentinel_positions.size() &&
               ref_target_concat_w_rc.concatinated_sequences.sentinel_positions[sentinel_idx] < f.start) {
            sentinel_idx++;
        }

        if (sentinel_idx < ref_target_concat_w_rc.concatinated_sequences.sentinel_positions.size() &&
            f.start == ref_target_concat_w_rc.concatinated_sequences.sentinel_positions[sentinel_idx]) {
            sentinel_factor_indices.push_back(i);
            sentinel_idx++;
        }
    }
    temp_is.close();

    // Truncate file to remove the basic footer written by merge_temp_files
    // We need to replace it with the metadata-enriched footer
    fs::resize_file(out_path, factor_count * sizeof(Factor));

    // Append metadata footer to the file
    std::ofstream os(out_path, std::ios::binary | std::ios::app);
    if (!os) {
        throw std::runtime_error("Cannot append metadata to output file: " + out_path);
    }

    write_fasta_metadata(os, ref_target_concat_w_rc.sequence_ids, sentinel_factor_indices, factor_count, total_length);

    return factor_count;
}

static void write_single_sequence_factors(const std::vector<Factor>& factors,
                                          const std::string& sequence_id,
                                          const std::string& output_path) {
    std::ofstream os(output_path, std::ios::binary | std::ios::trunc);
    if (!os) {
        throw std::runtime_error("Cannot create output file: " + output_path);
    }

    // Write factors
    uint64_t total_length = 0;
    for (const auto& f : factors) {
        os.write(reinterpret_cast<const char*>(&f), sizeof(Factor));
        total_length += f.length;
    }

    // Write sequence ID
    os.write(sequence_id.c_str(), sequence_id.length() + 1);  // Include null terminator

    // Write footer
    size_t footer_size = sizeof(FactorFileFooter) + sequence_id.length() + 1;

    FactorFileFooter footer;
    footer.num_factors = factors.size();
    footer.num_sequences = 1;  // Single sequence per file
    footer.num_sentinels = 0;  // No sentinels in per-sequence factorization
    footer.footer_size = footer_size;
    footer.total_length = total_length;

    os.write(reinterpret_cast<const char*>(&footer), sizeof(footer));
}

static std::string sanitize_filename(const std::string& seq_id) {
    std::string safe_name = seq_id;
    for (char& c : safe_name) {
        // Replace problematic characters with underscore
        if (c == '/' || c == '\\' || c == ':' || c == '*' || c == '?' ||
            c == '"' || c == '<' || c == '>' || c == '|' || c == ' ') {
            c = '_';
        }
    }
    return safe_name;
}

size_t parallel_write_factors_binary_file_fasta_dna_w_rc_per_sequence(
    const std::string& fasta_path,
    const std::string& out_dir,
    size_t num_threads,
    FastaDnaSanitizationMode sanitization_mode) {

    // Parse FASTA file into individual sequences with IDs
    FastaParseResult parse_result = parse_fasta_sequences_and_ids(fasta_path, sanitization_mode);

    size_t num_sequences = parse_result.sequences.size();

    // Determine actual number of threads to use
    if (num_threads == 0) {
        num_threads = std::min(num_sequences, static_cast<size_t>(std::thread::hardware_concurrency()));
    }
    num_threads = std::min(num_threads, num_sequences);

    // Create output directory if it doesn't exist
    fs::create_directories(out_dir);

    // Storage for results
    std::vector<std::vector<Factor>> all_factors(num_sequences);
    std::atomic<size_t> total_factor_count(0);

    // Parallel processing of sequences
    if (num_threads == 1) {
        // Sequential processing
        for (size_t i = 0; i < num_sequences; ++i) {
            std::vector<std::string> single_seq = {parse_result.sequences[i]};
            PreparedSequenceResult prep_result = prepare_multiple_dna_sequences_w_rc(single_seq);
            all_factors[i] = factorize_multiple_dna_w_rc(prep_result.prepared_string);

            // Write to separate file
            std::string safe_id = sanitize_filename(parse_result.sequence_ids[i]);
            std::string output_path = out_dir + "/" + safe_id + ".bin";
            write_single_sequence_factors(all_factors[i], parse_result.sequence_ids[i], output_path);

            total_factor_count.fetch_add(all_factors[i].size());
        }
    } else {
        // Parallel processing
        std::vector<std::thread> threads;
        std::atomic<size_t> next_sequence(0);

        for (size_t t = 0; t < num_threads; ++t) {
            threads.emplace_back([&]() {
                while (true) {
                    size_t seq_idx = next_sequence.fetch_add(1);
                    if (seq_idx >= num_sequences) break;

                    std::vector<std::string> single_seq = {parse_result.sequences[seq_idx]};
                    PreparedSequenceResult prep_result = prepare_multiple_dna_sequences_w_rc(single_seq);
                    std::vector<Factor> factors = factorize_multiple_dna_w_rc(prep_result.prepared_string);

                    // Write to separate file
                    std::string safe_id = sanitize_filename(parse_result.sequence_ids[seq_idx]);
                    std::string output_path = out_dir + "/" + safe_id + ".bin";
                    write_single_sequence_factors(factors, parse_result.sequence_ids[seq_idx], output_path);

                    total_factor_count.fetch_add(factors.size());
                }
            });
        }

        for (auto& thread : threads) {
            thread.join();
        }
    }

    return total_factor_count.load();
}

size_t parallel_write_factors_binary_file_fasta_dna_no_rc_per_sequence(
    const std::string& fasta_path,
    const std::string& out_dir,
    size_t num_threads,
    FastaDnaSanitizationMode sanitization_mode) {

    // Parse FASTA file into individual sequences with IDs
    FastaParseResult parse_result = parse_fasta_sequences_and_ids(fasta_path, sanitization_mode);

    size_t num_sequences = parse_result.sequences.size();

    // Determine actual number of threads to use
    if (num_threads == 0) {
        num_threads = std::min(num_sequences, static_cast<size_t>(std::thread::hardware_concurrency()));
    }
    num_threads = std::min(num_threads, num_sequences);

    // Create output directory if it doesn't exist
    fs::create_directories(out_dir);

    // Storage for results
    std::vector<std::vector<Factor>> all_factors(num_sequences);
    std::atomic<size_t> total_factor_count(0);

    // Parallel processing of sequences
    if (num_threads == 1) {
        // Sequential processing
        for (size_t i = 0; i < num_sequences; ++i) {
            std::vector<std::string> single_seq = {parse_result.sequences[i]};
            PreparedSequenceResult prep_result = prepare_multiple_dna_sequences_no_rc(single_seq);
            // Remove the sentinel at the end
            std::string seq_without_sentinel = prep_result.prepared_string.substr(0, prep_result.prepared_string.length() - 1);
            all_factors[i] = factorize(seq_without_sentinel);

            // Write to separate file
            std::string safe_id = sanitize_filename(parse_result.sequence_ids[i]);
            std::string output_path = out_dir + "/" + safe_id + ".bin";
            write_single_sequence_factors(all_factors[i], parse_result.sequence_ids[i], output_path);

            total_factor_count.fetch_add(all_factors[i].size());
        }
    } else {
        // Parallel processing
        std::vector<std::thread> threads;
        std::atomic<size_t> next_sequence(0);

        for (size_t t = 0; t < num_threads; ++t) {
            threads.emplace_back([&]() {
                while (true) {
                    size_t seq_idx = next_sequence.fetch_add(1);
                    if (seq_idx >= num_sequences) break;

                    std::vector<std::string> single_seq = {parse_result.sequences[seq_idx]};
                    PreparedSequenceResult prep_result = prepare_multiple_dna_sequences_no_rc(single_seq);
                    // Remove the sentinel at the end
                    std::string seq_without_sentinel = prep_result.prepared_string.substr(0, prep_result.prepared_string.length() - 1);
                    std::vector<Factor> factors = factorize(seq_without_sentinel);

                    // Write to separate file
                    std::string safe_id = sanitize_filename(parse_result.sequence_ids[seq_idx]);
                    std::string output_path = out_dir + "/" + safe_id + ".bin";
                    write_single_sequence_factors(factors, parse_result.sequence_ids[seq_idx], output_path);

                    total_factor_count.fetch_add(factors.size());
                }
            });
        }

        for (auto& thread : threads) {
            thread.join();
        }
    }

    return total_factor_count.load();
}

} // namespace noLZSS