"""s5cmd helpers for downloading Copernicus Data Space products (phidown.s5cmd_utils)."""

import subprocess
import os
from typing import Optional, List, Sequence, Union
import configparser
import logging
import shlex
import time
import threading
from tqdm import tqdm

# Configure logging.
# NOTE(review): calling basicConfig at import time is unusual for a library
# module (it configures the root logger) — kept for backward compatibility.
logging.basicConfig(level=logging.INFO)

# Module-level logger, one per module per stdlib convention.
logger = logging.getLogger(__name__)
[docs] def _strip_wrapping_quotes(value: str) -> str: """Remove matching wrapping single/double quotes from a token.""" if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'): return value[1:-1] return value
[docs] def _split_command_args(command: str, platform_name: Optional[str] = None) -> List[str]: """Split an s5cmd command string into args with platform-aware parsing. On Windows (``nt``), ``shlex`` with ``posix=False`` correctly handles backslashes in paths. We then strip wrapping quotes from each token. """ effective_platform = platform_name or os.name if effective_platform == "nt": return [_strip_wrapping_quotes(arg) for arg in shlex.split(command, posix=False)] return shlex.split(command)
def run_s5cmd_with_config(
    command: Union[str, Sequence[str]],
    config_file: str = '.s5cfg',
    endpoint_url: Optional[str] = None,
    verbose: bool = False,
) -> str:
    """Run an s5cmd command with credentials from a configuration file.

    Executes s5cmd using credentials read from an s3cmd-style config file and
    derives the endpoint URL for the Copernicus Data Space when one is not
    given explicitly. Output is streamed line by line so long transfers show
    progress as it happens.

    Args:
        command: The s5cmd command to execute (without the ``s5cmd`` prefix).
            Either a shell-like string (e.g. ``"cp s3://... /tmp/out/"``) or a
            sequence of already-split args (e.g. ``["cp", "s3://...", "/tmp/out/"]``).
        config_file: Path to the s5cmd configuration file (default: '.s5cfg').
        endpoint_url: Optional endpoint URL override.
        verbose: Whether to log the command and stream its output to the logger.

    Returns:
        str: The command's captured stdout, newline-joined (blank lines dropped).

    Raises:
        ValueError: If the command is empty.
        FileNotFoundError: If the config file is not found.
        subprocess.CalledProcessError: If s5cmd exits with a non-zero status.

    Example:
        >>> output = run_s5cmd_with_config('ls s3://eodata/Sentinel-1/')
    """
    if isinstance(command, str):
        if not command.strip():
            raise ValueError('Command cannot be empty')
        command_args = _split_command_args(command)
    else:
        command_args = [str(arg) for arg in command]
        if not command_args or not any(arg.strip() for arg in command_args):
            raise ValueError('Command cannot be empty')

    # Fail fast on a missing config file before attempting to parse it.
    if not os.path.exists(config_file):
        raise FileNotFoundError(f'Configuration file {config_file} not found')
    config = configparser.ConfigParser()
    config.read(config_file)

    # Export credentials from the [default] section; values may be quoted in
    # the file, so wrapping quotes are stripped.
    env = os.environ.copy()
    if 'default' in config:
        default_section = config['default']
        env['AWS_ACCESS_KEY_ID'] = default_section.get('aws_access_key_id', '').strip("'\"")
        env['AWS_SECRET_ACCESS_KEY'] = default_section.get('aws_secret_access_key', '').strip("'\"")
        env['AWS_DEFAULT_REGION'] = default_section.get('aws_region', 'us-east-1').strip("'\"")

    cmd_parts = ['s5cmd']
    if endpoint_url:
        cmd_parts.extend(['--endpoint-url', endpoint_url])
    elif 'default' in config and 'host_base' in config['default']:
        # Derive the endpoint from host_base / use_https in the config file.
        host_base = config['default']['host_base'].strip("'\"")
        use_https = config['default'].get('use_https', 'true').strip("'\"").lower() == 'true'
        protocol = 'https' if use_https else 'http'
        cmd_parts.extend(['--endpoint-url', f'{protocol}://{host_base}'])
    cmd_parts.extend(command_args)

    if verbose:
        logger.info(f'Running command: {" ".join(cmd_parts)}')

    # Stream stdout (stderr merged in) so the caller sees progress live while
    # we also collect the lines for the return value.
    process = subprocess.Popen(
        cmd_parts,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        env=env,
    )
    stdout_lines: List[str] = []
    try:
        if process.stdout is None:
            raise RuntimeError('Failed to capture process stdout')
        for line in iter(process.stdout.readline, ''):
            # Strip trailing newline but preserve the message text.
            text_line = line.rstrip('\n')
            if text_line:
                if verbose:
                    logger.info(text_line)
                stdout_lines.append(text_line)
        returncode = process.wait()
        if returncode != 0:
            # Join collected output for better error context.
            combined = "\n".join(stdout_lines)
            if verbose:
                logger.error(f'Command exited with non-zero status {returncode}. '
                             f'Output:\n{combined}')
            raise subprocess.CalledProcessError(returncode, cmd_parts, output=combined)
        return "\n".join(stdout_lines)
    except Exception:
        # Best-effort cleanup: never leave a child process running on error.
        try:
            process.kill()
        except Exception:
            pass
        raise
