try:
from pyraws.sys_cfg import PYRAWS_HOME_PATH, DATA_PATH
except: # noqa: E722
raise ValueError(
"sys_cfg.py not found. Please, refer to README.md for instructions on how to generate it."
)
import os
import pandas as pd
from termcolor import colored
import csv
from pyraws.utils.constants import DATABASE_FILE_DICTIONARY, BAND_NAMES_REAL_ORDER
from pathlib import Path
import xml.etree.ElementTree as ET
from shapely.geometry import Polygon
from ast import literal_eval
class DatabaseHandler:
    """Creates a Raw-L1C csv database for the APIs.

    On construction the handler looks for the ``raw`` and ``l1c`` sub-folders
    of ``<datapath>/<db_name>`` and prepares an empty table (``self.db``).
    Call :meth:`parser` to fill the table with one row per Raw product.
    """

    def __init__(self, db_name, datapath=DATA_PATH):
        """
        Initializes the Raw-L1C database.

        Args:
            db_name (str): The name of the dataset as named in the data folder.
            datapath (str, optional): Path to the folder where the database is located.
                Defaults to DATA_PATH.

        Raises:
            AssertionError: if no Raw product folder is found (see :meth:`fetcher`).
        """
        self.datapath = Path(datapath) / db_name
        self.fetcher()
        columns = [
            "ID_event",
            "Start",
            "End",
            "Sat",
            "class",
            "Polygon",
            "Raw_useful_granules",
            "Raw_complementary_granules",
            "raw_files",
            "l1c_files",
            "bbox_list",
        ]
        self.db = pd.DataFrame({x: [] for x in columns})

    def fetcher(self):
        """Fetches the products in the database.

        Sets ``self.Raw``, ``self.L1``, ``self.L1_products`` and
        ``self.Raw_products``. A missing or unreadable ``l1c`` folder is
        tolerated (a message is printed and ``self.L1_products`` is left
        empty); a missing ``raw`` folder is not.

        Raises:
            AssertionError: if the ``raw`` folder contains no sub-folders.
        """
        self.Raw = self.datapath / "raw"
        self.L1 = self.datapath / "l1c"
        try:
            self.L1_products = [x for x in self.L1.iterdir() if x.is_dir()]
            print("Found L1c products: ", len(self.L1_products))
        except OSError:
            # Keep the attribute defined: single_parser() reads it for every
            # event and would otherwise fail with an AttributeError when the
            # optional l1c folder is absent.
            self.L1_products = []
            print(
                "L1 folder not found. If this behaviour is not desidered, please, refer to README.md for instructions."
            )
        print("Fetching the database...")
        self.Raw_products = [x for x in self.Raw.iterdir() if x.is_dir()]
        assert len(self.Raw_products) > 0, "No Raw products found. Aborting..."
        print("Completed.")

    def single_parser(self, raw_Event_folderpath):
        """
        Parses a single Raw product.

        Args:
            raw_Event_folderpath (Path): path of the Raw event folder.

        Returns:
            dict: with keys ``"event"`` (folder name), ``"l1c_files"`` (sub-folders of
            the L1C folder having the same name as the event, or an empty list),
            ``"raw_files"`` (names of the entries of the Raw event folder),
            ``"Satellite"`` and ``"poly"`` (WKT string of the footprint, or None).

        Raises:
            IndexError: if no ``*Inventory_Metadata.xml`` file is found in the event folder.
            KeyError: if the metadata file has no ``Satellite_Code`` element.
        """
        id_event = raw_Event_folderpath.name

        def gain_xml(prod_path: Path):
            """
            Helper function to get an xml file.

            Args:
                prod_path (Path): path of the Raw event folder.

            Returns:
                Path: first file found (recursively) whose name ends with
                ``Inventory_Metadata.xml``.
            """
            xmls = [
                x
                for x in prod_path.glob("**/*")
                if x.name.endswith("Inventory_Metadata.xml")
            ]
            return xmls[0]

        def meta_extract(xml_path):
            """
            Helper function to extract the event metadata from an xml file.

            Args:
                xml_path (Path): path of the Inventory Metadata xml file.

            Returns:
                meta (dict): metadata dictionary with ``"Satellite_Code"`` (when present
                in the file) and ``"polygon"`` (WKT string, or None when the
                geographic localization cannot be read).
            """
            root = ET.parse(xml_path).getroot()
            # The value of the first attribute of the root element is used as
            # namespace prefix for every tag lookup below.
            # NOTE(review): presumably this is the schema/namespace declaration
            # of the product - confirm it holds for all metadata files.
            first_attribute = list(root.attrib.keys())[0]
            xsi_xmlns = "{" + root.attrib[first_attribute] + "}"
            info = {}
            for child in root.iter():
                for query in ["Satellite_Code"]:
                    if child.tag.removeprefix(xsi_xmlns) == query:
                        info[query] = child.text
            try:
                geo_localization = root.find(f".//{xsi_xmlns}Geographic_Localization")
                points = geo_localization.findall(f".//{xsi_xmlns}Geo_Pnt")
                # Extract latitude and longitude from each point; vertices are
                # stored as [longitude, latitude].
                coordinates = []
                for point in points:
                    latitude = point.find(f"{xsi_xmlns}LATITUDE").text
                    longitude = point.find(f"{xsi_xmlns}LONGITUDE").text
                    coordinates.append([float(longitude), float(latitude)])
                info["polygon"] = Polygon(coordinates).wkt
            except AttributeError:
                # A missing element makes find() return None.
                info["polygon"] = None
            return info

        meta = meta_extract(gain_xml(raw_Event_folderpath))
        raw_files = [x.name for x in raw_Event_folderpath.iterdir()]
        l1c_event_folder = [x for x in self.L1_products if x.name == id_event]
        if len(l1c_event_folder) > 0:
            l1c_subfiles = [x for x in l1c_event_folder[0].iterdir() if x.is_dir()]
        else:
            l1c_subfiles = []
        return {
            "event": f"{raw_Event_folderpath.name}",
            "l1c_files": l1c_subfiles,
            "raw_files": raw_files,
            "Satellite": meta["Satellite_Code"],
            "poly": meta["polygon"],
        }

    def parser(self):
        """Parses all the Raw products and creates the database.

        Appends one row per entry of ``self.Raw_products`` to ``self.db`` and
        resets the index of the table.
        """
        for raw_path in self.Raw_products:
            meta = self.single_parser(raw_path)
            data = {
                "ID_event": meta["event"],
                "Start": None,
                "End": None,
                "Sat": meta["Satellite"],
                "class": None,
                "Polygon": meta["poly"],
                "Raw_useful_granules": None,
                "Raw_complementary_granules": None,
                # NOTE(review): this key is "Raw_files" while the column created
                # in __init__ is "raw_files", so the concatenation below ends up
                # with both columns. Left unchanged because readers of the table
                # may rely on the current name - confirm which one is intended.
                "Raw_files": meta["raw_files"],
                "l1c_files": meta["l1c_files"],
                "bbox_list": None,
            }
            new_row = pd.DataFrame.from_dict(data, orient="index")
            new_row = new_row.transpose()
            self.db = pd.concat([self.db, new_row])
        self.db = self.db.reset_index(drop=True)
