Source code for pyeogpr.gee_processing

import requests
import datetime
import ee
import os
import importlib
import sys

[docs] class EarthEngine: """ pyeogpr.EarthEngine -------------------- projectID : string Your GEE projectID. Usually starts with "ee-...". You can find it next to your profile picture, top right corner on: https://code.earthengine.google.com sensor : string Satellite sensor to use. You can search for it on: https://developers.google.com/earth-engine/datasets/catalog Insert the string, as you would find it on the GEE catalog site: for example: "COPERNICUS/S3/OLCI" or "LANDSAT/LC08/C02/T1_L2" biovar : string Biophysical variable to process. Currently "built-in" variables available for each sensor: ====================== ===================================================== Satellite Level Available Products ====================== ===================================================== SENTINEL2_L1C Cab, Cm, Cw, FVC, LAI, laiCab, laiCm, laiCw SENTINEL2_L2A Cab, Cm, Cw, FVC, LAI, laiCab, laiCm, laiCw, CNC_Cab, CNC_Cprot, mangrove_LAI, mangrove_Cm, mangrove_Cw, mangrove_Cab SENTINEL3_OLCI_L1B FAPAR, FVC, LAI, LCC ====================== ===================================================== If you have your own model trained with ARTMO (https://artmotoolbox.com), you need to insert the directory of the model, for example: r"C:/User/Models/My_custom_model.py" These models need to be in ".py" format, in order to achieve it, please consult: `ARTMO to GEE <https://github.com/SentiFLEXinel/ARTMOtoGEE/>`_ and `ARTMO <https://artmotoolbox.com/>`_ bounding_box : list, ee.assetpath Your region of interest. Insert bbox as list. Can be selected from https://geojson.io/ (e.g.: [-4.55, 42.73,-4.48, 42.77]). Alternatively, you can insert shapefiles, that are already uploaded to your GEE assets. Just copy the ee.assetpath which stores your SHP shapefile that you already uploaded to GEE. temporal_extent : list Your temporal extent to be processed. (e.g.: ["2021-01-01", "2021-12-31"]) spatial_resolution : int Spatial resolution of the exported data. Value in meters. cloudmask : Boolean If set to "True", cloud masking will be done for Sentinel 2 and 3 sensors. Defaults to False. """ def __init__( self, projectID, sensor, biovar, bounding_box, temporal_extent, spatial_resolution, cloudmask=False, bands = None ): self.projectID = projectID # EE Auth ee.Authenticate() ee.Initialize(project=self.projectID) self.sensor = sensor if self.sensor == "SENTINEL2_L1C" or self.sensor == "COPERNICUS/S2_HARMONIZED": self.sensor = "COPERNICUS/S2_HARMONIZED" search_sensor = "SENTINEL2_L1C" if self.sensor == "SENTINEL2_L2A" or self.sensor == "COPERNICUS/S2_SR_HARMONIZED": self.sensor = "COPERNICUS/S2_SR_HARMONIZED" search_sensor = "SENTINEL2_L2A" if self.sensor == "SENTINEL3_L1B" or self.sensor == "COPERNICUS/S3/OLCI": self.sensor = "COPERNICUS/S3/OLCI" search_sensor = "SENTINEL3_L1B" if biovar[-3:] == ".py": self.custom_model = biovar self.biovar = biovar.split("\\")[-1].split(".")[0] else: self.biovar = biovar if type(bounding_box) is list: # To accept GeoJSON bbox self.bbox = ee.Geometry.BBox( bounding_box[0], bounding_box[1], bounding_box[2], bounding_box[3] ) if "projects/ee-" in bounding_box: # So it accepts shps too! self.bbox = ee.FeatureCollection(bounding_box).geometry() # self.bbox_fc2geo = ee.FeatureCollection(bounding_box).geometry() self.temporal_extent = temporal_extent self.spatial_resolution = spatial_resolution self.cloudmask = cloudmask self.bands = bands self.timeWindows = None self.assetpath = None self.gpr_model = None self.reducer = None # User defined or default models import if biovar[-3:] == ".py": self.custom_model = biovar spec = importlib.util.spec_from_file_location(biovar, self.custom_model) user_module = importlib.util.module_from_spec(spec) sys.modules["user_module"] = user_module spec.loader.exec_module(user_module) self.model_imported = user_module else: url = f"https://raw.githubusercontent.com/daviddkovacs/pyeogpr/refs/heads/main/models/{search_sensor}_{self.biovar}_GEE.py" response = requests.get(url) with open("model_imported.py", "w") as f: f.write("import ee\n") f.write(f"ee.Initialize(project = '{self.projectID}')\n") f.write(response.text) if "model_imported" in sys.modules: del sys.modules["model_imported"] # We do this to flush existing modules import model_imported self.model_imported = model_imported self.imcol = ee.ImageCollection(self.sensor).filterDate( ee.Date(self.temporal_extent[0]), ee.Date(self.temporal_extent[1]) )
[docs] def construct_datacube(self, composite=None): """ composite : Defaults to None The algorithm creates temporal composites, according to the number of days you assign. The temporal composites need to be 1 day smaller than the defined temporal_extent. """ self.timeWindows = composite print(f"Temporal composites: {self.timeWindows} days") dif = ( datetime.datetime.strptime(self.temporal_extent[1], "%Y-%m-%d").toordinal() - datetime.datetime.strptime( self.temporal_extent[0], "%Y-%m-%d" ).toordinal() ) if dif <= composite: raise ValueError( f"Temporal composite ({str(composite)} days) has to be shorter than defined temporal_extent" )
[docs] def process_map(self, assetpath=None): """ assetpath: str You need to define, which GEE asset (ImageCollection) you would want the maps to be exported to. To create your asset, go to top left corner on: https://code.earthengine.google.com/ You will find three tabs: Scripts, Docs and Assets. Go to Assets and create a new ImageCollection. When you created your ImageCollection, copy its ID as a string and assign it to assetpath. If left blank, the script will automatically generate you a default asset. """ if assetpath == None: assetpath = f"projects/{self.projectID}/assets/PyEOGPR_{self.biovar}" ee.data.createAsset({"type": "ImageCollection"}, assetpath) print(f"Assetpath NOT DEFINED:\nExporting to default EE asset: {assetpath}") else: print(f"Exporting to {assetpath}") self.assetpath = assetpath self.maploop()
def sequence_GREEN(self, variable): sequence_GREEN = [] model = self.model_imported for i in range(0, model.XTrain_dim_GREEN): sequence_GREEN.append(str(i)) return sequence_GREEN def getInputDates(self, i): fecha_inicio = ee.Date(self.temporal_extent[0]).advance( ee.Number(i).multiply(self.timeWindows), "day" ) fecha_fin = fecha_inicio.advance(self.timeWindows, "day") # fecha_fin = endDateGEE() fecha_str = datetime.datetime.utcfromtimestamp( fecha_inicio.getInfo()["value"] / 1000.0 ).strftime("%Y%m%d") return { "fecha_inicio": fecha_inicio, "fecha_fin": fecha_fin, "fecha_str": fecha_str, }
[docs] def quality_mask_olci(self, image): qa = ee.Image(image.select("quality_flags")) coastLine = 1 << 30 inLandWater = 1 << 29 bright = 1 << 27 invalid = 1 << 25 Oa12Sat = 1 << 9 mask = ( qa.bitwiseAnd(coastLine) .eq(0) .And(qa.bitwiseAnd(inLandWater).eq(0)) .And(qa.bitwiseAnd(bright).eq(0)) ) return image.updateMask(mask)
[docs] def quality_mask_sentinel2(self, image): qa = image.select('QA60') cloudBitMask = 1 << 10 cirrusBitMask = 1 << 11 mask = qa.bitwiseAnd(cloudBitMask).eq(0) \ .And(qa.bitwiseAnd(cirrusBitMask).eq(0)) return image.updateMask(mask)
def addVariables(image): date = ee.Date(image.get("system:time_start")) years = date.difference(ee.Date("1970-01-01"), "days") return image.addBands(ee.Image(years).rename("t").int()) def calculate_GREEN(self, fecha_inicio, fecha_fin, variable): model = self.model_imported if self.bands is None: self.bands = model.bands if self.sensor == "COPERNICUS/S2_HARMONIZED" or self.sensor == "COPERNICUS/S2_SR_HARMONIZED": image = ee.Image( self.imcol.filterDate(fecha_inicio, fecha_fin) .filterBounds(self.bbox) .filterMetadata('CLOUDY_PIXEL_PERCENTAGE', 'less_than', 40) .map(self.quality_mask_sentinel2) .select(self.bands) .max() .clip(self.bbox) ).divide(10000) if self.sensor == "COPERNICUS/S3/OLCI": image = ee.Image( self.imcol.filterDate(fecha_inicio, fecha_fin) .filterBounds(self.bbox) .map(self.quality_mask_olci) .select(self.bands) # .cast(model.bands_dict, model.bands) .max() .clip(self.bbox) ) im_norm_ell2D_hypell = ( image.subtract(model.mx_GREEN) .divide(model.sx_GREEN) .multiply(model.hyp_ell_GREEN) .toArray() .toArray(1) ) im_norm_ell2D = ( image.subtract(model.mx_GREEN).divide(model.sx_GREEN).toArray().toArray(1) ) PtTPt = ( im_norm_ell2D_hypell.matrixTranspose() .matrixMultiply(im_norm_ell2D) .arrayProject([0]) .multiply(-0.5) ) PtTDX = ( ee.Image(model.X_train_GREEN) .matrixMultiply(im_norm_ell2D_hypell) .arrayProject([0]) .arrayFlatten([self.sequence_GREEN(self.biovar)]) ) arg1 = PtTPt.exp().multiply(model.hyp_sig_GREEN) k_star = PtTDX.subtract(model.XDX_pre_calc_GREEN.multiply(0.5)).exp().toArray() mean_pred = k_star.arrayDotProduct( model.alpha_coefficients_GREEN.toArray() ).multiply(arg1) mean_pred = mean_pred.toArray(1).arrayProject([0]).arrayFlatten([[self.biovar]]) mean_pred = mean_pred.add(model.mean_model_GREEN) filterDown = mean_pred.gt(0) mean_pred = mean_pred.multiply(filterDown) if self.sensor == "COPERNICUS/S2_SR_HARMONIZED" and (self.biovar == "LAI" or self.biovar == "Cm" or self.biovar == "Cw" or self.biovar =="Cab" or self.biovar == "FVC" or self.biovar == "laiCab"): image = image.addBands(mean_pred) return image.select(self.biovar) else: k_star_uncert = ( PtTDX.subtract(model.XDX_pre_calc_GREEN.multiply(0.5)) .exp() .multiply(arg1) .toArray() ) Vvector = ( ee.Image(model.Linv_pre_calc_GREEN) .matrixMultiply(k_star_uncert.toArray(0).toArray(1)) .arrayProject([0]) ) Variance = ( ee.Image(model.hyp_sig_unc_GREEN) .toArray() .subtract(Vvector.arrayDotProduct(Vvector)) .abs() .sqrt() ) Variance = ( Variance.toArray(1) .arrayProject([0]) .arrayFlatten([[self.biovar + "_UNCERTAINTY"]]) ) image = image.addBands(mean_pred) image = image.addBands(Variance) return image.select(self.biovar, self.biovar + "_UNCERTAINTY") def maploop(self): startDate = datetime.datetime.strptime( self.temporal_extent[0], "%Y-%m-%d" ).date() daysIterations = abs( ( startDate - datetime.datetime.strptime(self.temporal_extent[1], "%Y-%m-%d").date() ) // self.timeWindows ).days for i in range(0, daysIterations): print(self.getInputDates(i)["fecha_str"]) imageHolder = ee.Image().set( "system:time_start", ee.Date(self.temporal_extent[0]) .advance(ee.Number(i).multiply(self.timeWindows), "days") .millis(), ) imagen = self.calculate_GREEN( self.getInputDates(i)["fecha_inicio"], self.getInputDates(i)["fecha_fin"], self.biovar, ) # .toInt32() imageHolder = imageHolder.addBands(imagen) image_export = imageHolder #image_export = imageHolder.select(self.biovar, self.biovar + "_UNCERTAINTY") # Optional masking bare_cover = ( ee.Image("COPERNICUS/Landcover/100m/Proba-V-C3/Global/2019") .select("bare-coverfraction") .lte(90) ) lakes = ( ee.Image("COPERNICUS/Landcover/100m/Proba-V-C3/Global/2019") .select("discrete_classification") .eq(80) ) lakemask = lakes.eq(0) image_export = image_export.mask(lakemask) image_export = image_export.mask(bare_cover) image_export = image_export.set( "system:time_start", ee.Date(self.temporal_extent[0]) .advance(ee.Number(i).multiply(self.timeWindows), "days") .millis(), ) image_export = image_export.clip(self.bbox) # Export the image to an asset exportar = ee.batch.Export.image.toAsset( assetId=self.assetpath + "/" + self.getInputDates(i)["fecha_str"] + "_" + self.biovar, image=image_export, maxPixels=17210617060, description=self.getInputDates(i)["fecha_str"] + "_" + self.biovar, scale=self.spatial_resolution, region = self.bbox, ) exportar.start() exportar.status()