def get_directory_size(directory: str) -> int:
    """Calculate the total size of all files in a directory recursively.

    Args:
        directory: Path to the directory.

    Returns:
        int: Total size in bytes; 0 when the directory does not exist or an
        error occurs while walking it.
    """
    total_size = 0
    try:
        for dirpath, _dirnames, filenames in os.walk(directory):
            for filename in filenames:
                filepath = os.path.join(dirpath, filename)
                # Files may appear/disappear mid-download; skip missing ones.
                if os.path.exists(filepath):
                    total_size += os.path.getsize(filepath)
    except Exception as e:
        # Best-effort measurement: log and return what was summed so far.
        logger.warning(f'Error calculating directory size: {e}')
    return total_size
def _ensure_s5cfg(config_file: str, reset: bool) -> None:
    """Create (or recreate, when *reset*) the s5cmd config file, prompting for credentials."""
    if os.path.exists(config_file) and not reset:
        return
    access_key = input('Enter Access Key ID: ').strip()
    secret_key = input('Enter Secret Access Key: ').strip()
    config_content = f"""[default]
aws_access_key_id = {access_key}
aws_secret_access_key = {secret_key}
aws_region = eu-central-1
host_base = eodata.dataspace.copernicus.eu
host_bucket = eodata.dataspace.copernicus.eu
use_https = true
check_ssl_certificate = true
"""
    with open(config_file, 'w') as f:
        f.write(config_content)
    logger.info(f'Created configuration file: {config_file}')


def _download_with_progress(
    command: List[str],
    config_file: str,
    endpoint_url: str,
    full_output_dir: str,
    total_size: int,
) -> None:
    """Run the s5cmd download while a background thread feeds a tqdm bar from disk usage."""
    initial_size = get_directory_size(full_output_dir)
    pbar = tqdm(
        total=total_size,
        initial=initial_size,
        unit='B',
        unit_scale=True,
        unit_divisor=1024,
        desc='Downloading',
    )
    stop_monitoring = threading.Event()

    def monitor_progress() -> None:
        """Poll disk usage every 0.5 s and push the delta to the progress bar."""
        last_size = initial_size
        while not stop_monitoring.is_set():
            time.sleep(0.5)
            current_size = get_directory_size(full_output_dir)
            delta = current_size - last_size
            if delta > 0:
                pbar.update(delta)
                last_size = current_size

    monitor_thread = threading.Thread(target=monitor_progress, daemon=True)
    monitor_thread.start()
    try:
        run_s5cmd_with_config(
            command=command,
            config_file=config_file,
            endpoint_url=endpoint_url,
            verbose=False,  # reduce logging noise while the bar is visible
        )
    finally:
        # Stop polling and account for bytes written after the last poll.
        stop_monitoring.set()
        monitor_thread.join(timeout=2)
        remaining = get_directory_size(full_output_dir) - pbar.n
        if remaining > 0:
            pbar.update(remaining)
        pbar.close()


