Source code for downhill.dataset

# -*- coding: utf-8 -*-

r'''This module contains a class for handling batched datasets.

In many optimization tasks, parameters are updated with respect to estimates
of a loss function, and for many problems the loss is estimated using a set
of measured data.
'''

import numpy as np
import theano

from . import util


class Dataset:
    '''This class handles batching and shuffling a dataset.

    In ``downhill``, losses are optimized using sets of data collected from
    the problem that generated the loss.

    During optimization, data are grouped into "mini-batches"---that is,
    chunks that are larger than 1 sample and smaller than the entire set of
    samples; typically the size of a mini-batch is between 10 and 100, but the
    specific setting can be varied depending on your model, hardware, dataset,
    and so forth. These mini-batches must be presented to the optimization
    algorithm in pseudo-random order to match the underlying stochasticity
    assumptions of many optimization algorithms. This class handles the
    process of grouping data into mini-batches, as well as iterating over and
    shuffling these mini-batches dynamically as the dataset is consumed by the
    optimization algorithm.

    For many tasks, a dataset is obtained as a large block of sample data,
    which in Python is normally assembled as a ``numpy`` ndarray. To use this
    class on such a dataset, just pass in a list or tuple containing ``numpy``
    arrays; the number of these arrays must match the number of inputs that
    your loss computation requires.

    There are some cases when a suitable set of training data would be
    prohibitively expensive to assemble in memory as a single ``numpy`` array.
    To handle these cases, this class can also handle a dataset that is
    provided via a Python callable. For more information on using callables to
    provide data to your model, see :ref:`data-using-callables`.

    Parameters
    ----------
    inputs : callable or list of ndarray/sparse matrix/DataFrame/theano shared var
        One or more sets of data.

        If this parameter is callable, then mini-batches will be obtained by
        calling the callable with no arguments; the callable is expected to
        return a tuple of ndarray-like objects that will be suitable for
        optimizing the loss at hand.

        If this parameter is a list (or a tuple), it must contain array-like
        objects: ``numpy.ndarray``, ``scipy.sparse.csc_matrix``,
        ``scipy.sparse.csr_matrix``, ``pandas.DataFrame`` or
        ``theano.shared``. These are assumed to contain data for computing the
        loss, so the length of this tuple or list should match the number of
        inputs required by the loss computation. If multiple arrays are
        provided, their lengths along the axis given by the ``axis`` parameter
        (defaults to 0) must match.
    name : str, optional
        A string that is used to describe this dataset. Usually something like
        'test' or 'train'.
    batch_size : int, optional
        The size of the mini-batches to create from the data sequences. If
        this is negative or zero, all data in the dataset will be used in one
        batch. Defaults to 32. This parameter has no effect if ``inputs`` is
        callable.
    iteration_size : int, optional
        The number of batches to yield for each call to ``iterate()``.
        Defaults to the length of the data divided by ``batch_size``. If the
        dataset is a callable, then the number is ``len(callable)``; if the
        callable has no length, then the number is set to 100.
    axis : int, optional
        The axis along which to split the data arrays, if the first parameter
        is given as one or more ndarrays. If not provided, defaults to 0.
    rng : :class:`numpy.random.RandomState` or int, optional
        A random number generator, or an integer seed for a random number
        generator. If not provided, the random number generator will be
        created with an automatically chosen seed.
    '''

    _count = 0
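    # A minimal usage sketch for the array-based mode described in the
    # docstring above (illustrative, not part of the original source; the
    # data shapes here are arbitrary):
    #
    #     X = np.random.randn(100, 10).astype('f')
    #     y = np.random.randn(100, 1).astype('f')
    #     train = Dataset([X, y], name='train', batch_size=20)
    #     for x_batch, y_batch in train:
    #         pass  # five mini-batches of 20 rows each, in shuffled order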
    def __init__(self, inputs, name=None, batch_size=32,
                 iteration_size=None, axis=0, rng=None):
        self.name = name or 'dataset{}'.format(Dataset._count)
        Dataset._count += 1
        self.batch_size = batch_size
        self.iteration_size = iteration_size
        self.rng = rng
        if rng is None or isinstance(rng, int):
            self.rng = np.random.RandomState(rng)
        self._inputs = None
        self._slices = None
        self._callable = None
        if callable(inputs):
            self._init_callable(inputs)
        else:
            self._init_arrays(inputs, axis)
    def _init_callable(self, inputs):
        self._callable = inputs
        if not self.iteration_size:
            try:
                self.iteration_size = len(inputs)
            except (TypeError, AttributeError):  # has no len
                self.iteration_size = 100
        util.log('{0.name}: {0.iteration_size} mini-batches from callable',
                 self)

    def _init_arrays(self, inputs, axis=0):
        if not isinstance(inputs, (tuple, list)):
            inputs = (inputs, )
        shapes = []
        self._inputs = []
        for i, x in enumerate(inputs):
            self._inputs.append(x)
            if isinstance(x, np.ndarray):
                shapes.append(x.shape)
                continue
            if isinstance(x, theano.compile.SharedVariable):
                shapes.append(x.get_value(borrow=True).shape)
                continue
            if 'pandas.' in str(type(x)):  # hacky but prevents a global import
                import pandas as pd
                if isinstance(x, (pd.Series, pd.DataFrame)):
                    shapes.append(x.shape)
                    continue
            if 'scipy.sparse.' in str(type(x)):  # same here
                import scipy.sparse as ss
                if isinstance(x, (ss.csr_matrix, ss.csc_matrix)):
                    shapes.append(x.shape)
                    continue
            raise ValueError(
                'input {} (type {}) must be numpy.ndarray, theano.shared, '
                'scipy.sparse.{{csr,csc}}_matrix, or '
                'pandas.{{Series,DataFrame}}'.format(i, type(x)))
        L = shapes[0][axis]
        assert all(L == s[axis] for s in shapes), \
            'shapes do not match along axis {}: {}'.format(
                axis, '; '.join(str(s) for s in shapes))
        B = L if self.batch_size <= 0 else self.batch_size
        self._index = 0
        self._slices = []
        for i in range(0, L, B):
            where = []
            for shape in shapes:
                slices = [slice(None) for _ in shape]
                slices[axis] = slice(i, min(L, i + B))
                where.append(tuple(slices))
            self._slices.append(where)
        self.shuffle()
        if not self.iteration_size:
            self.iteration_size = len(self._slices)
        util.log('{0.name}: {0.iteration_size} of {1} mini-batches from {2}',
                 self, len(self._slices), '; '.join(str(s) for s in shapes))

    def __iter__(self):
        return self.iterate(True)
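    # Illustrative sketch of the slice bookkeeping built in _init_arrays (not
    # part of the original source): for two inputs of shape (5, 3) with
    # batch_size=2 and axis=0, _slices holds one entry per mini-batch, each a
    # list with one slice tuple per input, e.g.
    #
    #     [[(slice(0, 2), slice(None)), (slice(0, 2), slice(None))],
    #      [(slice(2, 4), slice(None)), (slice(2, 4), slice(None))],
    #      [(slice(4, 5), slice(None)), (slice(4, 5), slice(None))]]
    #
    # (shown before shuffling; shuffle() permutes the outer list in place, so
    # the batches cover the same rows in a different order).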
    def shuffle(self):
        '''Shuffle the batches in the dataset.

        If this dataset was constructed using a callable, this method has no
        effect.
        '''
        if self._slices is not None:
            self.rng.shuffle(self._slices)
    def iterate(self, shuffle=True):
        '''Iterate over batches in the dataset.

        This method generates ``iteration_size`` batches from the dataset and
        then returns.

        Parameters
        ----------
        shuffle : bool, optional
            Shuffle the batches in this dataset if the iteration reaches the
            end of the batch list. Defaults to True.

        Yields
        ------
        batches : data batches
            A sequence of batches---often from a training, validation, or
            test dataset.
        '''
        for _ in range(self.iteration_size):
            if self._callable is not None:
                yield self._callable()
            else:
                yield self._next_batch(shuffle)
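    # Sketch of the callable-based mode handled by iterate() above
    # (illustrative, not part of the original source; X, y, and sample_batch
    # are assumed names): the callable is invoked once per batch and must
    # return the full list of inputs required by the loss.
    #
    #     def sample_batch():
    #         idx = np.random.randint(0, len(X), size=32)
    #         return [X[idx], y[idx]]
    #
    #     stream = Dataset(sample_batch, iteration_size=50)
    #     batches = list(stream.iterate())  # 50 freshly sampled batches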
    def _next_batch(self, shuffle=True):
        batch = [x.iloc[i] if hasattr(x, 'iloc') else x[i]
                 for x, i in zip(self._inputs, self._slices[self._index])]
        self._index += 1
        if self._index >= len(self._slices):
            if shuffle:
                self.shuffle()
            self._index = 0
        return batch
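
# Wraparound behaviour sketch (illustrative, not part of the original
# source): with 100 rows and the default batch_size of 32, _init_arrays
# builds four slices covering 32, 32, 32, and 4 rows. _next_batch advances
# through them and reshuffles the slice list whenever the end is reached, so
# successive passes see the batches in a fresh pseudo-random order:
#
#     ds = Dataset([np.arange(100).reshape(100, 1)])
#     sizes = [len(b[0]) for b in ds.iterate()]
#     # sizes is a permutation of [32, 32, 32, 4]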