#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Feb 21 16:23:06 2023
@author: psakic
"""
import datetime as dt
import os
import re
import subprocess
from pathlib import Path
from subprocess import PIPE
from typing import Union, List
import autorino.convert as arocnv
from geodezyx import utils, conv
#### Import the logger
import logging
import autorino.cfgenv.env_read as aroenv
logger = logging.getLogger("autorino")
logger.setLevel(aroenv.ARO_ENV_DIC["general"]["log_level"])
###################################################################
# +++++ conversion function
[docs]
def _convert_select(converter_inp, inp_raw_fpath=None):
"""
internal function for ``converter_run``.
Find the correct RAW > RINEX converter and gives its corresponding attributes
Returns directly those attributes based on `converter_inp` keyword,
or can do a basic research based on the RAW file extension
See also autorino.conv_fcts.select_conv_odd_file
for the converter selection of oddly named files
Parameters
----------
converter_inp : str
name of the converter used.
see ``converter_run`` help for more details
inp_raw_fpath : Path, optional
RAW file path. used for converter research based on the RAW
file extension. The default is None.
Returns
-------
converter_name : str
converter's name.
brand : str
converter's name/manufacturer.
cmd_build_fct : function
interface function with the converter to perform the conversion.
see ``converter_run``'s help for more details
conv_regex_fct : function
interface function to find the converted file with a regular expression.
bin_options : list
options for the conversion program. The default is [].
bin_kwoptions : dict
keyword options for the conversion program. The default is dict().
"""
if converter_inp == "auto" and not inp_raw_fpath:
logger.error(
"not converter nor input file given, \
unable to returns the right conversion fcts"
)
raise Exception
# + for RINEX handeling, inp_raw_fpath can ben an iterable (list)
# + thus we just keep the 1st elt
if utils.is_iterable(inp_raw_fpath):
inp_raw_fpath = inp_raw_fpath[0]
if converter_inp == "auto":
inp_raw_fpath = Path(inp_raw_fpath)
ext = inp_raw_fpath.suffix.upper()
fname = inp_raw_fpath.name.upper()
if ext in (".GZ", ".7Z", ".7ZIP", ".ZIP", ".Z"):
logger.debug("%s is compressed", fname)
ext = Path(Path(fname).stem).suffix.upper()
fname = Path(Path(fname).stem).name.upper()
else:
inp_raw_fpath = Path(inp_raw_fpath)
ext = ""
fname = inp_raw_fpath.name.upper()
# +++++ TRIMBLE
# preliminary tests: Trimble default converter must be extracted from the environment values
if ext in (".T00", ".T01", ".T02", ".T04") and converter_inp == "auto":
converter_inp = aroenv.ARO_ENV_DIC["general"]["trimble_default_software"]
logger.debug(
"Trimble default converter defined in environnement: %s", converter_inp
)
# main test for Trimble : choose the right converter
if ext in (".T00", ".T01", ".T02", ".T04") and converter_inp == "t0xconvert":
converter_name = "t0xconvert"
brand = "Trimble (official converter)"
cmd_build_fct = arocnv.cmd_build_t0xconvert
conv_regex_fct = arocnv.conv_regex_t0xconvert
bin_options = []
bin_kwoptions = dict()
elif ext in (".T00", ".T01", ".T02", ".T04") and converter_inp == "trm2rinex":
converter_name = "trm2rinex"
brand = "Trimble (unofficial Docker converter)"
cmd_build_fct = arocnv.cmd_build_trm2rinex
conv_regex_fct = arocnv.conv_regex_trm2rinex
bin_options = []
bin_kwoptions = dict()
elif ext == (".T00", ".T01", ".T02", ".T04") and converter_inp == "runpkr00":
converter_name = "runpkr00"
brand = "Trimble (legacy converter)"
cmd_build_fct = arocnv.cmd_build_runpkr00
conv_regex_fct = arocnv.conv_regex_runpkr00
bin_options = []
bin_kwoptions = dict()
elif ext in (".TGD", "TG!") or converter_inp == "teqc":
converter_name = "teqc"
brand = "Trimble"
cmd_build_fct = arocnv.cmd_build_teqc
conv_regex_fct = arocnv.conv_regex_teqc
bin_options = []
bin_kwoptions = dict()
# +++++ ASHTECH
elif re.match(".([0-9]{3})", ext) and fname[0] in ("U", "R", "B"):
converter_name = "teqc"
brand = "Ashtech"
cmd_build_fct = arocnv.cmd_build_teqc
conv_regex_fct = arocnv.conv_regex_teqc
yyyy, doy, week, dow, date = _ashtech_name_2_date(inp_raw_fpath)
ftype = fname[0].lower()
if ftype == "b":
ftype = "d"
bin_options = ["-ash " + ftype + " -week " + str(week)]
# bin_kwoptions = {"-week": str(week)}
bin_kwoptions = dict()
# +++++ LEICA
elif re.match(".(M[0-9]{2}|MDB)", ext) or converter_inp == "mdb2rinex":
converter_name = "mdb2rinex"
brand = "Leica"
cmd_build_fct = arocnv.cmd_build_mdb2rinex
conv_regex_fct = arocnv.conv_regex_mdb2rnx
bin_options = []
bin_kwoptions = dict()
# +++++ SEPTENTRIO
elif re.match(".[0-9]{2}_", ext) or converter_inp == "sbf2rin":
converter_name = "sbf2rin"
brand = "Septentrio"
cmd_build_fct = arocnv.cmd_build_sbf2rin
conv_regex_fct = arocnv.conv_regex_void
bin_options = []
bin_kwoptions = dict()
# +++++ GENERIC BINEX
elif ext == ".BNX" or converter_inp == "convbin":
converter_name = "convbin"
brand = "Generic BINEX"
cmd_build_fct = arocnv.cmd_build_convbin
conv_regex_fct = arocnv.conv_regex_convbin
bin_options = []
bin_kwoptions = dict()
# +++++ TOPCON
elif ext == ".TPS" or converter_inp == "tps2rin":
converter_name = "tps2rin"
brand = "Topcon"
cmd_build_fct = arocnv.cmd_build_tps2rin
conv_regex_fct = arocnv.conv_regex_tps2rin
bin_options = []
bin_kwoptions = dict()
# +++++ GFZRNX
elif converter_inp == "gfzrnx":
converter_name = "gfzrnx"
brand = "RINEX Handeling (GFZ)"
cmd_build_fct = arocnv.cmd_build_gfzrnx
conv_regex_fct = arocnv.conv_regex_gfzrnx
bin_options = []
bin_kwoptions = dict()
# +++++ CONVERTO
elif converter_inp == "converto":
converter_name = "converto"
brand = "RINEX Handeling (IGN)"
cmd_build_fct = arocnv.cmd_build_converto
conv_regex_fct = arocnv.conv_regex_converto
bin_options = []
bin_kwoptions = dict()
else:
logger.error("unable to find the right converter for %s", inp_raw_fpath)
logger.error(
"input-given converter: %s, maybe not implemented yet?", converter_inp
)
raise Exception
logger.debug("brand & converter selected: %s, %s", brand, converter_name)
return (
converter_name,
brand,
cmd_build_fct,
conv_regex_fct,
bin_options,
bin_kwoptions,
)
# set current user as constant
USER, GROUP = arocnv.get_current_user_grp()
[docs]
def converter_run(
inp_raw_fpath: Union[Path, str, List[Path], List[str]],
out_dir: Union[Path, str],
converter="auto",
timeout=180,
bin_options=[],
bin_kwoptions=dict(),
bin_path: Union[Path, str] = "",
remove_converted_annex_files=True,
cmd_build_fct=None,
conv_regex_fct=None,
):
"""
Generic function to run an external RAW > RINEX conversion program.
It will detect automatically which converter has to be executed based on
input RAW file extension,
but this can be customized
Parameters
----------
inp_raw_fpath : Union[Path,str]
path of the input RAW file.
for RINEX Handeling (e.g. splice) a list of path is allowed.
out_dir : Union[Path,str]
destination directory of the converted RINEX.
converter : str, optional
name of the converter used.
Supports :
* 'auto' (automatic choice based on the extension),
* 'trm2rnx' (Trimble unofficial),
* 't0xconvert' (Trimble official),
* 'runpkr00' (Trimble legacy),
* 'teqc' (legacy conversion & RINEX Handeling),
* 'mdb2rinex' (Leica),
* 'sbf2rin' (Septentrio),
* 'convbin' (BINEX),
* 'tps2rin' (Topcon),
* 'converto' (RINEX Handeling)
* 'gfzrnx' (RINEX Handeling)
see ``_convert_select`` function and ``cmd_build`` module
for more details.
The default is 'auto'.
timeout : int, optional
timeout in second for the conversion program call.
The default is 60.
bin_options : list, optional
options for the conversion program. The default is [].
bin_kwoptions : dict, optional
keyword options for the conversion program. The default is dict().
bin_path : Union[Path,str], optional
path of the conversion program bin. The default is "".
remove_converted_annex_files : bool, optional
remove or not the 'annex' converted files.
i.e. the not navigation files (e.g. the navigtation RINEXs).
The default is True.
cmd_build_fct : function, optional
A custom function which build the command calling
the conversion program
See `cmd_build` module for more details.
The default is None.
conv_regex_fct : function, optional
A custom function which build the regular expression to find
the RINEX created during the conversion.
See `cmd_regex` module for more details.
The default is None.
Raises
------
FileNotFoundError
Returns
-------
out_fpath
the path of the converted RINEX.
process_converter
The subprocess object which ran the conversion (for debug purposes).
"""
#### Convert the paths as Path objects
out_dir = Path(out_dir)
## for RINEX handeling, inp_raw_fpath can ben an iterable (list)
if utils.is_iterable(inp_raw_fpath):
raw_fpath_multi = [Path(e) for e in inp_raw_fpath]
raw_fpath_mono = raw_fpath_multi[0]
raw_fpath = raw_fpath_multi
else: # a single file, most common case
raw_fpath_multi = [Path(inp_raw_fpath)]
raw_fpath_mono = Path(inp_raw_fpath)
raw_fpath = raw_fpath_mono
#### Check if input file exists
for f in raw_fpath_multi:
logger.debug("input file: %s", f)
if not f.is_file():
logger.error("input file not found: %s", f)
raise FileNotFoundError
if len(raw_fpath_multi) < 2:
logger.info("input file for conversion: %s", raw_fpath_mono)
else:
logger.info("%i input file for conversion", len(raw_fpath_multi))
# _convert_select can manage both a single file of a list*
# then could handle both raw_fpath_mono or raw_fpath_multi
# thus alias variable raw_fpath will work in both cases
# *. but we thus we just keep the 1st list elt as the representent
# of the full list (we assue it homogene)
out_conv_sel = _convert_select(converter, raw_fpath)
(
converter_name,
brand,
cmd_build_fct_use,
conv_regex_fct_use,
bin_options_use,
bin_kwoptions_use,
) = out_conv_sel
#### Force the arocnv.cmd_build_fct, if any
if cmd_build_fct:
cmd_build_fct_use = cmd_build_fct
#### Force the arocnv.conv_regex_fct, if any
if conv_regex_fct:
conv_regex_fct_use = conv_regex_fct
#### Force the bin_options if any
if bin_options:
bin_options_use = bin_options
#### Force the bin_kwoptions if any
if bin_kwoptions:
bin_kwoptions_use = bin_kwoptions
#### build the command
cmd_use, cmd_list, cmd_str = cmd_build_fct_use(
raw_fpath, out_dir, bin_options_use, bin_kwoptions_use
)
##### BIN PATH !!!!! XXXXX
logger.debug("conversion command: %s", cmd_str)
############# run the external conversion programm #############
timeout_reached = False
start = dt.datetime.now()
try:
process_converter = subprocess.run(
cmd_use,
executable="/bin/bash",
shell=True,
stdout=PIPE,
stderr=PIPE,
timeout=timeout,
)
except subprocess.TimeoutExpired:
process_converter = None
timeout_reached = True
end = dt.datetime.now()
exec_time = (end - start).seconds + (end - start).microseconds * 10**-6
#################################################################
###### check the output on the conversion programm
if timeout_reached:
logger.error("Error while converting %s", raw_fpath_mono.name)
logger.error("Timeout reached (%s seconds)", timeout)
elif process_converter.returncode != 0:
logger.error("Error while converting %s", raw_fpath_mono.name)
logger.error("Converter's error message:")
logger.error(process_converter.stderr.strip())
else:
logger.debug("Conversion done (%7.4f sec.). Converter's output:", exec_time)
logger.debug(process_converter.stdout.strip())
###### get the converted file
#### generate the regex matching the theoretical name for the converted file
#### if a list of input file is given, the 1st one is used as output name
conv_regex_main, conv_regex_annex = conv_regex_fct_use(raw_fpath_mono)
logger.debug(
"regex for the converted files (main/annex.): %s,%s",
conv_regex_main,
conv_regex_annex,
)
#### find the converted file matching the regex
conv_files_main, conv_files_annex = find_conv_files(
out_dir, conv_regex_main, conv_regex_annex
)
if not conv_files_main:
out_fpath = ""
logger.error("✗ converted file not found")
else:
out_fpath = Path(conv_files_main[0])
logger.info(
"✓ conversion OK (%7.4f sec.), main file/size: %s %s",
exec_time,
out_fpath,
out_fpath.stat().st_size,
)
# change ownership
if out_fpath:
arocnv.change_owner(out_fpath, USER, GROUP)
if remove_converted_annex_files:
for f in conv_files_annex:
os.remove(f)
logger.debug("converted annex file removed: %s", f)
return str(out_fpath), process_converter
#############################################################################
### Low level functions
[docs]
def find_conv_files(directory, pattern_main, pattern_annex, n_sec=10):
"""
Searches for the files in a directory that were recently created (within the last n_sec seconds)
and match the main and annex patterns.
This function iterates over all files in the specified directory and checks
if they were created within the last n_sec seconds
and if their names match the main or annex patterns.
It returns two lists of files that match the main and annex patterns, respectively.
Parameters
----------
directory : Path
The directory in which to search for files.
pattern_main : str
The regular expression pattern that the main files should match.
pattern_annex : str
The regular expression pattern that the annex files should match.
n_sec : int, optional
The number of seconds in the past to consider for file creation. Default is 10.
Returns
-------
list
The list of main files that were found.
list
The list of annex files that were found.
"""
now = dt.datetime.now()
delta = dt.timedelta(seconds=n_sec)
files_main = []
files_annex = []
files_main_time = []
files_annex_time = []
for file in os.listdir(directory):
filepath = os.path.join(directory, file)
if os.path.isfile(filepath):
created_time = dt.datetime.fromtimestamp(os.path.getctime(filepath))
if now - created_time < delta and re.match(pattern_main, file):
files_main.append(filepath)
files_main_time.append(created_time)
elif now - created_time < delta and re.match(pattern_annex, file):
files_annex.append(filepath)
files_annex_time.append(created_time)
else:
pass
# Sort the files found
files_main = [x for _, x in sorted(zip(files_main_time, files_main))]
files_annex = [x for _, x in sorted(zip(files_annex_time, files_annex))]
if len(files_main) > 1:
logger.warning("Several converted main files found %s", files_main)
files_main = [files_main[-1]]
logger.warning("Keeping most recent only: %s", files_main[0])
return files_main, files_annex
## https://stackoverflow.com/questions/36495669/difference-between-terms-option-argument-and-parameter
## https://tinf2.vub.ac.be/~dvermeir/mirrors/www-wks.acs.ohio-state.edu/unix_course/intro-14.html
## https://discourse.ubuntu.com/t/command-structure/18556
[docs]
def _ashtech_name_2_date(inp_raw_fpath):
"""
Extracts the record date from an ASHTECH file name.
This function extracts the year, day of year, GPS week, and day of week from the name of an ASHTECH file.
It also returns the date as a Python datetime object.
Parameters
----------
inp_raw_fpath : Path
The path of the input ASHTECH file.
Returns
-------
int
The year extracted from the file name.
int
The day of the year extracted from the file name.
int
The GPS week extracted from the file name.
int
The day of the week extracted from the file name.
datetime
The date extracted from the file name as a Python datetime object.
"""
inp_raw_fpath = Path(inp_raw_fpath)
doy = int(inp_raw_fpath.suffix[1:])
yy = int(inp_raw_fpath.stem[-2:])
if yy < 80:
y2k = 2000
else:
y2k = 1900
yyyy = y2k + yy
date = conv.doy2dt(yyyy, doy)
week, dow = conv.dt2gpstime(date)
return yyyy, doy, week, dow, date