Source code for autorino.common.decompress

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Jan 29 11:53:09 2024

@author: psakic

This module, decompress.py, provides functions for decompressing files,
specifically those that are gzipped or in Hatanaka-compressed RINEX format.
"""

import gzip

import os
import shutil
from pathlib import Path

import hatanaka

from geodezyx import conv

#### Import the logger
import logging
import autorino.cfgenv.env_read as aroenv

logger = logging.getLogger("autorino")
logger.setLevel(aroenv.ARO_ENV_DIC["general"]["log_level"])


[docs] def is_compressed(file_inp): """ Checks if a file is compressed based on its extension. Parameters ---------- file_inp : str The input file to check. Returns ------- bool True if the file is compressed, False otherwise. """ file_inp2 = Path(file_inp) ext = file_inp2.suffix.lower() if ext in (".gz",): bool_compress = True else: bool_compress = False return bool_compress
[docs] def decomp_gzip(gzip_file_inp, out_dir_inp=None, force=False): """ Decompresses a gzipped file. Parameters ---------- gzip_file_inp : str The input gzipped file to decompress. out_dir_inp : str, optional The output directory where the decompressed file will be stored. If not provided, the decompressed file will be stored in the same directory as the input file. force : bool, optional If True, the file will be decompressed even if a decompressed file already exists. Returns ------- str The path to the decompressed file. """ gzip_file_inp = str(gzip_file_inp) gzip_file2 = Path(gzip_file_inp) if not out_dir_inp: out_dir = gzip_file2.parent else: out_dir = Path(out_dir_inp) file_out = out_dir.joinpath(gzip_file2.stem) if file_out.exists() and not force: pass else: with gzip.open(gzip_file_inp, "rb") as f_in: with open(file_out, "wb") as f_out: shutil.copyfileobj(f_in, f_out) logger.debug("decompress (gzip): %s > %s", gzip_file2.name, file_out) return str(file_out)
[docs] def decomp_hatanaka(crx_file_inp, out_dir_inp=None, force=False): """ Decompresses a Hatanaka-compressed RINEX file. Parameters ---------- crx_file_inp : str The input Hatanaka-compressed RINEX file to decompress. out_dir_inp : str, optional The output directory where the decompressed file will be stored. If not provided, the decompressed file will be stored in the same directory as the input file. force : bool, optional If True, the file will be decompressed even if a decompressed file already exists. Returns ------- str The path to the decompressed file. """ crx_file_inp = str(crx_file_inp) crx_file_inp2 = Path(crx_file_inp) if out_dir_inp: out_dir = out_dir_inp crx_file = shutil.copy2(crx_file_inp, out_dir) dell = True else: out_dir = os.path.dirname(crx_file_inp) crx_file = crx_file_inp dell = False rnx_name_potential = os.path.basename(crx_file).split(".")[0] + ".rnx" rnx_file_potential = os.path.join(out_dir, rnx_name_potential) if os.path.isfile(rnx_file_potential) and not force: rnx_file_out = rnx_file_potential else: rnx_file_out = hatanaka.decompress_on_disk(crx_file, delete=dell) logger.debug("decompress (hatanaka): %s > %s", crx_file_inp2.name, rnx_file_out) return str(rnx_file_out)
[docs] def decompress_file(file_inp, out_dir_inp=None, force=False): """ Decompresses a file. The file can be gzipped or in Hatanaka-compressed RINEX format. Parameters ---------- file_inp : str The input file to decompress. out_dir_inp : str, optional The output directory where the decompressed file will be stored. If not provided, the decompressed file will be stored in the same directory as the input file. force : bool, optional If True, the file will be decompressed even if a decompressed file already exists. Returns ------- str The path to the decompressed file. bool True if the file was decompressed, False otherwise. """ file_inp = str(file_inp) file_inp2 = Path(file_inp) ext = file_inp2.suffix.lower() if not os.path.isfile(file_inp): logger.warning("unable to decompress, file not exists: %s", file_inp2.name) file_out = file_inp bool_decomp_out = False ## RINEX Case elif conv.rinex_regex_search_tester(file_inp, compressed=True): file_out = decomp_hatanaka(file_inp, out_dir_inp, force=force) bool_decomp_out = True ## Generic gzipped case (e.g. RAW file) elif ext == ".gz": file_out = decomp_gzip(file_inp, out_dir_inp, force=force) bool_decomp_out = True else: logger.debug("no valid compression for %s, nothing is done", file_inp2.name) file_out = file_inp bool_decomp_out = False return file_out, bool_decomp_out