# Source code for disdrodb.data_transfer.upload_data

import os
import click

from typing import Optional, List

from disdrodb.api.metadata import _read_yaml_file, get_list_metadata, _write_yaml_file
from disdrodb.utils.zenodo import _create_zenodo_deposition, _upload_file_to_zenodo
from disdrodb.utils.compression import _zip_dir


def click_upload_option(function: object):
    """Click command line options for DISDRODB archive upload transfer.

    Adds the ``--data_sources``, ``--campaign_names``, ``--station_names``,
    ``--platform`` and ``-f/--force`` options to the decorated command.

    Parameters
    ----------
    function: object
        Function to decorate with the upload options.

    Returns
    -------
    object
        The function wrapped with all the click options listed above.
    """
    function = click.option(
        "--data_sources",
        type=str,
        show_default=True,
        default="",
        help=(
            "Data source folder name (eg: EPFL). "
            "If not provided (None), all data sources will be uploaded. "
            "Multiple data sources can be specified by separating them with spaces. "
        ),
    )(function)
    function = click.option(
        "--campaign_names",
        type=str,
        show_default=True,
        default="",
        help=(
            "Name of the campaign (eg: EPFL_ROOF_2012). "
            "If not provided (None), all campaigns will be uploaded. "
            "Multiple campaign names can be specified by separating them with spaces. "
        ),
    )(function)
    function = click.option(
        "--station_names",
        type=str,
        show_default=True,
        default="",
        help=(
            "Station name. "
            "If not provided (None), all stations will be uploaded. "
            "Multiple station names can be specified by separating them with spaces. "
        ),
    )(function)
    function = click.option(
        "--platform",
        type=click.Choice(["zenodo"], case_sensitive=False),
        show_default=True,
        # The default must be one of the declared choices: an empty string is
        # not a valid choice and would not select any upload backend.
        default="zenodo",
        help="Name of remote platform. If not provided (None), the default platform is Zenodo.",
    )(function)
    function = click.option(
        "-f",
        "--force",
        type=bool,
        show_default=True,
        # Forcing is opt-in, matching the force=False default of
        # upload_disdrodb_archives.
        default=False,
        help="Force uploading even if data already exists on another remote location.",
    )(function)
    return function


def _filter_already_uploaded(metadata_fpaths: List[str]) -> List[str]:
    """Filter out metadata files that already have a remote url specified.

    Parameters
    ----------
    metadata_fpaths: list of str
        List of metadata file paths.

    Returns
    -------
    list of str
        The metadata file paths whose "data_url" key is missing or empty.
    """
    filtered = []
    for metadata_fpath in metadata_fpaths:
        metadata_dict = _read_yaml_file(metadata_fpath)
        if metadata_dict.get("data_url"):
            print(f"{metadata_fpath} already has a remote url specified. Skipping.")
            continue
        filtered.append(metadata_fpath)
    return filtered


def _upload_data_to_zenodo(metadata_fpaths: List[str], sandbox: bool = False) -> None:
    """Upload data to Zenodo.

    A single deposition is created, then each station archive is uploaded to
    it and the corresponding metadata file is updated with the file url.

    Parameters
    ----------
    metadata_fpaths: list of str
        List of metadata file paths.
    sandbox: bool
        If True, upload to Zenodo sandbox for testing purposes.
    """
    deposition_id, bucket_url = _create_zenodo_deposition(sandbox)
    zenodo_host = "sandbox.zenodo.org" if sandbox else "zenodo.org"
    deposition_url = f"https://{zenodo_host}/deposit/{deposition_id}"
    print(f"Zenodo deposition created: {deposition_url}.")

    for metadata_fpath in metadata_fpaths:
        remote_path = _upload_station_data_to_zenodo(metadata_fpath, bucket_url)
        _update_metadata_with_zenodo_url(metadata_fpath, deposition_id, remote_path, sandbox)

    print("Data uploaded. Please review your deposition and publish it when ready.")


def _generate_data_remote_path(metadata_fpath: str) -> str:
    """Generate data remote path from a metadata path.

    metadata_fpath has the form
    "disdrodb_dir/Raw/data_source/campaign_name/metadata/station_name.yml".
    The remote path has the form "data_source/campaign_name/station_name".

    Parameters
    ----------
    metadata_fpath: str
        Metadata file path.

    Returns
    -------
    str
        Remote path, always separated by forward slashes since it is used
        to build a url.
    """
    normalized = os.path.normpath(metadata_fpath)
    # Walk up the directory structure instead of doing substring replacements,
    # so that a station, campaign or base directory containing "metadata" or
    # "Raw" in its name does not corrupt the result.
    metadata_dir, metadata_fname = os.path.split(normalized)
    campaign_dir = os.path.dirname(metadata_dir)
    data_source_dir, campaign_name = os.path.split(campaign_dir)
    data_source = os.path.basename(data_source_dir)
    station_name = os.path.splitext(metadata_fname)[0]  # remove trailing ".yml"
    return "/".join((data_source, campaign_name, station_name))


