"""
This module provides classes for accessing site-level wind and solar data
from internal and external data stores.
"""
import concurrent.futures as cf
from configparser import ConfigParser
import multiprocessing
import numpy as np
import os
import pandas as pds
from urllib.request import urlretrieve
from .powerdata import GeneratorNodeCollection
from .nearestnodes import nearest_power_nodes, nearest_met_nodes
from .resourcedata import WindResource, SolarResource, ResourceList
class DataStore(object):
    """
    Abstract class to define interface for accessing stores of resource data.
    """
    # Site meta data ships with the package in the 'library' sub-directory
    META_ROOT = os.path.dirname(os.path.realpath(__file__))
    META_ROOT = os.path.join(META_ROOT, 'library')

    def __init__(self):
        # Lazily-loaded meta data caches; populated on first access of the
        # wind_meta / solar_meta properties
        self._wind_meta = None
        self._solar_meta = None

    def __repr__(self):
        """
        Print the type of datastore and its ROOT_PATH

        Returns
        ---------
        'str'
            type of DataStore
        """
        return self.__class__.__name__

    @property
    def wind_meta(self):
        """
        Return wind meta data

        Returns
        ---------
        self._wind_meta : 'pandas.DataFrame'
            DataFrame of wind resource meta data
        """
        if self._wind_meta is None:
            path = os.path.join(self.META_ROOT, 'wind_site_meta.json')
            # load_meta is expected to be provided by subclasses — it is not
            # defined on DataStore itself (TODO confirm against subclasses)
            self._wind_meta = self.load_meta(path)
        return self._wind_meta

    @property
    def solar_meta(self):
        """
        Return solar meta data

        Returns
        ---------
        self._solar_meta : 'pandas.DataFrame'
            DataFrame of solar resource meta data
        """
        if self._solar_meta is None:
            path = os.path.join(self.META_ROOT, 'solar_site_meta.json')
            self._solar_meta = self.load_meta(path)
        return self._solar_meta

    @classmethod
    def decode_config_entry(cls, entry):
        """
        Decode config entry converting missing or 'None' entries to None

        Parameters
        ----------
        entry : 'str'
            entry from ConfigParser call

        Returns
        ---------
        entry : 'str' or None
            if config entry is not 'None' or empty the entry is returned
            else None is returned
        """
        # Bug fix: the original test was `entry == 'None' or ''`, which
        # parses as `(entry == 'None') or ''` — the bare '' is always falsy,
        # so empty-string entries were never converted to None.
        if entry in ('None', ''):
            entry = None
        return entry
