# Source code for radar_api.download

# -----------------------------------------------------------------------------.
# MIT License

# Copyright (c) 2025 RADAR-API developers
#
# This file is part of RADAR-API.

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# -----------------------------------------------------------------------------.
"""Define download functions."""

import concurrent.futures
import datetime
import os
import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd
from tqdm import tqdm
from trollsift import Parser

from radar_api.checks import (
    check_base_dir,
    check_download_protocol,
    check_network,
    check_product,
    check_radar,
    check_start_end_time,
)
from radar_api.configs import get_base_dir
from radar_api.info import get_info_from_filepath
from radar_api.io import get_directory_pattern, get_filesystem
from radar_api.search import find_files
from radar_api.utils.timing import print_elapsed_time

####--------------------------------------------------------------------------.


def create_local_directories(fpaths, exist_ok=True):
    """Create recursively the parent directories of the provided filepaths.

    Parameters
    ----------
    fpaths : iterable of str
        Filepaths whose parent directories must exist.
    exist_ok : bool, optional
        Forwarded to ``os.makedirs``. If False, an error is raised when a
        parent directory already exists before the call. The default is True.
    """
    # Deduplicate the parent directories (many files usually share the same
    # folder) while preserving order. Without this, exist_ok=False would fail
    # on the second file of a shared directory.
    dir_paths = dict.fromkeys(os.path.dirname(fpath) for fpath in fpaths)
    for dir_path in dir_paths:
        # A bare filename has dirname "" and os.makedirs("") would raise:
        # such files go to the current directory, which already exists.
        if dir_path:
            os.makedirs(dir_path, exist_ok=exist_ok)
[docs] def remove_corrupted_files(local_fpaths, bucket_fpaths, fs, return_corrupted_fpaths=True): """ Check and remove files from local disk which are corrupted. Corruption is evaluated by comparing the size of data on local storage against size of data located in the cloud bucket. Parameters ---------- local_fpaths : list List of filepaths on local storage. bucket_fpaths : list List of filepaths on cloud bucket. fs : fsspec.FileSystem fsspec filesystem instance. It must be cohrenet with the cloud bucket address of bucket_fpaths. return_corrupted_fpaths : bool, optional If True, it returns the list of corrupted files. If False, it returns the list of valid files. The default is True. Returns ------- tuple (list_local_filepaths, list_bucket_filepaths) """ l_corrupted_local = [] l_corrupted_bucket = [] l_valid_local = [] l_valid_bucket = [] for local_fpath, bucket_fpath in zip(local_fpaths, bucket_fpaths, strict=False): local_exists = os.path.isfile(local_fpath) if local_exists: bucket_size = fs.info(bucket_fpath)["size"] local_size = os.path.getsize(local_fpath) if bucket_size != local_size: os.remove(local_fpath) l_corrupted_local.append(local_fpath) l_corrupted_bucket.append(bucket_fpath) else: l_valid_local.append(local_fpath) l_valid_bucket.append(bucket_fpath) if return_corrupted_fpaths: return l_corrupted_local, l_corrupted_bucket return l_valid_local, l_valid_bucket
def _select_missing_fpaths(local_fpaths, bucket_fpaths): """Return local and bucket filepaths of files not present on the local storage.""" # Keep only non-existing local files idx_not_exist = [not os.path.exists(filepath) for filepath in local_fpaths] local_fpaths = np.array(local_fpaths)[idx_not_exist].tolist() bucket_fpaths = np.array(bucket_fpaths)[idx_not_exist].tolist() return local_fpaths, bucket_fpaths
def define_local_filepath(filename, network, product, radar, base_dir=None):
    """Build the local disk filepath where ``filename`` should be saved.

    The directory is obtained by composing the local directory pattern of the
    given network/product with the file start time (parsed from ``filename``),
    the radar name and the resolved base directory.

    Parameters
    ----------
    filename : str
        Name of the radar file.
    network : str
        Radar network name.
    product : str
        Product acronym.
    radar : str
        Radar name.
    base_dir : str, optional
        Base directory for local data. If None, it is resolved by
        ``get_base_dir``. The default is None.

    Returns
    -------
    str
        Local filepath of ``filename``.
    """
    resolved_base_dir = check_base_dir(get_base_dir(base_dir))
    pattern = get_directory_pattern(protocol="local", network=network, product=product)
    file_start_time = get_info_from_filepath(filename, network=network, product=product)["start_time"]
    directory = Parser(pattern).compose(
        {"time": file_start_time, "radar": radar, "base_dir": resolved_base_dir},
    )
    # On Windows, turn the pattern's forward slashes into native separators
    if os.name == "nt":
        directory = directory.replace("/", "\\")
    return os.path.join(directory, filename)