def _upload_station_data_to_zenodo(metadata_fpath: str, bucket_url: str) -> str:
    """Zip and upload station data to Zenodo.

    The station data directory is zipped into a temporary archive, which is
    uploaded and then removed. The metadata file is not modified here.

    Parameters
    ----------
    metadata_fpath: str
        Metadata file path.
    bucket_url: str
        Zenodo bucket url.

    Returns
    -------
    str
        Remote path of the uploaded archive, without the ".zip" extension.
    """
    remote_path = _generate_data_remote_path(metadata_fpath)
    remote_url = f"{bucket_url}/{remote_path}.zip"

    temp_zip_path = _archive_station_data(metadata_fpath)
    try:
        _upload_file_to_zenodo(temp_zip_path, remote_url)
    finally:
        # Do not leave the temporary archive behind when the upload fails.
        os.remove(temp_zip_path)

    return remote_path


def _get_station_data_dir(metadata_fpath: str) -> str:
    """Return the station data directory matching a metadata file path.

    ".../campaign_name/metadata/station_name.yml" is mapped to
    ".../campaign_name/data/station_name".
    """
    metadata_dir, metadata_fname = os.path.split(metadata_fpath)
    campaign_dir = os.path.dirname(metadata_dir)
    station_name = os.path.splitext(metadata_fname)[0]  # remove trailing ".yml"
    return os.path.join(campaign_dir, "data", station_name)


def _archive_station_data(metadata_fpath: str) -> str:
    """Archive station data.

    Parameters
    ----------
    metadata_fpath: str
        Metadata file path.

    Returns
    -------
    str
        Path of the zip archive returned by ``_zip_dir``.
    """
    # Only the "metadata" directory component is swapped for "data"; other
    # occurrences of "metadata" in the path must be left untouched.
    data_path = _get_station_data_dir(metadata_fpath)
    temp_zip_path = _zip_dir(data_path)
    return temp_zip_path


def _update_metadata_with_zenodo_url(
    metadata_fpath: str, deposition_id: int, remote_path: str, sandbox: bool = False
) -> None:
    """Update metadata with Zenodo zip file url.

    Parameters
    ----------
    metadata_fpath: str
        Metadata file path.
    deposition_id: int
        Zenodo deposition id.
    remote_path: str
        Remote path of the zip file.
    sandbox: bool
        If True, set reference to Zenodo sandbox for testing purposes.
    """
    zenodo_host = "sandbox.zenodo.org" if sandbox else "zenodo.org"
    metadata_dict = _read_yaml_file(metadata_fpath)
    metadata_dict["data_url"] = f"https://{zenodo_host}/record/{deposition_id}/files/{remote_path}.zip"
    _write_yaml_file(metadata_dict, metadata_fpath)


def upload_disdrodb_archives(
    platform: Optional[str] = None,
    force: bool = False,
    **kwargs,
) -> None:
    """Find all stations containing local data and upload them to a remote repository.

    Parameters
    ----------
    platform: str, optional
        Name of the remote platform.
        If not provided (None or empty string), the default platform is Zenodo.
        The default is platform=None.
    force: bool, optional
        If True, upload even if data already exists on another remote location.
        The default is force=False.

    Other Parameters
    ----------------
    disdrodb_dir: str, optional
        DisdroDB data folder path.
        Must end with DISDRODB.
    data_sources: str or list of str, optional
        Data source folder name (eg: EPFL).
        If not provided (None), all data sources will be uploaded.
        The default is data_source=None.
    campaign_names: str or list of str, optional
        Campaign name (eg: EPFL_ROOF_2012).
        If not provided (None), all campaigns will be uploaded.
        The default is campaign_name=None.
    station_names: str or list of str, optional
        Station name.
        If not provided (None), all stations will be uploaded.
        The default is station_name=None.

    Raises
    ------
    ValueError
        If ``platform`` is not a supported remote platform.
    """
    # Maps each supported platform to the "sandbox" flag of the Zenodo uploader.
    # "sandbox.zenodo" is only for testing purposes, not available through CLI.
    sandbox_by_platform = {"zenodo": False, "sandbox.zenodo": True}

    # Apply the documented default: an unspecified platform means Zenodo.
    platform = platform.lower() if platform else "zenodo"
    if platform not in sandbox_by_platform:
        supported = ", ".join(sorted(sandbox_by_platform))
        raise ValueError(f"Unsupported platform '{platform}'. Supported platforms: {supported}.")

    metadata_fpaths = get_list_metadata(
        **kwargs,
        with_stations_data=True,
    )

    if not force:
        metadata_fpaths = _filter_already_uploaded(metadata_fpaths)

    if len(metadata_fpaths) == 0:
        print("There is no data fulfilling the criteria.")
        return

    _upload_data_to_zenodo(metadata_fpaths, sandbox=sandbox_by_platform[platform])