class InternalDataStore(DataStore):
    """
    This class manages an internal cache of already downloaded resource data,
    and other Resource Data Tool information that should persist.

    The default location for the internal cache will be in the current
    working directory, but the user can set a different location by passing
    in a configuration file.

    A configuration file can also be used to set user library locations, for
    pointing to externally provided shapers and formatters.
    """
    # Package root (one directory above this module); the default cache
    # directory is created beneath it
    PKG_DIR = os.path.dirname(os.path.realpath(__file__))
    PKG_DIR = os.path.dirname(PKG_DIR)

    def __init__(self, cache_root=None, size=None):
        """
        Initialize InternalDataStore object

        Parameters
        ----------
        cache_root : 'str'
            Path to root directory in which local cache should be created
            Default is ./R2PD/R2PD_Cache
        size : 'float'
            Maximum local cache size in GB
        """
        super(InternalDataStore, self).__init__()
        if cache_root is None:
            cache_root = os.path.join(self.PKG_DIR, 'R2PD_Cache')
        self._cache_root = cache_root
        # Wind and solar data live in separate sub-directories of the cache
        self._wind_root = os.path.join(self._cache_root, 'wind')
        if not os.path.exists(self._wind_root):
            os.makedirs(self._wind_root)
        self._solar_root = os.path.join(self._cache_root, 'solar')
        if not os.path.exists(self._solar_root):
            os.makedirs(self._solar_root)
        self._size = size
        self.update_cache_meta()

    def __repr__(self):
        """
        Print the type of datastore and its ROOT_PATH

        Returns
        ---------
        'str'
            type of DataStore and its ROOT_PATH
        """
        return '{n} at {i}'.format(n=self.__class__.__name__,
                                   i=self._cache_root)

    @classmethod
    def connect(cls, config=None):
        """
        Reads the configuration. From configuration and defaults,
        determines initializes InternalDataStore object.

        Parameters
        ----------
        config : 'str'
            Path to .ini configuration file.
            See library/config.ini for an example

        Returns
        ---------
        'InternalDataStore'
            Initialized InternalDataStore object
        """
        if config is None:
            # Bug fix: root_path was previously left undefined on this
            # branch, causing a NameError at the cls(...) call below
            root_path = None
            size = None
        else:
            config_parser = ConfigParser()
            config_parser.read(config)
            root_path = config_parser.get('local_cache', 'root_path')
            root_path = cls.decode_config_entry(root_path)
            size = cls.decode_config_entry(config_parser.get('local_cache',
                                                             'size'))
            if size is not None:
                size = float(size)

        return cls(cache_root=root_path, size=size)

    @staticmethod
    def get_cache_size(cache_path):
        """
        Searches all sub directories in path for .hdf5 files
        computes total size in GB

        Parameters
        ----------
        cache_path : 'str'
            Path to cache directory

        Returns
        ---------
        repo_size : 'float'
            Returns total size of .hdf5 files in cache in GB
        """
        repo_size = 0
        for (path, _, files) in os.walk(cache_path):
            for file in files:
                if file.endswith('.hdf5'):
                    file_name = os.path.join(path, file)
                    # os.path.getsize returns bytes; convert to GB
                    repo_size += os.path.getsize(file_name) * 10**-9

        return repo_size

    @staticmethod
    def get_cache_summary(cache_meta):
        """
        Summarize the data available in the local cache

        Parameters
        ----------
        cache_meta : 'pandas.DataFrame'
            DataFrame of files in cache (one row per site, boolean columns
            per resource type)

        Returns
        ---------
        summary : 'pandas.Series'
            Summary table of number of sites, and corresponding resource types
            in local cache
        """
        summary = pds.Series()
        summary['sites'] = len(cache_meta)
        for col in cache_meta.columns:
            summary[col] = cache_meta[col].sum()

        return summary

    @property
    def cache_size(self):
        """
        Calculate size of local cache and dataset caches in GB

        Returns
        ---------
        'tuple'
            total, wind, and solar cache sizes in GB (floats)
        """
        total_cache = self.get_cache_size(self._cache_root)
        wind_cache = self.get_cache_size(self._wind_root)
        solar_cache = self.get_cache_size(self._solar_root)
        return total_cache, wind_cache, solar_cache

    @property
    def cache_summary(self):
        """
        Summarize sites and resource types in cache

        Returns
        ---------
        'pandas.DataFrame'
            Summary of Wind and Solar caches
        """
        # Bug fix: previously read self._wind_cache / self._solar_cache,
        # attributes that are never assigned; use the scan properties instead
        wind_summary = self.get_cache_summary(self.wind_cache)
        wind_summary.name = 'wind'
        solar_summary = self.get_cache_summary(self.solar_cache)
        solar_summary.name = 'solar'
        return pds.concat((wind_summary, solar_summary), axis=1).T

    @property
    def wind_cache(self):
        """
        Scan wind cache and update cache meta

        Returns
        ---------
        cache_meta : 'pandas.DataFrame'
            DataFrame of files in wind cache
        """
        columns = ['met', 'power', 'fcst', 'fcst-prob']
        cache_meta = pds.DataFrame(columns=columns)
        cache_meta.index.name = 'site_id'
        # Bug fix: arguments were passed in the wrong order and referenced
        # the nonexistent self._wind_cache; scan the wind cache directory
        return self.scan_cache(self._wind_root, cache_meta)

    @property
    def solar_cache(self):
        """
        Scan solar cache and update cache meta

        Returns
        ---------
        cache_meta : 'pandas.DataFrame'
            DataFrame of files in solar cache
        """
        # Bug fix: this property was mistakenly also named wind_cache,
        # shadowing the wind property above; it clearly scans the solar cache
        columns = ['met', 'power']
        cache_meta = pds.DataFrame(columns=columns)
        cache_meta.index.name = 'site_id'
        return self.scan_cache(self._solar_root, cache_meta)

    def update_cache_meta(self, dataset=None):
        """
        Refresh the cache meta for the given dataset.

        Bug fix: this method was called from __init__ (and by
        ExternalDataStore.get_resource) but was never defined, producing an
        AttributeError. The wind_cache / solar_cache properties re-scan the
        cache directories on every access, so a refresh simply forces a scan.

        Parameters
        ----------
        dataset : 'str' or None
            'wind' or 'solar' to refresh a single dataset, or None (default)
            to refresh both
        """
        if dataset in (None, 'wind'):
            _ = self.wind_cache
        if dataset in (None, 'solar'):
            _ = self.solar_cache

    @staticmethod
    def scan_cache(cache_path, cache_meta):
        """
        Scan cache_path and update cache_meta

        Parameters
        ----------
        cache_path : 'str'
            Root directory to be scanned for .hdf5 files
        cache_meta : 'pandas.DataFrame'
            DataFrame of resource files in cache

        Returns
        ---------
        cache_meta : 'pandas.DataFrame'
            Updated DataFrame of resource files in cache
        """
        cache_sites = cache_meta.index
        for file in os.listdir(cache_path):
            if file.endswith('.hdf5'):
                # File names follow '{dataset}_{resource}_{site_id}.hdf5'
                name = os.path.splitext(os.path.basename(file))[0]
                _, resource, site_id = name.split('_')
                site_id = int(site_id)
                if site_id not in cache_sites:
                    # New site: initialize all resource flags to False
                    cache_meta.loc[site_id] = False

                cache_meta.loc[site_id, resource] = True
                cache_sites = cache_meta.index

        return cache_meta

    def check_cache(self, dataset, site_id, resource_type=None):
        """
        Check cache for presence of resource.
        If resource_type is None check for any resource_type of site_id
        else check for specific resource_type for site_id

        Parameters
        ----------
        dataset : 'str'
            'wind' or 'solar'
        site_id : 'int'
            Site id number
        resource_type : 'str'
            type of resource
            wind -> ('power', 'fcst', 'met')
            solar -> ('power', 'fcst', 'met', 'irradiance')

        Returns
        ---------
        'bool'
            Is site/resource present in cache
        """
        # Bug fix: previously read self._wind_cache / self._solar_cache,
        # attributes that are never assigned; use the scan properties instead
        if dataset == 'wind':
            cache_meta = self.wind_cache
        elif dataset == 'solar':
            cache_meta = self.solar_cache
        else:
            msg = "Invalid dataset type, must be 'wind' or 'solar'"
            raise ValueError(msg)

        cache_sites = cache_meta.index
        if site_id in cache_sites:
            if resource_type is not None:
                cached = bool(cache_meta.loc[site_id, resource_type])
            else:
                cached = True
        else:
            cached = False

        return cached

    def test_cache_size(self, download_size):
        """
        Test to see if download will fit in cache

        Parameters
        ----------
        download_size : 'float'
            Size of requested download in GB

        Raises
        ------
        RuntimeError
            If the download would exceed the configured cache size limit
        """
        # A limit of None means the cache size is unbounded
        if self._size is not None:
            cache_size, wind_size, solar_size = self.cache_size
            open_cache = self._size - cache_size
            if open_cache < download_size:
                msg = ('Not enough space available in local cache:',
                       '\nDownload size = {:.2f}GB'.format(download_size),
                       '\nLocal cache = {:.2f}GB of'.format(cache_size),
                       ' {:.2f}GB in use'.format(self._size),
                       '\n\tCached wind data = {:.2f}GB'.format(wind_size),
                       '\n\tCached solar data = {:.2f}GB'.format(solar_size))
                raise RuntimeError(''.join(msg))