def get_cfg_file_dict():
    """Build the lookup table of the pyraws directories.

    Every package sub-directory is derived from ``PYRAWS_HOME_PATH``; the data
    directory is ``DATA_PATH`` as configured in ``sys_cfg``.

    Returns:
        dict: cfg file dict with the keys ``"scripts"``, ``"PYRAWS_pkg"``,
        ``"pyraws"``, ``"data"``, ``"notebooks"``, ``"database"`` and ``"geoids"``.
    """
    package_root = os.path.join(PYRAWS_HOME_PATH, "pyraws")
    return {
        "scripts": os.path.join(package_root, "scripts"),
        "PYRAWS_pkg": package_root,
        "pyraws": PYRAWS_HOME_PATH,
        "data": DATA_PATH,
        "notebooks": os.path.join(package_root, "notebooks"),
        "database": os.path.join(package_root, "database"),
        "geoids": os.path.join(package_root, "geoids"),
    }
def get_raw_shift_lut(
    satellite, detector_number, downsampling=True, cfg_file_dict=None
):
    """Get Raw shift LUT.

    Reads ``shift_lut.csv`` from the database directory, keeps the rows of the
    requested satellite, registration mode and detector, and takes the shift
    values from the first remaining row.

    Args:
        satellite (str): "S2A" or "S2B" respectively for "Sentinel-2A" data and "Sentinel-2B" data.
        detector_number (int): Detector number.
        downsampling (boolean, optional): if True, shift values for downsampled bands of the chosen
            satellite are used. Otherwise, values for upsampled bands are used. Defaults to True.
        cfg_file_dict (dict, optional): dictionary containing paths to the different pyraws
            directories. If None, it is built with ``get_cfg_file_dict``. Defaults to None.

    Returns:
        dict: the Raw shift LUT. Keys are the band names of ``BAND_NAMES_REAL_ORDER`` starting from
        its second entry, with the first entry placed last. Values are two-element lists holding
        the sign-inverted shifts read from the table, except for the last key, which maps to ``0``.
    """
    if cfg_file_dict is None:
        cfg_file_dict = get_cfg_file_dict()

    table = pd.read_csv(os.path.join(cfg_file_dict["database"], "shift_lut.csv"))
    registration_mode = "downsampling" if downsampling else "upsampling"
    table = table[table["satellite"] == satellite]
    table = table[table["registration_mode"] == registration_mode]
    table = table[table["detector"] == detector_number]

    shift_columns = [
        "S08_02",
        "S03_08",
        "S10_03",
        "S04_10",
        "S05_04",
        "S11_05",
        "S06_11",
        "S07_06",
        "S8A_07",
        "S12_8A",
        "S01_12",
        "S09_01",
    ]

    def negated_pair(column):
        """Parse a cell written as "[a, b]" and return [-a, -b]."""
        tokens = table[column].iloc[0].replace("[", "").replace("]", "").split(",")
        return [-float(tokens[0]), -float(tokens[1])]

    # The first band of the real order closes the sequence and gets a plain 0.
    band_order = BAND_NAMES_REAL_ORDER[1:] + [BAND_NAMES_REAL_ORDER[0]]
    shifts = [negated_pair(column) for column in shift_columns]
    return dict(zip(band_order, shifts + [0]))