def _get_local_from_bucket_fpaths(base_dir, network, product, radar, bucket_fpaths): """Convert cloud bucket filepaths to local storage filepaths.""" fpaths = [ define_local_filepath( filename=os.path.basename(fpath), network=network, product=product, radar=radar, base_dir=base_dir, ) for fpath in bucket_fpaths ] return fpaths def _fs_get_parallel(bucket_fpaths, local_fpaths, fs, n_threads=10, progress_bar=True): """ Run fs.get() asynchronously in parallel using multithreading. Parameters ---------- bucket_fpaths : list List of bucket filepaths to download. local_fpath : list List of filepaths where to save data on local storage. n_threads : int, optional Number of files to be downloaded concurrently. The default is 10. The max value is set automatically to 50. Returns ------- List of cloud bucket filepaths which were not downloaded. """ # Check n_threads n_threads = max(n_threads, 1) n_threads = min(n_threads, 50) ##------------------------------------------------------------------------. # Initialize progress bar if progress_bar: n_files = len(local_fpaths) pbar = tqdm(total=n_files) with ThreadPoolExecutor(max_workers=n_threads) as executor: dict_futures = { executor.submit(fs.get, bucket_path, local_fpath): bucket_path for bucket_path, local_fpath in zip(bucket_fpaths, local_fpaths, strict=False) } # List files that didn't work l_file_error = [] for future in concurrent.futures.as_completed(dict_futures.keys()): # Update the progress bar if progress_bar: pbar.update(1) # Collect all commands that caused problems if future.exception() is not None: l_file_error.append(dict_futures[future]) if progress_bar: pbar.close() ##------------------------------------------------------------------------. # Return list of bucket fpaths raising errors return l_file_error
def get_end_of_day(time):
    """Get datetime end of the day, i.e. midnight of the following day.

    Parameters
    ----------
    time : datetime.datetime
        Reference time.

    Returns
    -------
    datetime.datetime
        ``time`` shifted by one day and truncated to 00:00:00.000000.
        ``tzinfo`` is left unchanged.
    """
    next_day = time + datetime.timedelta(days=1)
    # Reset microseconds too: otherwise sub-second precision leaks into the
    # day boundary and misaligns the daily blocks built from it.
    return next_day.replace(hour=0, minute=0, second=0, microsecond=0)
def get_start_of_day(time):
    """Get datetime start of the day, i.e. midnight of the same day.

    Parameters
    ----------
    time : datetime.datetime
        Reference time.

    Returns
    -------
    datetime.datetime
        ``time`` truncated to 00:00:00.000000. ``tzinfo`` is left unchanged.
    """
    # Reset microseconds too, so the result is an exact day boundary
    return time.replace(hour=0, minute=0, second=0, microsecond=0)
