import numpy as np
import pandas as pd
from collections.abc import Iterable
import os
from scipy import integrate
from tqdm import tqdm
from pvlib import iotools
class spectral_property(object):
"""
WRITE DOCSTRING HERE
"""
def load_file(filepath):
with open(filepath, 'r') as infile:
meta = next(infile)[:-1]
data = pd.read_csv(infile)
return spectral_property(data['value'], data['wavelength'],
interpolation=meta.split(':')[1])
def to_nm(wavelength, units):
unit_conversion = { 'nm': 1,
'um': 1000 }
# Verify units are in conversion table
if units not in unit_conversion:
print("Warning: Unknown unit specified. Options are {}.".format(
unit_conversion.keys()))
units = 'nm'
return wavelength * unit_conversion[units]
def _linear_interpolation(self, wavelength_nm):
# Find upper and lower index
upper_bound = self.data[self.data.index > wavelength_nm].index.min()
lower_bound = self.data[self.data.index < wavelength_nm].index.max()
# Determine values of surrounding indices
upper_val = self.data['value'][upper_bound]
lower_val = self.data['value'][lower_bound]
# Calculate deltas
delta_lambda = upper_bound - lower_bound
delta_val = upper_val - lower_val
return lower_val + delta_val*(wavelength_nm - lower_bound)/delta_lambda
def _nearest_interpolation(self, wavelength_nm):
# Find upper and lower index
upper_bound = self.data[self.data.index > wavelength_nm].index.min()
lower_bound = self.data[self.data.index < wavelength_nm].index.max()
# Determine which index is closer
if (upper_bound - wavelength_nm) < (wavelength_nm - lower_bound):
return self.data['value'][upper_bound]
return self.data['value'][lower_bound]
def _lower_interpolation(self, wavelength_nm):
# Find lower index
lower_bound = self.data[self.data.index < wavelength_nm].index.max()
return self.data['value'][lower_bound]
def _upper_interpolation(self, wavelength_nm):
# Find upper index
upper_bound = self.data[self.data.index > wavelength_nm].index.min()
return self.data['value'][upper_bound]
interpolation_methods = {
'linear': _linear_interpolation,
'nearest': _nearest_interpolation,
'lower': _lower_interpolation,
'upper': _upper_interpolation
}
def __init__(self, values, index, index_units='nm', interpolation=None):
# Verify lengths match
if len(values) != len(index):
print("Warning: Length of values and index must match.")
return
# Convert inputs to list
values = [ val for val in values ]
index = [ spectral_property.to_nm(idx, index_units) for idx in index ]
# Create DataFrame
self.data = pd.DataFrame()
self.data['value'] = values
self.data['wavelength'] = index
self.data = self.data.set_index('wavelength')
self.interpolation = None
if interpolation in spectral_property.interpolation_methods:
self.interpolation = \
spectral_property.interpolation_methods[interpolation]
self.interpolation_type = interpolation
elif interpolation:
print("Warning: Specified interpolation type unknown.")
def _get_single(self, wavelength, units):
# Convert wavelength to nm
wavelength = spectral_property.to_nm(wavelength, units)
if wavelength in self.data.index:
# If the value for that wavelength is known, return it
return self.data['value'][wavelength]
elif self.interpolation:
# Check wavelength is within range
if wavelength < self.data.index.min() or \
wavelength > self.data.index.max():
print("Warning: Requested wavelength outside spectrum.")
return None
# Return interpolated value
return self.interpolation(self, wavelength)
return None
def __getitem__(self, wavelength, units='nm'):
if isinstance(wavelength, Iterable):
return np.array([ self._get_single(wl, units) for wl in wavelength ])
return self._get_single(wavelength, units)
def to_file(self, filepath, append=False):
mode = 'w'
if append:
mode = 'a'
with open(filepath, mode) as outfile:
outfile.write(f"interpolation:{self.interpolation_type}\n")
self.data.to_csv(outfile)
def range(self):
# Find upper and lower index
upper_bound = self.data.index.max()
lower_bound = self.data.index.min()
return (lower_bound, upper_bound)
def scale_values(self, scaling_factor):
self.data['value'] *= scaling_factor
def spectral_albedo_smarts(zen, azm, material, min_wavelength=300,
max_wavelength=4000):
import pySMARTS
smarts_res = pySMARTS.SMARTSSpectraZenAzm('30 31', str(zen), str(azm), material,
min_wvl=str(min_wavelength),
max_wvl=str(max_wavelength))
return spectral_property(smarts_res['Local_ground_reflectance'],
smarts_res['Wvlgth'], interpolation='linear')
def spectral_irradiance_smarts(zen, azm, material='LiteSoil', min_wavelength=300,
max_wavelength=4000):
import pySMARTS
try:
smarts_res = pySMARTS.SMARTSSpectraZenAzm('2 3 4', str(zen), str(azm),
material=material,
min_wvl=str(min_wavelength),
max_wvl=str(max_wavelength))
except PermissionError as e:
msg = "{}".format(e)
raise PermissionError(msg + " Error accessing SMARTS. Make sure you have "
"SMARTS installed in a directory that you have read/write privileges for. ")
dni_spectrum = spectral_property(smarts_res['Direct_normal_irradiance'],
smarts_res['Wvlgth'], interpolation='linear')
dhi_spectrum = spectral_property(smarts_res['Difuse_horizn_irradiance'],
smarts_res['Wvlgth'], interpolation='linear')
ghi_spectrum = spectral_property(smarts_res['Global_horizn_irradiance'],
smarts_res['Wvlgth'], interpolation='linear')
return (dni_spectrum, dhi_spectrum, ghi_spectrum)
def spectral_irradiance_smarts_SRRL(YEAR, MONTH, DAY, HOUR, ZONE,
LATIT, LONGIT, ALTIT,
RH, TAIR, SEASON, TDAY, SPR, W,
TILT, WAZIM, HEIGHT,
ALPHA1, ALPHA2, OMEGL, GG, BETA, TAU5,
RHOG, material,
IOUT='2 3 4', min_wvl='280', max_wvl='4000'):
import pySMARTS
smarts_res = pySMARTS.SMARTSSRRL(IOUT=IOUT, YEAR=YEAR,MONTH=MONTH,DAY=DAY,HOUR=HOUR, ZONE=ZONE,
LATIT=LATIT, LONGIT=LONGIT, ALTIT=ALTIT,
RH=RH, TAIR=TAIR, SEASON=SEASON, TDAY=TDAY, SPR=SPR, W=W,
TILT=TILT, WAZIM=WAZIM, HEIGHT=HEIGHT,
ALPHA1 = ALPHA1, ALPHA2 = ALPHA2, OMEGL = OMEGL,
GG = GG, BETA = BETA, TAU5= TAU5,
RHOG=RHOG, material=material,
min_wvl=min_wvl, max_wvl=max_wvl)
dni_spectrum = spectral_property(smarts_res[smarts_res.keys()[1]],
smarts_res['Wvlgth'], interpolation='linear')
dhi_spectrum = spectral_property(smarts_res[smarts_res.keys()[2]],
smarts_res['Wvlgth'], interpolation='linear')
ghi_spectrum = spectral_property(smarts_res[smarts_res.keys()[3]],
smarts_res['Wvlgth'], interpolation='linear')
return (dni_spectrum, dhi_spectrum, ghi_spectrum)
def spectral_albedo_smarts_SRRL(YEAR, MONTH, DAY, HOUR, ZONE,
LATIT, LONGIT, ALTIT,
RH, TAIR, SEASON, TDAY, SPR, W,
TILT, WAZIM, HEIGHT,
ALPHA1, ALPHA2, OMEGL, GG, BETA, TAU5,
RHOG, material,
IOUT='30 31', min_wvl='280', max_wvl='4000'):
import pySMARTS
smarts_res = pySMARTS.SMARTSSRRL(IOUT=IOUT, YEAR=YEAR,MONTH=MONTH,DAY=DAY,HOUR=HOUR, ZONE=ZONE,
LATIT=LATIT, LONGIT=LONGIT, ALTIT=ALTIT,
RH=RH, TAIR=TAIR, SEASON=SEASON, TDAY=TDAY, SPR=SPR, W=W,
TILT=TILT, WAZIM=WAZIM, HEIGHT=HEIGHT,
ALPHA1 = ALPHA1, ALPHA2 = ALPHA2, OMEGL = OMEGL,
GG = GG, BETA = BETA, TAU5= TAU5,
RHOG=RHOG, material=material,
min_wvl=min_wvl, max_wvl=max_wvl)
return spectral_property(smarts_res['Local_ground_reflectance'],
smarts_res['Wvlgth'], interpolation='linear')
[docs]
def generate_spectra(metdata, simulation_path, ground_material='Gravel', spectra_folder=None, scale_spectra=False,
scale_albedo=False, scale_albedo_nonspectral_sim=False, scale_upper_bound=2500, min_wavelength=280, max_wavelength=4000):
"""
generate spectral curve for particular material. Requires pySMARTS
Parameters
----------
metdata : bifacial_radiance MetObj
MetObj containing weather data, with a datetime index.
simulation_path: string or path
path of simulation directory
ground_material : string, optional
type of ground material for spectral simulation. Options include:
Grass, Gravel, Snow, Seasonal etc.
The default is 'Gravel'.
spectra_folder : path, optional
location to save spectral data. The default is None.
scale_spectra : bool, optional
DESCRIPTION. The default is False.
scale_albedo : bool, optional
DESCRIPTION. The default is False.
scale_albedo_nonspectral_sim : bool, optional
DESCRIPTION. The default is False.
scale_upper_bound: integer, optional
Set an upper bound for the wavelength when taking the mean
or integral of any generated spectra.
Returns
-------
spectral_alb : spectral_property class
spectral_alb.data: dataframe with frequency and magnitude data.
spectral_dni : spectral_property class
spectral_dni.data: dataframe with frequency and magnitude data.
spectral_dhi : spectral_property class
spectral_dhi.data: dataframe with frequency and magnitude data.
weighted_alb : pd.series
datetime-indexed series of weighted albedo values
"""
# make the datetime easily readable and indexed
dts = pd.Series(data=metdata.datetime)
# weighted albedo data frame
walb = pd.Series(index=np.array(metdata.datetime),dtype='float64')
# print useful reminders
if scale_albedo_nonspectral_sim:
print(' -= Non-Spectral Simulation =- \n Spectra files will NOT be saved.')
else:
print(' -= Spectral Simulation =- \n Spectra files will be saved.')
for dt in tqdm(dts,ncols=100,desc='Generating Spectra'):
# scrape all the necessary metadata
idx = dts.index[dts==dt][0]
dni = metdata.dni[idx]
dhi = metdata.dhi[idx]
ghi = metdata.ghi[idx]
if metdata.albedo is not None:
alb = metdata.albedo[idx]
else:
alb = 0.2
solpos = metdata.solpos.iloc[idx]
zen = float(solpos.zenith)
azm = float(solpos.azimuth) - 180
lat = metdata.latitude
# create file names
suffix = f'_{str(dt.year)[-2:]}_{dt.month:02}_{dt.day:02}_{dt.hour:02}_{dt.minute:02}.txt'
dni_file = os.path.join(simulation_path, spectra_folder, "dni"+suffix)
dhi_file = os.path.join(simulation_path, spectra_folder, "dhi"+suffix)
ghi_file = os.path.join(simulation_path, spectra_folder, "ghi"+suffix)
alb_file = os.path.join(simulation_path, spectra_folder, "alb"+suffix)
# generate the base spectra
try:
spectral_dni, spectral_dhi, spectral_ghi = spectral_irradiance_smarts(zen, azm, min_wavelength=min_wavelength, max_wavelength=max_wavelength)
except:
if scale_albedo_nonspectral_sim:
walb[dt] = 0.0
continue
# limit dataframes for calculations by scaling upper bound
tdni = spectral_dni.data[spectral_dni.data.index <= scale_upper_bound]
tdhi = spectral_dhi.data[spectral_dhi.data.index <= scale_upper_bound]
tghi = spectral_ghi.data[spectral_ghi.data.index <= scale_upper_bound]
# scaling spectra
if scale_spectra:
dni_scale = dni / integrate.trapezoid(tdni.value, x=tdni.index)
dhi_scale = dhi / integrate.trapezoid(tdhi.value, x=tdhi.index)
ghi_scale = ghi / integrate.trapezoid(tghi.value, x=tghi.index)
spectral_dni.scale_values(dni_scale)
spectral_dhi.scale_values(dhi_scale)
spectral_ghi.scale_values(ghi_scale)
# Determine Seasonal ground cover, if necessary
north = [1,2,3,4,10,11,12]
south = [5,6,7,8,9,10]
if lat < 0: winter = north
if lat > 0: winter = south
if ground_material == 'Seasonal':
MONTH = metdata.datetime[idx].month
if MONTH in winter :
if alb >= 0.6:
ground_material = 'Snow'
else:
ground_material = 'DryGrass'
else:
ground_material = 'Grass'
# Generate base spectral albedo
spectral_alb = spectral_albedo_smarts(zen, azm, ground_material, min_wavelength=280)
# Limit albedo by upper bound wavelength
talb = spectral_alb.data[spectral_alb.data.index <= scale_upper_bound]
# scaling albedo
if scale_albedo:
#***
# Currently using simple scaling model (scale by mean)
#***
denom = talb.values.mean()
scale_factor = alb / denom
spectral_alb.scale_values(scale_factor)
# If performing a non-spectral simulation, generate single albedo weighted by spectra
if scale_albedo_nonspectral_sim:
#SR = SR[SR.index <= scale_upper_bound] # placeholder for Spectral Responsivity
num = talb * tghi #* SR
num = integrate.trapezoid(num.value, x=num.index)
denom = tghi #* SR
denom = integrate.trapezoid(denom.value, x=denom.index)
alb_weighted = num / denom
walb[dt] = alb_weighted
# only save the files if performing a spectral simulation
if not scale_albedo_nonspectral_sim:
spectral_alb.to_file(alb_file)
spectral_dhi.to_file(dhi_file)
spectral_dni.to_file(dni_file)
spectral_ghi.to_file(ghi_file)
# save a basic csv of weighted albedo, indexed by datetime
if scale_albedo_nonspectral_sim:
walbPath = os.path.join(simulation_path,spectra_folder,'albedo_scaled_nonSpec.csv')
walb.to_csv(walbPath)
print('Weighted albedo CSV saved.')
weighted_alb = walb
else:
weighted_alb = None
return (spectral_alb, spectral_dni, spectral_dhi, weighted_alb)
[docs]
def generate_spectral_tmys(wavelengths, spectra_folder, metdata, location_name, output_folder):
"""
Generate a series of TMY-like files with per-wavelength irradiance. There will be one file per
wavelength. These are necessary to run a spectral simulation with gencumsky
Paramters:
----------
wavelengths: (np.array or list)
array or list of integer wavelengths to simulate, in units [nm]. example: [300,325,350]
spectra_folder: (path or str)
File path or path-like string pointing to the folder contained the SMARTS generated spectra
metdata: pandas DataFrame
DataFrame containing the weather data, with a datetime index.
location_name:
_description_
output_folder:
File path or path-like string pointing to the destination folder for spectral TMYs
"""
# -- read in the spectra files
spectra_files = next(os.walk(spectra_folder))[2]
spectra_files.sort()
# -- read in the weather file and format
tmydata = metdata.tmydata.copy()
tmydata.rename(columns={'dni':'DNI',
'dhi':'DHI',
'temp_air':'DryBulb',
'wind_speed':'Wspd',
'ghi':'GHI',
'relative_humidity':'RH',
'albedo':'Alb'
}, inplace=True)
dtindex = tmydata.index
# -- grab the weather file header to reproduce location meta-data
header = metdata.metadata.copy()
# -- read in a spectra file to copy wavelength-index
temp = pd.read_csv(os.path.join(spectra_folder,spectra_files[0]), header=1, index_col = 0)
# -- copy and reproduce the datetime index
dates = []
for file in spectra_files:
take = file[4:-4]
if take not in dates:
dates.append(take)
dates = pd.to_datetime(dates,format='%y_%m_%d_%H_%M').tz_localize(dtindex.tz)
# -- create a multi-index of columns [timeindex:alb,dni,dhi,ghi]
iterables = [dates,['ALB','DHI','DNI','GHI']]
multi_index = pd.MultiIndex.from_product(iterables, names=['time_index','irr_type'])
# -- create empty dataframe
spectra_df = pd.DataFrame(index=temp.index,columns=multi_index)
# -- fill with irradiance data
for file in spectra_files:
a = pd.to_datetime(file[4:-4],format='%y_%m_%d_%H_%M').tz_localize(dtindex.tz)
b = file[:3].upper()
spectra_df[a,b] = pd.read_csv(os.path.join(spectra_folder,file),header=1, index_col=0)
# -- reorder the columns to match TMYs
spectra_df.columns.set_levels(['Alb','DHI','DNI','GHI'],level=1)
# -- create arrays of zeros for data outside the array
zeros = np.zeros(len(dtindex))
# -- build the blank tmy-like data frame
blank_df = pd.DataFrame(index=dtindex, data={'Date (MM/DD/YYYY)':dtindex.strftime('%#m/%#d/%Y'),
'Time (HH:MM)':dtindex.strftime('%H:%M'),
'Wspd':tmydata['Wspd'],'Dry-bulb':tmydata['DryBulb'],
'DHI':zeros,'DNI':zeros,'GHI':zeros,'ALB':zeros})
# column names for transfer
irrs = ['DNI','DHI','GHI','ALB']
# -- grab data, save file
for wave in tqdm(wavelengths, ncols=100, desc='Generating Spectral TMYs'):
fileName = f'{location_name}_TMY_w{wave:04}.csv'
fileName = os.path.join(output_folder,fileName)
wave_df = blank_df.copy()
for col in spectra_df.columns:
wave_df.loc[col[0],col[1]] = spectra_df[col].loc[wave]
with open(fileName, 'w', newline='') as ict:
# for line in header:
# ict.write(line)
wave_df.to_csv(ict, index=False)
[docs]
def integrated_spectrum(spectra_folder, metdata ):
"""
Generate integrated sums across the full spectra
Paramters:
----------
spectra_folder: (path or str)
File path or path-like string pointing to the folder contained the SMARTS generated spectra
metdata: pandas DataFrame
DataFrame containing the weather data, with a datetime index.
Returns:
--------
integrated_sums: (list)
list of integrated sums for DNI, DHI, DNI*ALB, DHI*ALB
"""
# -- read in the spectra files
spectra_files = next(os.walk(spectra_folder))[2]
spectra_files.sort()
# -- read in the weather file and format
tmydata = metdata.tmydata.copy()
#tmydata.index = tmydata.index+pd.Timedelta(hours=1)
tmydata.rename(columns={'dni':'DNI',
'dhi':'DHI',
'temp_air':'DryBulb',
'wind_speed':'Wspd',
'ghi':'GHI',
'relative_humidity':'RH',
'albedo':'Alb'
}, inplace=True)
dtindex = tmydata.index
# -- grab the weather file header to reproduce location meta-data
header = metdata.metadata.copy()
# -- read in a spectra file to copy wavelength-index
temp = pd.read_csv(os.path.join(spectra_folder,spectra_files[0]), header=1, index_col = 0)
# -- copy and reproduce the datetime index
dates = []
for file in spectra_files:
take = file[4:-4]
if take not in dates:
dates.append(take)
dates = pd.to_datetime(dates,format='%y_%m_%d_%H_%M').tz_localize(dtindex.tz)
# -- create a multi-index of columns [timeindex:alb,dni,dhi,ghi]
iterables = [dates,['ALB','DHI','DNI','GHI']]
multi_index = pd.MultiIndex.from_product(iterables, names=['time_index','irr_type'])
# -- create empty dataframe
spectra_df = pd.DataFrame(index=temp.index,columns=multi_index)
# -- fill with irradiance data
for file in spectra_files:
a = pd.to_datetime(file[4:-4],format='%y_%m_%d_%H_%M').tz_localize(dtindex.tz)
b = file[:3].upper()
spectra_df[a,b] = pd.read_csv(os.path.join(spectra_folder,file),header=1, index_col=0)
integrated_sums = pd.DataFrame(index=dates, columns=['Sum_DNI', 'Sum_DHI', 'Sum_DNI_ALB', 'Sum_DHI_ALB'])
for col in spectra_df.columns:
integrated_sums.loc[col[0], 'Sum_DNI'] = integrate.trapezoid(spectra_df[col[0], 'DNI'], spectra_df.index)
integrated_sums.loc[col[0], 'Sum_DHI'] = integrate.trapezoid(spectra_df[col[0], 'DHI'], spectra_df.index)
integrated_sums.loc[col[0], 'Sum_DNI_ALB'] = integrate.trapezoid(spectra_df[col[0], 'DNI'] * spectra_df[col[0], 'ALB'], spectra_df.index)
integrated_sums.loc[col[0], 'Sum_DHI_ALB'] = integrate.trapezoid(spectra_df[col[0], 'DHI'] * spectra_df[col[0], 'ALB'], spectra_df.index)
return integrated_sums