class ExternalDataStore(DataStore):
    """
    Abstract class to define interface for accessing external stores
    of resource data.
    """
    # Average file size in MB, currently estimates
    WIND_FILE_SIZES = {'met': 14, 'power': 4.1, 'fcst': 1}
    SOLAR_FILE_SIZES = {'met': 31, 'power': 8.4, 'fcst': 0}

    def __init__(self, local_cache=None, threads=None):
        """
        Initialize ExternalDataStore object

        Parameters
        ----------
        local_cache : 'InternalDataStore'
            InternalDataStore object representing internal data cache
        threads : 'int'
            Number of threads to use during downloads
        """
        super(ExternalDataStore, self).__init__()
        if local_cache is None:
            local_cache = InternalDataStore.connect()
        elif not isinstance(local_cache, InternalDataStore):
            msg = ("Expecting local_cache to be instance of",
                   "InternalDataStore,",
                   "but is {:}.".format(type(local_cache)))
            raise RuntimeError(' '.join(msg))

        self._local_cache = local_cache
        if threads:
            # Bug fix: a numeric string from the .ini (e.g. '4') was
            # previously discarded in favor of cpu_count() // 2; honor it
            # by converting to int, and only fall back when unparseable
            try:
                threads = int(threads)
            except (TypeError, ValueError):
                threads = multiprocessing.cpu_count() // 2
        else:
            threads = None

        self._threads = threads

    @classmethod
    def connect(cls, config):
        """
        Reads the configuration. From configuration and defaults,
        determines initializes ExternalDataStore object.

        Parameters
        ----------
        config : 'str'
            Path to .ini configuration file.
            See library/config.ini for an example

        Returns
        ---------
        'ExternalDataStore'
            Initialized ExternalDataStore object
        """
        config_parser = ConfigParser()
        config_parser.read(config)
        if config_parser.has_section('local_cache'):
            local_cache = InternalDataStore.connect(config=config)
        else:
            local_cache = None

        # Bug fix: ConfigParser.get takes fallback as a keyword-only
        # argument — the previous positional None raised a TypeError.
        # Missing or 'None' entries are normalized to None.
        threads = cls.decode_config_entry(
            config_parser.get('local_cache', 'threads', fallback=None))
        return cls(local_cache=local_cache, threads=threads)

    def get_download_size(self, dataset, numb_sites, resource_type,
                          forecasts=False):
        """
        Estimate download size

        Parameters
        ----------
        dataset : 'str'
            'wind' or 'solar'
        numb_sites : 'int'
            Number of sites to be downloaded
        resource_type : 'str'
            type of resource
            wind -> ('power', 'fcst', 'met')
            solar -> ('power', 'fcst', 'met', 'irradiance')
        forecasts : 'bool'
            Boolean flag as to whether forecasts will be included in the
            download or not

        Returns
        ---------
        download_size : 'float'
            Estimated download size in GB
        """
        if dataset == 'wind':
            file_sizes = self.WIND_FILE_SIZES
        elif dataset == 'solar':
            file_sizes = self.SOLAR_FILE_SIZES
        else:
            # Bug fix: an invalid dataset previously fell through to a
            # NameError on download_size; fail with a clear message instead
            msg = "Invalid dataset type, must be 'wind' or 'solar'"
            raise ValueError(msg)

        if resource_type == 'power':
            download_size = numb_sites * file_sizes['power']
            if forecasts:
                download_size += numb_sites * file_sizes['fcst']
        else:
            download_size = numb_sites * file_sizes['met']

        # File sizes are in MB; convert to GB
        return download_size / 1000

    def download(self, src, dst):
        """
        Abstract method to download src to dst

        Parameters
        ----------
        src : 'str'
            Path or URL to src file
        dst : 'str'
            Path to which file should be downloaded
        """
        pass

    def nearest_neighbors(self, node_collection):
        """
        Find the nearest neighbor resource sites for all nodes in
        node_collection

        Parameters
        ----------
        node_collection : 'NodeCollection'
            Collection of nodes for which resource sites are to be identified

        Returns
        ---------
        nearest_nodes : 'pandas.DataFrame'
            Dataframe with the nearest neighbor resource sites for each node
        """
        dataset = node_collection._dataset
        # NOTE(review): get_meta is not defined in this module — presumably
        # supplied by a subclass; confirm before relying on it
        resource_meta = self.get_meta(dataset)
        if isinstance(node_collection, GeneratorNodeCollection):
            nearest_nodes = nearest_power_nodes(node_collection,
                                                resource_meta)
        else:
            nearest_nodes = nearest_met_nodes(node_collection,
                                              resource_meta)

        return nearest_nodes

    def download_resource(self, dataset, site_id, resource_type):
        """
        Abstract method to download the resource site file from repository

        Parameters
        ----------
        dataset : 'str'
            'wind' or 'solar'
        site_id : 'int'
            Site id to be downloaded
        resource_type : 'str'
            power or met or fcst
        """
        pass

    def download_resource_data(self, dataset, site_ids, resource_type):
        """
        Download resource files from repository, serially or in a thread
        pool depending on self._threads

        Parameters
        ----------
        dataset : 'str'
            'wind' or 'solar'
        site_ids : 'list'
            List of site ids to be downloaded
        resource_type : 'str'
            power or met
        """
        if self._threads is None:
            for site in site_ids:
                self.download_resource(dataset, site, resource_type)
        else:
            with cf.ThreadPoolExecutor(max_workers=self._threads) as executor:
                for site in site_ids:
                    executor.submit(self.download_resource,
                                    dataset, site, resource_type)

    def get_node_resource(self, dataset, site_id, frac=None):
        """
        Initialize and return Resource class object for specified resource site

        Parameters
        ----------
        dataset : 'str'
            'wind' or 'solar'
        site_id : int
            Resource site_id
        frac : 'float'
            Fraction of resource to use from resource site

        Returns
        ---------
        'Resource'
            Wind or Solar Resource class instance

        Raises
        ------
        ValueError
            If dataset is not 'wind' or 'solar'
        RuntimeError
            If the site is not present in the local cache
        """
        cache = self._local_cache.check_cache(dataset, site_id)
        if cache:
            if dataset == 'wind':
                return WindResource(self.wind_meta.loc[site_id],
                                    self._local_cache._wind_root, frac=frac)
            elif dataset == 'solar':
                return SolarResource(self.solar_meta.loc[site_id],
                                     self._local_cache._solar_root, frac=frac)
            else:
                msg = "Invalid dataset type, must be 'wind' or 'solar'"
                raise ValueError(msg)
        else:
            raise RuntimeError('{d} site {s} is not in local cache!'
                               .format(d=dataset, s=site_id))

    def get_resource(self, node_collection, forecasts=False):
        """
        Finds nearest nodes, caches files to local datastore and assigns
        resource to node_collection

        Parameters
        ----------
        node_collection : 'NodeCollection'
            Collection of either weather of generator nodes
        forecasts : 'bool'
            Whether to download forecasts along with power data

        Returns
        ---------
        node_collection : 'NodeCollection'
            Node collection with resources assigned to nodes
        nearest_nodes : 'pandas.DataFrame'
            DataFrame of the nearest neighbor matching between nodes
            and resources
        """
        nearest_nodes = self.nearest_neighbors(node_collection)
        if isinstance(node_collection, GeneratorNodeCollection):
            resource_type = 'power'
            # Generator nodes may map to several sites each; flatten and
            # de-duplicate before downloading
            site_ids = np.concatenate(nearest_nodes['site_id'].values)
            site_ids = np.unique(site_ids)
        else:
            resource_type = 'met'
            site_ids = nearest_nodes['site_id'].values

        dataset = node_collection._dataset
        self.download_resource_data(dataset, site_ids, resource_type)
        # Bug fix: the original used the bitwise `&`, which binds tighter
        # than `==` — `resource_type == ('power' & forecasts)` raised a
        # TypeError; logical `and` is what was intended
        if resource_type == 'power' and forecasts:
            self.download_resource_data(dataset, site_ids, 'fcst')

        self._local_cache.update_cache_meta(dataset)

        resources = []
        for _, meta in nearest_nodes.iterrows():
            site_id = meta['site_id']
            if isinstance(site_id, list):
                # Node draws fractional contributions from several sites
                fracs = meta['site_fracs']
                r = ResourceList([self.get_node_resource(dataset, site, frac=f)
                                  for site, f in zip(site_id, fracs)])
            else:
                r = self.get_node_resource(dataset, site_id)

            resources.append(r)

        if forecasts:
            node_collection.assign_resource(resources, forecasts=forecasts)
        else:
            node_collection.assign_resource(resources)

        return node_collection, nearest_nodes
class DRPower(ExternalDataStore):
    """
    Class object for External DataStore at DR Power (egrid.org)
    """
    # Base URL of the remote repository serving the resource .hdf5 files
    DATA_ROOT = 'https://dtn2.pnl.gov/drpower'

    def download(self, src, dst):
        """
        Download resource data from src URL to dst file path

        Parameters
        ----------
        src : 'str'
            URL of resource data to be downloaded
        dst : 'str'
            Destination path of resource data (including file name)
        """
        urlretrieve(src, dst)

    def download_resource(self, dataset, site_id, resource_type):
        """
        Download the resource site file from repo and add site to cache meta

        Parameters
        ----------
        dataset : 'str'
            'wind' or 'solar'
        site_id : 'int'
            Site id to be downloaded
        resource_type : 'str'
            power or met or fcst
        """
        # Remote and local copies share the same file-name convention
        fname = '{}_{}_{}.hdf5'.format(dataset, resource_type, site_id)
        src_url = os.path.join(self.DATA_ROOT, dataset, fname)
        dst_path = os.path.join(self._local_cache._cache_root, dataset, fname)
        self.download(src_url, dst_path)