def get_id_raw_l1_dict(database="THRAWS", cfg_file_dict=None):
    """Returns a dictionary containing raw (directory name), l1c (product ID and correspondent granule) and class,
    raw useful and complementary tiles.

    The CSV file of the requested database is read row by row; each row becomes
    one entry keyed by its ``ID_event``. If an ID appears more than once, the
    last row wins.

    Args:
        database (string, optional): database name. Defaults to "THRAWS".
        cfg_file_dict (dict, optional): dictionary containing paths to the different pyraws directories.
            If None, it is built with ``get_cfg_file_dict``.

    Returns:
        dict: ``{event_id: info}``, where ``info`` has the keys ``"raw"``, ``"l1c"`` (both equal to the
        event ID), ``"class"``, ``"raw_useful_granules"``, ``"raw_complementary_granules"``,
        ``"events_coords"``, ``"bbox_list"`` (raw CSV strings) and ``"requested_polygon"`` (list of
        ``[float, float]`` vertices, or ``[[None, None]]`` when the polygon cannot be parsed).
        ``None`` is returned, after printing an error, when the file cannot be read or a column
        is missing.

    Raises:
        KeyError: if ``database`` is not in ``DATABASE_FILE_DICTIONARY`` or ``cfg_file_dict`` has
            no ``"database"`` entry.
    """
    if cfg_file_dict is None:
        cfg_file_dict = get_cfg_file_dict()
    image_csv_path = os.path.join(
        cfg_file_dict["database"], DATABASE_FILE_DICTIONARY[database]
    )

    def parse_polygon(wkt):
        """Convert a 'POLYGON ((x y, x y, ...))' string into a list of [x, y] floats."""
        try:
            # Drop the 10-character "POLYGON ((" prefix and the closing "))".
            vertices = wkt[10:-2].split(",")
            parsed = []
            for vertex in vertices:
                # split() ignores the blank that normally follows each comma,
                # so a vertex is read correctly with or without it (blindly
                # dropping the first character could eat a digit or a sign).
                first, second = vertex.split()[:2]
                parsed.append([float(first), float(second)])
            return parsed
        except (TypeError, ValueError):
            # Missing, empty or malformed polygon.
            return [[None, None]]

    id_raw_l1_dict = {}
    try:
        with open(image_csv_path, mode="r", encoding="utf-8-sig") as csv_file:
            for row in csv.DictReader(csv_file):
                event_id = row["ID_event"]
                id_raw_l1_dict[event_id] = {
                    "raw": event_id,
                    "l1c": event_id,
                    "class": row["class"],
                    "raw_useful_granules": row["Raw_useful_granules"],
                    "raw_complementary_granules": row["Raw_complementary_granules"],
                    "events_coords": row["Coords (Lat, Lon)"],
                    "requested_polygon": parse_polygon(row["Polygon"]),
                    "bbox_list": row["bbox_list"],
                }
    except (OSError, KeyError, ValueError, csv.Error):
        # Best effort by design: unreadable file, bad encoding or missing
        # column are reported and signalled to the caller with None.
        print(
            colored("ERROR", "red")
            + ". Impossible to parse the file: "
            + colored(image_csv_path, "blue")
            + "."
        )
        return None

    return id_raw_l1_dict