def pull_down(
    s3_path: str,
    output_dir: str = '.',
    config_file: str = '.s5cfg',
    endpoint_url: str = 'https://eodata.dataspace.copernicus.eu',
    download_all: bool = True,
    reset: bool = False,
    total_size: Optional[int] = None,
    show_progress: bool = True
) -> bool:
    """Download Sentinel-1 SAFE directory from Copernicus Data Space.

    Downloads either individual files or entire SAFE directories from the
    Copernicus Data Space Ecosystem using the optimized s5cmd tool, optionally
    displaying a progress bar driven by the downloaded size on disk.

    Args:
        s3_path: S3 path to the Sentinel-1 data (must start with /eodata/).
        output_dir: Local output directory (must be an absolute path).
        config_file: Path to the s5cmd configuration file.
        endpoint_url: Copernicus Data Space endpoint URL.
        download_all: If True, downloads the entire directory via a wildcard.
        reset: If True, prompts for new AWS credentials and rewrites the config.
        total_size: Expected total size in bytes (enables the progress bar).
        show_progress: If True and total_size is provided, shows a tqdm bar.

    Returns:
        bool: True if the download completed successfully.

    Raises:
        ValueError: If s3_path or output_dir is invalid.
        FileNotFoundError: If the config file cannot be found or created.
        subprocess.CalledProcessError: If the s5cmd download fails.

    Notes:
        - s5cmd runs as a subprocess; its stdout/stderr are streamed to the
          logger in real time, so copy progress is visible as it happens.
        - `s5cmd` must be installed and available on PATH.
        - The progress bar monitors actual disk usage in a daemon thread.
    """
    if not s3_path:
        raise ValueError('S3 path cannot be empty')
    if not output_dir:
        raise ValueError('Output directory arg cannot be empty')
    if not os.path.isabs(output_dir):
        raise ValueError('Output directory must be an absolute path')
    # Validate the path BEFORE touching the config file, so an invalid path
    # never triggers an interactive credentials prompt.
    if not s3_path.startswith('/eodata/'):
        raise ValueError(f'S3 path must start with /eodata/, got: {s3_path}')

    _ensure_s5cfg(config_file, reset)
    if not os.path.exists(config_file):
        raise FileNotFoundError(f'Configuration file {config_file} still not found.')

    # Local destination is <output_dir>/<SAFE name>.
    safe_name = os.path.basename(s3_path.rstrip('/'))
    full_output_dir = os.path.join(output_dir, safe_name)
    os.makedirs(full_output_dir, exist_ok=True)

    # 's3:/' + '/eodata/...' yields 's3://eodata/...'; append a wildcard for
    # whole-directory downloads.
    if download_all and not s3_path.endswith('/*'):
        s3_url = f's3:/{s3_path}/*'
    else:
        s3_url = f's3:/{s3_path}'

    # Build the cp command as an argument list so paths with spaces work
    # across platforms.
    command = ['cp', s3_url, f'{full_output_dir}/']

    if show_progress and total_size and total_size > 0:
        # Info logging is suppressed while the progress bar is shown.
        _download_with_progress(command, config_file, endpoint_url,
                                full_output_dir, total_size)
    else:
        logger.info(f'Downloading from: {s3_url}')
        logger.info(f'Output directory: {full_output_dir}')
        run_s5cmd_with_config(
            command=command,
            config_file=config_file,
            endpoint_url=endpoint_url
        )
    return True