def get_list_daily_time_blocks(start_time, end_time):
    """Split the period between ``start_time`` and ``end_time`` into daily blocks.

    If the period lasts less than one day, it is returned as a single block.
    Otherwise, it is cut at each midnight between the two times, so the first
    and last blocks can be shorter than a day. Zero-length blocks (e.g. when
    ``end_time`` falls exactly at 00:00) are discarded.

    Returns
    -------
    list of tuple
        Consecutive ``(block_start, block_end)`` pairs covering the period.
    """
    if (end_time - start_time).days == 0:
        return [(start_time, end_time)]

    # Midnights from the first one after start_time to the last one at or
    # before end_time delimit the daily blocks
    midnights = (
        pd.date_range(
            get_end_of_day(start_time),
            get_start_of_day(end_time),
            freq="1D",
            inclusive="both",
        )
        .to_pydatetime()
        .tolist()
    )
    boundaries = [start_time, *midnights, end_time]
    zero_duration = datetime.timedelta(0)
    return [
        (block_start, block_end)
        for block_start, block_end in zip(boundaries[:-1], boundaries[1:], strict=False)
        if (block_start - block_end) != zero_duration
    ]
####---------------------------------------------------------------------------.
@print_elapsed_time
def download_files(
    network,
    radar,
    start_time,
    end_time,
    product=None,
    n_threads=20,
    force_download=False,
    check_data_integrity=True,
    progress_bar=True,
    verbose=True,
    base_dir=None,
    protocol="s3",
    fs_args=None,
):
    """
    Download files from a cloud bucket storage.

    Parameters
    ----------
    network : str
        The name of the radar network.
        See `radar_api.available_network()` for available radar networks.
    radar : str
        The name of the radar.
        Use `radar_api.available_radars()` to retrieve the available radars.
    start_time : datetime.datetime
        The start (inclusive) time of the interval period for retrieving the filepaths.
    end_time : datetime.datetime
        The end (exclusive) time of the interval period for retrieving the filepaths.
    product : str, optional
        The product acronym. The default is None.
        It must be specified if for a given network, multiple products are available through radar_api.
        See `radar_api.available_products(network)` for available products.
    n_threads : int, optional
        Number of files to be downloaded concurrently.
        The default is 20. The max value is set automatically to 50.
    force_download : bool, optional
        If True, it downloads and overwrites the files already existing on local storage.
        If False, it does not download files already existing on local storage.
        The default is False.
    check_data_integrity : bool, optional
        If True, after downloading, it checks all local files of the requested period
        and removes those whose size differs from the cloud bucket file.
        The default is True.
    progress_bar : bool, optional
        If True, it displays a progress bar showing the download status.
        The default is True.
    verbose : bool, optional
        If True, it prints some information concerning the download process.
        The default is True.
    base_dir : str, optional
        The path to the directory where to store radar data.
        If None, it is resolved by `radar_api.configs.get_base_dir`.
        The default is None.
    protocol : str, optional
        The protocol used to access the cloud bucket (validated by
        `check_download_protocol`). The default is "s3".
    fs_args : dict, optional
        Dictionary specifying optional settings to initiate the fsspec.filesystem.
        The default is None (no extra settings). Anonymous connection is set by default.

    Returns
    -------
    list
        Sorted local filepaths of the files of the requested period.
        If ``check_data_integrity`` is True, files missing on disk or removed as
        corrupted are excluded. Otherwise, the list may include filepaths of
        files that failed to download.
    """
    # -------------------------------------------------------------------------.
    # Avoid a mutable default argument shared across calls
    fs_args = {} if fs_args is None else fs_args

    # Get default directory
    base_dir = get_base_dir(base_dir)
    # Checks
    check_download_protocol(protocol)
    base_dir = check_base_dir(base_dir)
    network = check_network(network)
    radar = check_radar(radar=radar, network=network)
    product = check_product(network=network, product=product)
    start_time, end_time = check_start_end_time(start_time, end_time)

    # Initialize timing
    t_i = time.time()

    # -------------------------------------------------------------------------.
    # Get filesystem
    fs = get_filesystem(protocol=protocol, fs_args=fs_args)

    # Define list of daily time blocks (start_time, end_time)
    time_blocks = get_list_daily_time_blocks(start_time, end_time)

    if verbose:
        print("-------------------------------------------------------------------- ")
        print(f"Starting downloading {network.upper()} {radar} data between {start_time} and {end_time}.")

    # Loop over daily time blocks (to search for data)
    list_all_local_fpaths = []
    list_all_bucket_fpaths = []
    n_downloaded_files = 0
    n_existing_files = 0
    n_total_files = 0
    # Distinct loop names so the requested period is not shadowed
    for block_start_time, block_end_time in time_blocks:
        # Retrieve bucket fpaths
        bucket_fpaths = find_files(
            protocol=protocol,
            fs_args=fs_args,
            radar=radar,
            network=network,
            product=product,
            start_time=block_start_time,
            end_time=block_end_time,
            base_dir=None,
            verbose=False,
        )

        # Check there are files to retrieve
        n_files = len(bucket_fpaths)
        n_total_files += n_files
        if n_files == 0:
            continue

        # Define local destination fpaths
        local_fpaths = _get_local_from_bucket_fpaths(
            base_dir=base_dir,
            network=network,
            radar=radar,
            product=product,
            bucket_fpaths=bucket_fpaths,
        )

        # Record the local and bucket fpaths queried (extend: avoids quadratic list rebuilds)
        list_all_local_fpaths.extend(local_fpaths)
        list_all_bucket_fpaths.extend(bucket_fpaths)

        # Optionally exclude files that already exist on disk
        if not force_download:
            local_fpaths, bucket_fpaths = _select_missing_fpaths(
                local_fpaths=local_fpaths,
                bucket_fpaths=bucket_fpaths,
            )
            # Update count of existing files on disk
            n_existing_files += n_files - len(bucket_fpaths)

        # Check there are still files to retrieve
        n_files = len(local_fpaths)
        if n_files == 0:
            continue

        # Create local directories
        create_local_directories(local_fpaths)

        # Print # files to download
        if verbose:
            print(f" - Downloading {n_files} files from {block_start_time} to {block_end_time}")

        # Download data asynchronously with multithreading
        l_bucket_errors = _fs_get_parallel(
            bucket_fpaths=bucket_fpaths,
            local_fpaths=local_fpaths,
            fs=fs,
            n_threads=n_threads,
            progress_bar=progress_bar,
        )

        # Count only the files that were actually retrieved
        n_downloaded_files += n_files - len(l_bucket_errors)

        # Report errors if occurred
        if verbose and l_bucket_errors:
            print(f" - Unable to download the following files: {l_bucket_errors}")

    # Report the total number of file downloaded
    if verbose:
        t_elapsed = round(time.time() - t_i)
        if not force_download and n_existing_files > 0:
            print(f" - {n_existing_files}/{n_total_files} files were already present on disk !")
        if n_downloaded_files > 0:
            print(
                f" - {n_downloaded_files}/{n_total_files} files have been downloaded in {t_elapsed} seconds !",
            )
        print("-------------------------------------------------------------------- ")

    # Check for data corruption
    if check_data_integrity:
        if verbose:
            print("Checking data integrity:")
        # remove_corrupted_files silently drops files absent from disk (e.g.
        # failed downloads): count them so they are not reported as corrupted.
        n_missing = sum(not os.path.isfile(fpath) for fpath in list_all_local_fpaths)
        list_all_local_fpaths, _ = remove_corrupted_files(
            list_all_local_fpaths,
            list_all_bucket_fpaths,
            fs=fs,
            return_corrupted_fpaths=False,
        )
        if verbose:
            n_corrupted = len(list_all_bucket_fpaths) - n_missing - len(list_all_local_fpaths)
            print(f" - {n_corrupted} corrupted files were identified and removed.")
            print(
                "--------------------------------------------------------------------",
            )

    # Return list of local fpaths
    return sorted(list_all_local_fpaths)


####---------------------------------------------------------------------------.