def get_event_granule_bb_dict(event_id, database="THRAWS", cfg_file_dict=None):
    """Extract the dictionary {useful_granule : bounding_box_list} of one event.

    The ``bbox_list`` string stored in the database for the event is evaluated
    as a Python literal.

    Args:
        event_id (string): event ID.
        database (string, optional): database name. Defaults to "THRAWS".
        cfg_file_dict (dict, optional): dictionary containing paths to the different pyraws directories.
            If None, internal CSV database will be parsed.

    Returns:
        dict: {useful_granule : bounding_box_list} for the requested event.
    """
    events = get_id_raw_l1_dict(database=database, cfg_file_dict=cfg_file_dict)
    bbox_str = events[event_id]["bbox_list"]
    return literal_eval(bbox_str)
def get_events_list(database="THRAWS", cfg_file_dict=None):
    """Returns the IDs of the events stored in the database.

    Args:
        database (string, optional): database name. Defaults to "THRAWS".
        cfg_file_dict (dict, optional): dictionary containing paths to the different pyraws directories.
            If None, internal CSV database will be parsed.

    Returns:
        list: event IDs, in the order of the parsed database dictionary.
    """
    events = get_id_raw_l1_dict(database=database, cfg_file_dict=cfg_file_dict)
    event_ids = events.keys()
    return list(event_ids)
