How dpEmu works¶
dpEmu consists of three components:
A system for building error generators
A system for running ML models with different error parameters
Tools for visualizing the results
Error Generation¶
For a quick hands-on introduction to error generation in dpEmu, see the Error Generation Basics tutorial.
Error generation in dpEmu consists of three simple steps:
Defining the structure of the data by constructing an error generation tree.
Attaching filters (error sources) to the tree.
Calling the
generate_error
method on the root node of the tree.
Creating an Error Generation Tree¶
Error generation trees consist of tree nodes. The most common type of leaf node
is the Array
, which can represent a Numpy array (or Python list) of any
dimension. Even a scalar value can be represented by an Array
node provided
that node is not the root of the tree. If the fundamental unit of your data is
a tuple (as is the case with, e.g. .wav audio data), use a Tuple node as the
leaf.
The simplest and most commonly used non-leaf node type is the Series
.
The Series
represents the leftmost dimension of any unit of data passed to
it. For example, you might choose to represent a matrix of data as a series of
rows. In that case you would then create an Array
node to represent a row
and provide it as the argument to a Series
node:
from dpemu.nodes import Array, Series
row_node = Array()
root_node = Series(row_node)
A TupleSeries represents a tuple where the first (i.e. leftmost) dimensions of the tuple elements are in some sense “the same”. For example, if we have one Numpy array, X, containing the input data and another, Y, containing each data point’s correct label, we may choose to represent (X, Y) as a TupleSeries.
There is usually more than one valid way to represent the structure of the data as a tree. For example, a 2d Numpy array can be represented as:
a matrix, i.e. a single Array node
a list of rows, i.e. a Series with an Array as its child
a list of lists of scalars, i.e. a Series whose child is a Series whose child is an Array.
Adding Filters (Error Sources)¶
Filters can be added to leaf nodes such as Array
or Tuple
nodes.
Dozens of filters (e.g. Snow
, Blur
and SensorDrift
) are provided
out of the box. They can be used to manipulate practically any kind of data,
including but not limited to images,time series and sound. Users can also
create their own custom error sources by subclassing the Filter
class.
To create a filter, call the constructor and provide string identifiers for
the error parameters of that filter. To attach the filter to a leaf node,
call the node’s addfilter
method with the filter object as the parameter.
Calling the generate_error
Method¶
Once you have defined your error generation tree and added the desired filters,
you can call the generate_error
method of the root node of the tree. The
method takes two arguments:
the data into which the errors are to be introduced, and
a dictionary of error parameters.
The parameter dictionary contains the error parameter values that are to be used in the error generation. The keys corresponding to the values are the error parameter identifier strings which you provided to the Filter constructor(s).
The generate_error
method does not overwrite the original data but returns
a copy instead.
This is an example of what the error generation process might look like:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | import numpy as np
from dpemu.filters.common import Missing
from dpemu.nodes import Array, TupleSeries
# Assume our data is a tuple of the form (x, y) where x has
# shape (100, 10) and y has shape (100,). We can think of each
# row i as a data point where x_i represents the values of the
# explanatory variables and y_i represents the corresponding
# value of the response variable.
x = np.random.rand(100, 10)
y = np.random.rand(100, 1)
data = (x, y)
# Build a data model tree.
x_node = Array()
y_node = Array()
root_node = TupleSeries([x_node, y_node])
# Suppose we want to introduce NaN values (i.e. missing data)
# to y only (thus keeping x intact).
probability = .3
y_node.addfilter(Missing("p", "missing_val"))
# Feed the data to the root node.
output = root_node.generate_error(data, {'p': probability, 'missing_val': np.nan})
print("Output type (should be tuple):", type(output))
print("Output length (should be 2):", len(output))
print("Shape of first member of output tuple (should be (100, 10)):",
output[0].shape)
print("Shape of second first member of output tuple (should be (100,)):",
output[1].shape)
print("Number of NaNs in x (should be 0):",
np.isnan(output[0]).sum())
print(f"Number of NaNs in y (should be close to {probability * y.size}):",
np.isnan(output[1]).sum())
|
In the example the error generation tree has a TupleSeries
as its
root node, which in turn has two Array
nodes as its children. Then on
line 19 we add a Missing
filter to one of the children, which will
transform some of the values in the 1-dimensional array y
to NaN.
The filter is given a parameter with value “p”, which means that the key
for the probability for transforming a number into NaN is going to be
“p” in the parameter dictionary.
We then create a GaussianNoise filter and attach it to x_node
, the
other child of the root node. The GaussianNoise filter takes two string
identifier arguments, corresponding to the mean and standard deviation
of the Gaussian distribution from which the noise is drawn.
Finally we call the generate_error
method of the root node, providing
it with the data and the error parameter dictionary. The method returns
an errorified copy of the data. However, if you wish to run a machine
learning model on the data, the ML runner – to be discussed next – will
call the method for you.
ML runner system¶
The ML runner system, or simply runner, is a system which is used for running
multiple machine learning models simultaneously with distinct filter error
parameters by using multithreading. After running all the models with all
desired parameter combinations the system returns a pandas.DataFrame
object which can be used for visualizing the results.
The runner needs to be given the following values when it is run: train data, test data, a preprocessor, an error generation tree, a list of error parameters, a list of ML models and their parameters and a boolean indicating whether or not to use interactive mode.
Train data and test data¶
These are the original train data and test data which will be given to the ML
models. A None
value can also be passed to the runner if there is no
training data.
Preprocessor¶
The preprocessor needs to implement a function run(train_data, test_data)
and it returns the preprocessed train and test data. The preprocessor can
return additional data as well, and it will be listed as separate columns in
the DataFrame
which the runner returns.
Here is a simple example of a preprocessor, which does nothing to the original
data, but returns also an array called “negative_data” which contains the
additive inverse of each test_data’s element.
1 2 3 4 5 6 7 | class Preprocessor:
def __init__(self):
self.random_state = RandomState(42)
def run(self, train_data, test_data):
negative_data = -test_data
return train_data, test_data, {"negative_data": negative_data}
|
Error generation tree¶
The root node of the error generation tree should be given to the runner. The structure of the error generation tree is described above.
Error parameter list¶
The list of error parameters is simply a list of dictionaries which contain the keys and error values for the error generation tree.
AI model parameter list¶
The list of AI model parameters is a list of dictionaries containing three keys: “model”, “params_list” and “use_clean_train_data”.
The value of “model” is a class instead of an object.
The given class should implement the function run(train_data, test_data,
parameters)
which runs the model on the train data and test data with given
parameters and returns a dictionary containing the scores and possibly
additional data.
The value of “params_list” is a list of dictionaries where each dictionary
contains one set of parameters for model. The model will be given these
parameters when the run(train_data, test_data, parameters)
function is
called.
If the “use_clean_train_data” boolean is true, then no error will be added to the train data.
Here is an example AI model parameter list and a model:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | from numpy.random import RandomState
from sklearn.cluster import KMeans
from sklearn.metrics import adjusted_rand_score
from sklearn.metrics import adjusted_mutual_info_score
# Model
class KMeansModel:
def __init__(self):
self.random_state = RandomState(42)
def run(self, train_data, test_data, model_params):
labels = model_params["labels"]
n_classes = len(np.unique(labels))
fitted_model = KMeans(n_clusters=n_classes,
random_state=self.random_state
).fit(test_data)
return {
"AMI": round(adjusted_mutual_info_score(labels,
fitted_model.labels_,
average_method="arithmetic"),
3),
"ARI": round(adjusted_rand_score(labels, fitted_model.labels_), 3),
}
# Parameter list
model_params_dict_list = [
{"model": KMeansModel, "params_list": [{"labels": labels}]}
]
|
Interactive mode¶
The final parameter of the runner system is a boolean indicating whether to use interactive mode or not. Some of the functions for visualizing the results require interactive mode; for others it is optional. Most visualization functions have no interactive functionality.
Basically what the interactive mode does is that it adds a column containing
the modified test data to the resulting DataFrame
object. The interactive
visualizer functions use this data to display points of data so that e.g.
the user can try to figure out why something was classified incorrectly.
Visualization functions¶
The dpemu.plotting_utils
module contains several functions for plotting and
visualizing the data.
A Complete Example¶
Here is an unrealistic but simple example which demonstrates all three components of dpEmu. In this example we are trying to predict the next value of data when we know all earlier values in the data. Our model tries to do estimate this by keeping a weighted average. In the end of the example a plot of scores is visualized.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | import sys
import matplotlib.pyplot as plt
import numpy as np
from dpemu import runner
from dpemu.plotting_utils import visualize_scores, print_results_by_model, visualize_best_model_params
from dpemu.nodes import Array
from dpemu.filters.common import GaussianNoise
class Preprocessor:
def run(self, train_data, test_data, params):
return train_data, test_data, {}
class PredictorModel:
def run(self, train_data, test_data, params):
# The model tries to predict the values of test_data
# by using a weighted average of previous values
estimate = 0
squared_error = 0
for elem in test_data:
# Calculate error
squared_error += (elem - estimate) * (elem - estimate)
# Update estimate
estimate = (1 - params["weight"]) * estimate + params["weight"] * elem
mean_squared_error = squared_error / len(test_data)
return {"MSE": mean_squared_error}
def get_data(argv):
train_data = None
test_data = np.arange(int(sys.argv[1]))
return train_data, test_data
def get_err_root_node():
# Create error generation tree that has an Array node
# as its root node and a GaussianNoise filter
err_root_node = Array()
err_root_node.addfilter(GaussianNoise("mean", "std"))
return err_root_node
def get_err_params_list():
# The standard deviation goes from 0 to 20
return [{"mean": 0, "std": std} for std in range(0, 21)]
def get_model_params_dict_list():
# The model is run with different weighted estimates
return [{
"model": PredictorModel,
"params_list": [{'weight': w} for w in [0.0, 0.05, 0.15, 0.5, 1.0]],
"use_clean_train_data": False
}]
def visualize(df):
# Visualize mean squared error for all used standard deviations
visualize_scores(
df=df,
score_names=["MSE"],
is_higher_score_better=[False],
err_param_name="std",
title="Mean squared error"
)
visualize_best_model_params(
df=df,
model_name="Predictor #1",
model_params=["weight"],
score_names=["MSE"],
is_higher_score_better=[False],
err_param_name="std",
title=f"Best model params"
)
plt.show()
def main(argv):
# Create some fake data
if len(argv) == 2:
train_data, test_data = get_data(argv)
else:
exit(0)
# Run the whole thing and get DataFrame for visualization
df = runner.run(
train_data=train_data,
test_data=test_data,
preproc=Preprocessor,
preproc_params=None,
err_root_node=get_err_root_node(),
err_params_list=get_err_params_list(),
model_params_dict_list=get_model_params_dict_list()
)
print_results_by_model(df)
visualize(df)
if __name__ == "__main__":
main(sys.argv)
|
Run the program with the command:
python3 examples/run_manual_predictor_example 1000
Here’s what the resulting image should look like: