"""DADA2 stage: run DADA2_Pipe.R and move the generated FASTA."""
from __future__ import annotations
import glob
import gzip
import importlib.resources as pkg_resources
import logging
import os
import shutil
import subprocess
import tempfile
import urllib.request
from pathlib import Path
from capellini.config import CapelliniConfig
from capellini.utils.io import download_if_missing
logger = logging.getLogger(__name__)
def _ensure_silva_references(cfg: CapelliniConfig) -> None:
"""Download SILVA training set + taxmap if missing (mirrors notebook DADA2 cell).
The taxmap is published as ``tax_slv_ssu_138.1.txt.gz``; we download the
gzipped file to ``cfg.silva_taxmap_path``, decompress it next to itself
(stripping a trailing ``.gz`` if present), and remove the gzipped copy.
"""
download_if_missing(cfg.silva_ref_url, cfg.silva_ref_path,
label="SILVA Reference")
if not os.path.isfile(cfg.silva_taxmap_path):
print(" • Download of SILVA TaxMap")
zipped_name = cfg.silva_taxmap_path + ".gz"
urllib.request.urlretrieve(cfg.silva_taxmap_url, zipped_name)
with gzip.open(zipped_name, "rb") as f_in:
with open(cfg.silva_taxmap_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(zipped_name)
else:
print("SILVA TaxMap found - skipping download")
def _get_r_script_path() -> Path:
"""Locate the bundled DADA2_Pipe.R inside the installed package.
Returns:
Absolute path to DADA2_Pipe.R.
"""
ref = pkg_resources.files("capellini.r_scripts").joinpath("DADA2_Pipe.R")
return Path(str(ref))
[docs]
def run_dada2(cfg: CapelliniConfig) -> Path:
"""Run the DADA2_Pipe.R script and move the output FASTA to the input folder.
The R script is called with six arguments matching the notebook:
Rscript DADA2_Pipe.R <bacterial_raw_fasta_folder> <dada2_folder>
<silva_ref_path> <silva_taxmap_path> <direction> <TRUE|FALSE>
After the run the produced ASV_sequences_{F|R|P}.fasta is moved to
``cfg.input_fasta_folder / cfg.bacteria_fasta_name``.
Args:
cfg: Populated CapelliniConfig instance.
Returns:
Path to the moved bacteria FASTA file.
Raises:
FileNotFoundError: If the FASTA is not found after the R run.
subprocess.CalledProcessError: If Rscript exits non-zero.
"""
logger.info("DADA2: starting R pipeline")
_ensure_silva_references(cfg)
r_script = _get_r_script_path()
fasta_generation_str = "TRUE" if cfg.fasta_generation else "FALSE"
cmd = [
"Rscript",
str(r_script),
cfg.bacterial_raw_fasta_folder,
cfg.dada2_folder,
cfg.silva_ref_path,
cfg.silva_taxmap_path,
cfg.direction,
fasta_generation_str,
]
logger.info("Running DADA2 Pipeline...")
logger.debug("Command: %s", " ".join(cmd))
result = subprocess.run(cmd, capture_output=True, text=True)
logger.debug("STDOUT:\n%s", result.stdout)
if result.returncode != 0:
raise subprocess.CalledProcessError(
result.returncode,
cmd,
output=result.stdout,
stderr=result.stderr,
)
# Move generated FASTA to input folder
bact_path = Path(cfg.input_fasta_folder) / cfg.bacteria_fasta_name
fasta_files = glob.glob(os.path.join(cfg.dada2_folder, "*.fasta"))
if fasta_files:
src = fasta_files[0]
shutil.move(src, str(bact_path))
logger.debug("Moved %s -> %s", src, bact_path)
if not bact_path.is_file():
raise FileNotFoundError(
f"Bacteria 16S FASTA not found after DADA2 run: {bact_path}"
)
logger.info("DADA2 complete: %s", bact_path.name)
return bact_path