def get_event_info(
    event_id, cfg_file_dict=None, id_raw_l1_dict=None, database="THRAWS"
):
    """From the event_ID, it returns raw directory path, the path to the l1 image, to the L1C auxiliary file,
    and the class of the image. If no `id_raw_l1_dict` is provided, it is done by parsing the internal CSV database file.

    Args:
        event_id (string): image name.
        cfg_file_dict (dict, optional): dictionary containing paths to the different pyraws directories.
            If None, it is built with ``get_cfg_file_dict``.
        id_raw_l1_dict (dict, optional): id-raw-l1 dictionary. If None, internal CSV database will be parsed.
        database (string, optional): database name. Defaults to "THRAWS".

    Raises:
        ValueError: If the image is not in the database, or if one of the stored
            strings (granules, coordinates) cannot be converted to numbers.

    Returns:
        string: path to the Raw image.
        string: path to the L1 image.
        string: path to the post-processed L1C tiff file.
        string: expected class.
        list: list of raw useful tiles. ``[]`` when the stored string is missing, empty or ``"[]"``.
        list: list of complementary raw tiles (to coregister without 0). ``[""]`` when the
            stored string is missing or empty.
        dict: event coordinates dict {"lat" : lat, "lon" : lon}.
        list: list of the requested polygon coordinates.
    """

    def parse_complementary(text):
        """Turn a string such as "1, None, 3" into [1, None, 3]."""
        text = text.replace(" ", "")
        if not text:
            # Historical return value for "nothing stored"; kept as-is
            # because callers may compare against it.
            return [""]
        return [None if token == "None" else int(token) for token in text.split(",")]

    def parse_useful(text):
        """Turn the stored useful-granules string into a list."""
        if text is None or len(text) == 0:
            return []
        inner = text[1:-1].replace(" ", "")
        if not inner:
            # "[]": an explicit empty list.
            return []
        chunks = inner.split("],")
        # A list of pairs such as "[[0, 1], [2, 3]]" leaves a leading "[" on
        # each chunk. The test has to look inside the chunks: checking whether
        # the list itself contains "[" never matches.
        if any(chunk.startswith("[") for chunk in chunks):
            pairs = []
            for chunk in chunks:
                first, second = chunk.replace("[", "").replace("]", "").split(",")
                if first != "None" and second != "None":
                    pairs.append([int(first), int(second)])
                else:
                    pairs.append([None, None])
            return pairs
        # Flat list.
        # NOTE(review): only the first value of the list is kept, so
        # "[0, 2]" yields [0], and "None" yields [None] wrapped in a list.
        # Preserved as found - confirm this is intended.
        return [
            int(tokens[0]) if tokens[0] != "None" else [None]
            for tokens in (chunk.split(",") for chunk in chunks)
        ]

    if cfg_file_dict is None:
        cfg_file_dict = get_cfg_file_dict()
    if id_raw_l1_dict is None:
        id_raw_l1_dict = get_id_raw_l1_dict(database, cfg_file_dict)

    try:
        event_entry = id_raw_l1_dict[event_id]
        event_class = event_entry["class"]
        l1c_dir_name = event_entry["l1c"]
        raw_dir_name = event_entry["raw"]
        raw_useful_granules_str = event_entry["raw_useful_granules"]
        raw_complementary_granules_str = event_entry["raw_complementary_granules"]
    except (LookupError, TypeError) as err:
        # TypeError covers id_raw_l1_dict being None (database not parsable).
        raise ValueError(
            colored("ERROR", "red")
            + ". The image: "
            + colored(event_id, "blue")
            + " is not in the database "
            + colored(
                os.path.join(cfg_file_dict["database"], database + ".csv"), "blue"
            )
            + "."
        ) from err

    raw_useful_granules = parse_useful(raw_useful_granules_str)

    if raw_complementary_granules_str is None:
        # Treated like an empty cell instead of failing on None.
        raw_complementary_granules_str = ""
    else:
        # Drop the enclosing brackets.
        raw_complementary_granules_str = raw_complementary_granules_str[1:-1]
    raw_complementary_granules = parse_complementary(raw_complementary_granules_str)

    data_root = cfg_file_dict["data"]
    raw_img_path = os.path.join(data_root, database, "raw", raw_dir_name)
    l1_img_path = os.path.join(data_root, database, "l1c", l1c_dir_name)
    l1c_post_processed_path = os.path.join(
        data_root, database, "l1c", "l1c_cropped_tif", event_id
    )

    coords = event_entry["events_coords"]
    requested_polygon = event_entry["requested_polygon"]
    return (
        raw_img_path,
        l1_img_path,
        l1c_post_processed_path,
        event_class,
        raw_useful_granules,
        raw_complementary_granules,
        {
            # The stored string is "<open bracket>lat, lon<close bracket>":
            # the first character is dropped for lat, the last one for lon.
            "lat": float(coords[1:].split(",")[0]),
            "lon": float(coords[:-1].split(",")[1]),
        },
        requested_polygon,
    )