API Documentation

CodeFlare Pipelines Datamodel

codeflare.pipelines.Datamodel The core data model structures are defined here. These include the constructs for creating a DAG as well as the input to and output of the DAG itself.

  • Pipeline: The class that defines the pipeline graph, supported by the Node and Edge constructs.

  • Node: The node class, which has two implementations, EstimatorNode and AndNode. The details of these are captured in a separate document on nodes and their semantics, defined by the node type, firing semantics, and state.

  • PipelineInput: The input to a pipeline, supported by the Xy and XYRef holder classes.

  • PipelineOutput: The output of a pipeline (after execution), supported by the Xy and XYRef holder classes.

  • Xy: A basic holder class for X and y, both of which are array-like, typically numpy arrays.

  • XYRef: A basic holder class for pointers to X and y; the pointers are references to objects in Ray's Plasma store.

  • PipelineParam: Finally, the data model allows for morphing of a pipeline based on parameterizations; these parameterizations can be used for grid search or other similar explorations.
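
To make these constructs concrete, here is a minimal sketch that wires them together (the estimator choices and the synthetic X/y data are illustrative assumptions, not part of the API):

import numpy as np
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
import codeflare.pipelines.Datamodel as dm

# build a two-node DAG: PCA feeding a logistic regression
pipeline = dm.Pipeline()
node_pca = dm.EstimatorNode('pca', PCA())
node_lr = dm.EstimatorNode('logistic', LogisticRegression())
pipeline.add_edge(node_pca, node_lr)

# attach fully materialized input (Xy) to the input node
X_train = np.random.rand(10, 4)
y_train = np.random.randint(0, 2, 10)
pipeline_input = dm.PipelineInput()
pipeline_input.add_xy_arg(node_pca, dm.Xy(X_train, y_train))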

class codeflare.pipelines.Datamodel.AndEstimator[source]

Bases: sklearn.base.BaseEstimator

An and estimator is part of the AndNode. It is very similar to a standard estimator; the key difference is that it takes an xy_list as input and outputs a single xy, in contrast to the EstimatorNode, which takes an xy as input and outputs xy_t.

In the pipeline execution, we expect three modes:

  • FIT: A regressor or classifier will call the fit method and then pass the transform results downstream; a non-regressor/classifier will call the fit_transform method.

  • PREDICT: A regressor or classifier will call the predict method, whereas a non-regressor/classifier will call the transform method.

  • SCORE: A regressor will call the score method, and a non-regressor/classifier will call the transform method.

Examples

Here is a simple FeatureUnion as an AndEstimator:

import numpy as np
from sklearn import base
import codeflare.pipelines.Datamodel as dm

class FeatureUnion(dm.AndEstimator):
    def __init__(self):
        pass

    def get_estimator_type(self):
        # a plain transformer, not a classifier or regressor
        return 'transform'

    def clone(self):
        return base.clone(self)

    def fit_transform(self, xy_list):
        # stateless, so fitting reduces to transforming
        return self.transform(xy_list)

    def transform(self, xy_list):
        # concatenate the features arriving on all incoming edges
        X_list = []
        y_vec = None
        for xy in xy_list:
            X_list.append(xy.get_x())
            y_vec = xy.get_y()
        X_concat = np.concatenate(X_list, axis=1)
        return dm.Xy(X_concat, y_vec)

The above does a simple feature union by combining inputs from multiple edges and sending “back” a single concatenated xy. As a simple transform, it only needs to implement the transform, fit_transform, clone, and get_estimator_type methods.

abstract clone()[source]

An abstract method that all and estimators are supposed to implement. It can be as simple as a basic Python clone.

Returns

A cloned estimator

abstract fit(xy_list: list)[source]

An abstract method that needs to be implemented if the estimator is a regressor or a classifier

Parameters

xy_list – List of xy

Returns

A single xy

abstract fit_transform(xy_list: list)[source]

An abstract method that needs to be implemented if the estimator is a simple transformer

Parameters

xy_list – List of xy

Returns

A single xy

abstract get_estimator_type()[source]

Any and estimator needs to implement this method; the type is ‘transform’ for a simple transformer, ‘classifier’ for a classifier, and ‘regressor’ for a regressor.

Returns

The type of the estimator

abstract predict(xy_list: list) → codeflare.pipelines.Datamodel.Xy[source]

An abstract method that needs to be implemented if the estimator is a regressor or a classifier

Parameters

xy_list – List of xy

Returns

A single xy

abstract score(xy_list: list) → codeflare.pipelines.Datamodel.Xy[source]

An abstract method that needs to be implemented if the estimator is a regressor or a classifier

Parameters

xy_list – List of xy

Returns

A single xy

abstract transform(xy_list: list) → codeflare.pipelines.Datamodel.Xy[source]

An abstract method that needs to be implemented if the estimator is a simple transformer

Parameters

xy_list – List of xy

Returns

A single xy

class codeflare.pipelines.Datamodel.AndNode(node_name: str, and_estimator: codeflare.pipelines.Datamodel.AndEstimator)[source]

Bases: codeflare.pipelines.Datamodel.Node

Basic and node, which is capable of combining inputs from multiple edges. As such, it needs to have an AndEstimator implemented. The AndEstimator itself inherits from sklearn.base.BaseEstimator.

This node is a typical AND node, with ANY firing semantics and STATELESS state.

Examples

An example building on the one from AndEstimator:

feature_union_and_estimator = FeatureUnion()
node_union_and = dm.AndNode('and_node_sample', feature_union_and_estimator)

clone()[source]

class codeflare.pipelines.Datamodel.Edge(from_node: codeflare.pipelines.Datamodel.Node, to_node: codeflare.pipelines.Datamodel.Node)[source]

Bases: object

An edge connects two nodes; it is an internal data structure for pipeline construction. An edge is directed and has a “from_node” and a “to_node”.

An edge also defines a hash function and equality, where two edges are equal if their from and to node names are the same.
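
A small illustration of these semantics (a hedged sketch: edges are normally created internally by Pipeline.add_edge rather than constructed directly):

import codeflare.pipelines.Datamodel as dm
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression

node_a = dm.EstimatorNode('a', PCA())
node_b = dm.EstimatorNode('b', LogisticRegression())

# two edges with the same from/to node names compare equal and hash alike
edge_1 = dm.Edge(node_a, node_b)
edge_2 = dm.Edge(node_a, node_b)
assert edge_1 == edge_2
assert hash(edge_1) == hash(edge_2)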

get_from_node() → codeflare.pipelines.Datamodel.Node[source]

The from_node of this edge (originating node)

Returns

The from_node of this edge

get_to_node() → codeflare.pipelines.Datamodel.Node[source]

The to_node of this edge (terminating node)

Returns

The to_node of this edge

class codeflare.pipelines.Datamodel.EstimatorNode(node_name: str, estimator: sklearn.base.BaseEstimator)[source]

Bases: codeflare.pipelines.Datamodel.Node

Basic estimator node, the equivalent of any sklearn pipeline stage. This node is initialized with an estimator that needs to extend sklearn.base.BaseEstimator.

This estimator node is typically an OR node, with ANY firing semantics and IMMUTABLE state. For partial fit, we will have to define a different node type to keep the semantics very clear.

Examples

from sklearn.ensemble import RandomForestClassifier
import codeflare.pipelines.Datamodel as dm

random_forest = RandomForestClassifier(n_estimators=200)
node_rf = dm.EstimatorNode('randomforest', random_forest)

# get the estimator
node_rf_estimator = node_rf.get_estimator()

# clone the node, clones the estimator as well
node_rf_cloned = node_rf.clone()

clone()[source]

Clones the given node along with the underlying estimator it was initialized with.

Returns

A cloned node

class codeflare.pipelines.Datamodel.KeyedObjectRef(obj_ref, key: Optional[object] = None)[source]

Bases: object

get_key()[source]

get_object_ref()[source]

class codeflare.pipelines.Datamodel.Node(node_name, estimator: sklearn.base.BaseEstimator, node_input_type: codeflare.pipelines.Datamodel.NodeInputType, node_firing_type: codeflare.pipelines.Datamodel.NodeFiringType, node_state_type: codeflare.pipelines.Datamodel.NodeStateType)[source]

Bases: abc.ABC

An abstract node class that captures basic information about a node. The hash code of a node is the name of the node, and two nodes are equal if their node names and types match.

When doing a grid search, a node can be parameterized with new params for the estimator and updated; this is done through an internal method used by grid search.

abstract clone()[source]

get_estimator()[source]

Return the estimator of the node

Returns

The node’s estimator

get_node_firing_type() → codeflare.pipelines.Datamodel.NodeFiringType[source]

Return the node firing type

Returns

The node firing type

get_node_input_type() → codeflare.pipelines.Datamodel.NodeInputType[source]

Return the node input type

Returns

The node input type

get_node_name() → str[source]

Returns the node name

Returns

The name of this node

get_node_state_type() → codeflare.pipelines.Datamodel.NodeStateType[source]

Return the node state type

Returns

The node state type

get_parameterized_node(node_name, **params)[source]

Get a parameterized node: given kwargs **params, clone this node and update the estimator with the new set of parameters. The node and its underlying estimator are both cloned.
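
A minimal sketch of the expected usage (the PCA estimator and its n_components parameter are assumptions made for illustration):

from sklearn.decomposition import PCA
import codeflare.pipelines.Datamodel as dm

node_pca = dm.EstimatorNode('pca', PCA())
# clone the node under a new name, updating the estimator's parameters
node_pca_5 = node_pca.get_parameterized_node('pca_5', n_components=5)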

Parameters
  • node_name – New node name

  • params – Updated parameters

Returns

A new parameterized node

class codeflare.pipelines.Datamodel.NodeFiringType(value)[source]

Bases: enum.Enum

Defines the “firing” semantics of a node. There are two types of firing semantics, ANY and ALL. ANY firing semantics means that upon the availability of a single object, the node will start executing its work. With ALL semantics, the node has to wait for ALL the objects to be materialized before the computation can begin, i.e. it is blocking.

For details on firing and pipeline semantics, the reader is directed to the pipeline semantics introduction of the User guide.

ALL = 1
ANY = 0
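
A quick sketch of inspecting a node's declared firing type (hedged: per this document, an EstimatorNode is declared with ANY semantics):

from sklearn.ensemble import RandomForestClassifier
import codeflare.pipelines.Datamodel as dm

node_rf = dm.EstimatorNode('randomforest', RandomForestClassifier())
assert node_rf.get_node_firing_type() == dm.NodeFiringType.ANY
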
class codeflare.pipelines.Datamodel.NodeInputType(value)[source]

Bases: enum.Enum

Defines the node input types; currently OR and AND nodes are supported. An OR node is backed by an estimator and an AND node is backed by an AndEstimator. The key difference is that for an OR node, the parallelism is defined at a single XYRef object, whereas for an AND node, the parallelism is defined on the collection of objects coming “into” the AND node.

For details on parallelism and pipeline semantics, the reader is directed to the pipeline semantics introduction of the User guide.

AND = 1
OR = 0
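
A sketch contrasting the two input types (hedged: FeatureUnion here is the AndEstimator defined in the example earlier in this document):

from sklearn.decomposition import PCA
import codeflare.pipelines.Datamodel as dm

node_or = dm.EstimatorNode('pca', PCA())
node_and = dm.AndNode('union', FeatureUnion())

assert node_or.get_node_input_type() == dm.NodeInputType.OR
assert node_and.get_node_input_type() == dm.NodeInputType.AND
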
class codeflare.pipelines.Datamodel.NodeStateType(value)[source]

Bases: enum.Enum

Defines the state type of a node. There are four types of state: STATELESS, IMMUTABLE, MUTABLE_SEQUENTIAL, and MUTABLE_AGGREGATE.

A STATELESS node is one that keeps no state and can be called any number of times without any change to the “model” or “function” state.

An IMMUTABLE node is one whose model, once “fitted”, cannot change, i.e. there is no partial fit available.

A MUTABLE_SEQUENTIAL node is one that can be updated with a sequence of input object(s) or a stream.

A MUTABLE_AGGREGATE node is one that can be updated in batches.

IMMUTABLE = 1
MUTABLE_AGGREGATE = 3
MUTABLE_SEQUENTIAL = 2
STATELESS = 0

class codeflare.pipelines.Datamodel.Pipeline[source]

Bases: object

The pipeline class that defines the DAG structure composed of Node(s). This is the core data structure that defines the computation graph. A key note is that unlike an sklearn pipeline, CodeFlare pipelines are “abstract” graphs and get realized only when executed. Upon execution, there can potentially be multiple pathways through the pipeline, i.e. multiple “single” pipelines can be realized.

Examples

Pipelines can be constructed quite simply using the builder paradigm with add_node and/or add_edge. In its simplest form, one can create nodes and then wire the DAG by adding edges. An example of a simple pipeline is below:

feature_union = FeatureUnion(transformer_list=[('PCA', PCA()),
    ('Nystroem', Nystroem()), ('SelectKBest', SelectKBest(k=3))])
random_forest = RandomForestClassifier(n_estimators=200)

pipeline = dm.Pipeline()
node_fu = dm.EstimatorNode('feature_union', feature_union)
node_rf = dm.EstimatorNode('randomforest', random_forest)
pipeline.add_edge(node_fu, node_rf)

One can of course construct complex pipelines with multiple outgoing edges as well. An example of one that explores multiple models is shown below:

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)])

classifiers = [
    RandomForestClassifier(),
    GradientBoostingClassifier()
]
pipeline = dm.Pipeline()
node_pre = dm.EstimatorNode('preprocess', preprocessor)
node_rf = dm.EstimatorNode('random_forest', classifiers[0])
node_gb = dm.EstimatorNode('gradient_boost', classifiers[1])

pipeline.add_edge(node_pre, node_rf)
pipeline.add_edge(node_pre, node_gb)

A pipeline can be saved and loaded, which in essence saves the “graph” and not the state of this pipeline. For saving the state of the pipeline, one can use the Runtime’s save method. Save/load of a pipeline uses Pickle protocol 5.

fname = 'save_pipeline.cfp'
fh = open(fname, 'wb')
pipeline.save(fh)
fh.close()

r_fh = open(fname, 'rb')
saved_pipeline = dm.Pipeline.load(r_fh)

add_edge(from_node: codeflare.pipelines.Datamodel.Node, to_node: codeflare.pipelines.Datamodel.Node)[source]

Adds an edge to this pipeline

Parameters
  • from_node – The from node

  • to_node – The to node

Returns

None

add_node(node: codeflare.pipelines.Datamodel.Node)[source]

Adds a node to this pipeline

Parameters

node – The node to add

Returns

None

compute_max_level()[source]

Get the max depth of this pipeline graph.

Returns

The max depth of pipeline

compute_node_level(node: codeflare.pipelines.Datamodel.Node, result: dict)[source]

Computes the level for a given node. This is an internal supporting function; it is recursive, so it takes the result computed so far.

Parameters
  • node – The node for which level needs to be computed

  • result – The node levels that have already been computed

Returns

The level for this node

compute_node_levels()[source]

Computes node levels for all nodes. If a cache of node levels from previous calls exists, it will return the cache to avoid repeated computation.

Returns

The mapping from node to its level as a dict

get_input_nodes()[source]

Returns all the input nodes of this pipeline

Returns

List of input nodes

get_node(node_name: str) → codeflare.pipelines.Datamodel.Node[source]

Return the node given a node name

Parameters

node_name – Node name

Returns

The node with this node name

get_node_level(node: codeflare.pipelines.Datamodel.Node)[source]

Returns the node level for the given node, a number between 0 and max_level (depth of the DAG/Pipeline).

Parameters

node – Given node

Returns

Level between 0 and max_depth of pipeline

get_nodes()[source]

Returns all the nodes of this pipeline in a dict from node_name to the node

Returns

Dict of node_name to node

get_nodes_by_level()[source]

A mapping from level to a list of nodes, useful at pipeline execution time. Similar to compute_node_levels, this method will return a cached result if it exists, else it will compute the levels and cache them.

Returns

The mapping from level to a list of nodes at that level

get_output_nodes()[source]

Gets all the output nodes for this pipeline

Returns

List of output nodes

get_parameterized_pipeline(pipeline_param)[source]

Parameterizes the current pipeline with the provided pipeline_param and returns the newly parameterized pipeline. The pipeline_param is explored for all the parameters associated with a given node, which is then expanded into multiple nodes with generated node names. The graph is created using the existing connections, i.e. if there is an edge between node A and node B, and with parameterization node B becomes nodes B1 and B2, edges are created between node A and node B1 as well as node A and node B2.

Depending on the search strategy, an appropriate pipeline_param can create the right expansion. For example, grid search can expand the cross product of parameters, and the pipeline will be expanded accordingly.

Examples

The code below shows how a 2-step pipeline gets expanded into a 9-node pipeline for grid search: the pca node expands into 5 nodes (one per n_components value) and the logistic node expands into 4 nodes (one per C value).

import numpy as np
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
import codeflare.pipelines.Datamodel as dm

pca = PCA()
logistic = LogisticRegression()

pipeline = dm.Pipeline()
node_pca = dm.EstimatorNode('pca', pca)
node_logistic = dm.EstimatorNode('logistic', logistic)

pipeline.add_edge(node_pca, node_logistic)

param_grid = {
    'pca__n_components': [5, 15, 30, 45, 64],
    'logistic__C': np.logspace(-4, 4, 4),
}

pipeline_param = dm.PipelineParam.from_param_grid(param_grid)

param_grid_pipeline = pipeline.get_parameterized_pipeline(pipeline_param)

Parameters

pipeline_param – The pipeline parameters

Returns

A parameterized pipeline

get_post_edges(node: codeflare.pipelines.Datamodel.Node)[source]

Get the outgoing edges for the given node

Parameters

node – Given node

Returns

Outgoing edges for the node

get_post_nodes(node: codeflare.pipelines.Datamodel.Node)[source]

Returns the nodes which are “below” the given node, i.e., have incoming edges from the given node, empty if it is an output node

Parameters

node – Given node

Returns

List of nodes that have incoming edges from the given node

get_pre_edges(node: codeflare.pipelines.Datamodel.Node)[source]

Get the incoming edges to a specific node.

Parameters

node – Given node

Returns

Incoming edges for the node

get_pre_nodes(node: codeflare.pipelines.Datamodel.Node)[source]

Returns the nodes which are “above” the given node, i.e., have outgoing edges to the given node, empty if it is an input node

Parameters

node – Given node

Returns

List of nodes that have outgoing edges to the given node

static get_str(nodes: list)[source]

has_single_estimator()[source]

Checks if this pipeline has only a single OR estimator; this is useful to know when picking a specific pipeline.

Returns

True if only one OR estimator else False

is_input(node: codeflare.pipelines.Datamodel.Node)[source]

Checks if the given node is an input node of this pipeline

Parameters

node – Given node

Returns

True if input node else False

is_output(node: codeflare.pipelines.Datamodel.Node)[source]

Checks if the given node is an output node

Parameters

node – Given node

Returns

True if output else False

static load(filehandle)[source]

Loads a pipeline that has been saved, given the filehandle. The filehandle must be opened in “rb” mode.

Parameters

filehandle – Filehandle to load pipeline from

Returns

The loaded pipeline

save(filehandle)[source]

Saves the pipeline graph (without state) to a file. A filehandle with write and binary mode is expected.

Parameters

filehandle – Filehandle with wb mode

Returns

None

class codeflare.pipelines.Datamodel.PipelineInput[source]

Bases: object

This is a holder that captures the input to the pipeline in an appropriate manner. Internally, it maps a node to pointers to XYRef, i.e. it only holds pointers; it does not hold the entire data. This is key to distributing the data in the object store.

Examples

The simplest way to add input to a node is by specifying the X and y args; the underlying platform will take care of distributing them to the backend in-memory object storage.

pipeline_input = dm.PipelineInput()
pipeline_input.add_xy_arg(node_a, dm.Xy(X_train, y_train))

add_all(node, node_inargs)[source]

Adds all the in args to a given node; this is very useful when cloning a pipeline or “morphing” it for grid search, etc.

Parameters
  • node – Node to which input needs to be added

  • node_inargs – All the in args, which will be added whole

Returns

None

add_xy_arg(node: codeflare.pipelines.Datamodel.Node, xy: codeflare.pipelines.Datamodel.Xy)[source]

The most common way of adding input to a node, by providing an xy.

Parameters
  • node – Node to which input needs to be added

  • xy – The xy to be added

Returns

None

add_xyref_arg(node: codeflare.pipelines.Datamodel.Node, xyref: codeflare.pipelines.Datamodel.XYRef)[source]

A convenience method that adds a XYRef to the given node as input.

Parameters
  • node – Node to which input needs to be added

  • xyref – The XYRef

Returns

None

add_xyref_ptr_arg(node: codeflare.pipelines.Datamodel.Node, xyref_ptr)[source]

A direct pointer input addition; this is typically used in internal methods, but it enables the advanced developer to have direct access to the pipeline internals.

Parameters
  • node – Node to which input needs to be added

  • xyref_ptr – The pointer to XYref

Returns

None

get_in_args()[source]

Returns the dict with the node to in args mapping

Returns

The internal structure holding the in args

get_parameterized_input(pipeline: codeflare.pipelines.Datamodel.Pipeline, parameterized_pipeline: codeflare.pipelines.Datamodel.Pipeline)[source]

This is meant to create a parameterized input given a pipeline and its parameterized counterpart. This method explores the nodes of the pipeline that match the parameterized_pipeline and copies the input over to the appropriate nodes of the parameterized_pipeline.
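
A short usage sketch (hedged: pipeline, pipeline_input, and pipeline_param are assumed to have been built as in the grid search example under Pipeline.get_parameterized_pipeline):

parameterized_pipeline = pipeline.get_parameterized_pipeline(pipeline_param)
parameterized_input = pipeline_input.get_parameterized_input(pipeline, parameterized_pipeline)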

Parameters
  • pipeline – The original pipeline

  • parameterized_pipeline – The parameterized pipeline corresponding to the original pipeline

Returns

The parameterized input for the given parameterized_pipeline

class codeflare.pipelines.Datamodel.PipelineOutput(out_args, edge_args)[source]

Bases: object

Pipeline output that keeps reference counters so that pipelines can be materialized.

get_edge_args()[source]

get_out_args()[source]

get_xyrefs(node: codeflare.pipelines.Datamodel.Node)[source]

class codeflare.pipelines.Datamodel.PipelineParam[source]

Bases: object

This class captures the pipeline parameters, which can be changed for various forms of exploration. It is a fairly simple holder class capturing, for each node, the corresponding estimator’s parameters as a dictionary.

It also supports creating a PipelineParam object from a parameter grid of the kind typically used with sklearn’s GridSearchCV.

Examples

A simple example to create a pipeline param from a parameter grid.

param_grid = {
    'pca__n_components': [5, 15, 30, 45, 64],
    'logistic__C': np.logspace(-4, 4, 4),
}

pipeline_param = dm.PipelineParam.from_param_grid(param_grid)

add_param(node_name: str, params: dict)[source]

Add parameters for the given node name

Parameters
  • node_name – Node name to add parameter to

  • params – Parameter as a dictionary

Returns

None

static from_param_grid(fit_params: dict)[source]

A method to create a pipeline param object from a typical parameter grid using the standard sklearn double-underscore (“__”) convention. For example, pca__n_components is a parameter for the node named pca with parameter name n_components. The parameter grid creates a full grid exploration of the parameters.

Parameters

fit_params – Dictionary of parameter name in the sklearn convention to the parameter list

Returns

A pipeline param object

get_all_params()[source]

Return all the parameters for the given pipeline param

Returns

A dict from node name to the dictionary of parameters

get_param(node_name: str)[source]

Returns the parameter dict for the given node name

Parameters

node_name – Node name to retrieve parameters for

Returns

Dict of parameters

class codeflare.pipelines.Datamodel.XYRef(Xref: ray._raylet.ObjectRef, yref: ray._raylet.ObjectRef, prev_node_state_ref: Optional[ray._raylet.ObjectRef] = None, curr_node_state_ref: Optional[ray._raylet.ObjectRef] = None, prev_Xyrefs=None)[source]

Bases: object

Holder class that maintains a pointer/reference to X and y. The goal is to provide a holder for Ray object references. This is used for passing outputs from a transform/fit to the next stage of the pipeline. Since the object references can potentially be in flight (or being computed), these holders are essential to the pipeline constructs.

It also holds the state of the node itself: the previous state of the node (before a transform operation is applied) is held along with the next state. It also holds the previous XYRef instances. In essence, this holder class is a bunch of pointers, but that is enough to reconstruct the entire pipeline through appropriate traversals.

NOTE: Default constructor takes pointer to X and y. The more advanced constructs are pointer holders for the pipeline during its execution and are not meant to be used outside by developers.

Examples

import numpy as np
import ray
from codeflare.pipelines.Datamodel import XYRef

x = np.array([1.0, 2.0, 4.0, 5.0])
y = np.array(['odd', 'even', 'even', 'odd'])
x_ref = ray.put(x)
y_ref = ray.put(y)

xy_ref = XYRef(x_ref, y_ref)

get_Xref() → ray._raylet.ObjectRef[source]

Getter for the reference to X

Returns

ObjectRef to X

get_curr_node_state_ref() → ray._raylet.ObjectRef[source]

Getter for the reference to current node state

Returns

ObjectRef to current node state

get_prev_node_state_ref() → ray._raylet.ObjectRef[source]

Getter for the reference to previous node state

Returns

ObjectRef to previous node state

get_prev_xyrefs()[source]

Getter for the list of previous XYrefs

Returns

List of XYRefs

get_yref() → ray._raylet.ObjectRef[source]

Getter for the reference to y

Returns

ObjectRef to y

class codeflare.pipelines.Datamodel.Xy(X, y)[source]

Bases: object

Holder class for Xy, where X is array-like and y is array-like. This is the base data structure for fully materialized X and y.

Examples

import numpy as np
from codeflare.pipelines.Datamodel import Xy

x = np.array([1.0, 2.0, 4.0, 5.0])
y = np.array(['odd', 'even', 'even', 'odd'])
xy = Xy(x, y)

get_x()[source]

Getter for X

Returns

Holder value of X

get_y()[source]

Getter for y

Returns

Holder value of y

CodeFlare Pipelines Runtime

codeflare.pipelines.Runtime The core runtime for CodeFlare pipelines. It provides the entry point for execution of a pipeline constructed with codeflare.pipelines.Datamodel. The key entry point is the basic execute_pipeline, with other enhanced entry points such as cross_validate and grid_search_cv.

The other methods provide supporting functions for the execution of pipeline primitives. In addition, methods are provided for selecting a pipeline, as well as for saving a specific pipeline instance along with its state.

Details on the execution and parallelism exposed are provided in the design documentation.
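
A sketch of the typical flow (hedged: pipeline, pipeline_input, and node_rf are assumed to have been constructed as in the Datamodel examples above):

import codeflare.pipelines.Runtime as rt

# train; execution can realize multiple concrete pipelines
pipeline_output = rt.execute_pipeline(pipeline, rt.ExecutionType.FIT, pipeline_input)

# pick an output object of interest and recover the pipeline that produced it
node_rf_xyrefs = pipeline_output.get_xyrefs(node_rf)
selected_pipeline = rt.select_pipeline(pipeline_output, node_rf_xyrefs[0])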

class codeflare.pipelines.Runtime.ExecutionType(value)[source]

Bases: enum.Enum

Pipelines can be executed in different modes, targeting typical AI/ML parlance. The supported types are FIT for training a pipeline, PREDICT for predicting/transforming on the steps of a pipeline, and finally SCORE, which scores against a given input.

FIT = 0
PREDICT = 1
SCORE = 2

codeflare.pipelines.Runtime.cross_validate(cross_validator: sklearn.model_selection._split.BaseCrossValidator, pipeline: codeflare.pipelines.Datamodel.Pipeline, pipeline_input: codeflare.pipelines.Datamodel.PipelineInput)[source]

Similar to sklearn’s cross_validate, but a parallelized version on Ray with zero-copy sharing of data. This method allows the user to explore a pipeline with a single input object through cross validation. The output is a list of scores that correspond to the SCORE mode of pipeline execution.

Examples

Cross validation is quite simple:

from sklearn.model_selection import StratifiedKFold
import codeflare.pipelines.Runtime as rt

kf = StratifiedKFold(n_splits=10)
scores = rt.cross_validate(kf, pipeline, pipeline_input)

Parameters
  • cross_validator – Cross validator to use

  • pipeline – Pipeline to execute

  • pipeline_input – Input to the pipeline

Returns

Scored outputs from the pipeline

codeflare.pipelines.Runtime.execute_and_node(node, pre_edges, edge_args, post_edges, mode: codeflare.pipelines.Runtime.ExecutionType)[source]

Inner method that executes an and node by combining the inputs coming from multiple edges. Unlike the OR node, which executes a remote task per input object, the and node combines input from across all the edges. For example, if there are two edges incoming to this node with two objects each, the combiner will create four input combinations. Each of these input combinations is then evaluated by the AND node in parallel.

The result is then sent to the edges outgoing from this node.
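
An illustrative sketch of the combination logic described above (this is not the runtime's internal code, just the cross-product idea):

import itertools

edge_1_inputs = ['obj_a', 'obj_b']  # two objects on the first incoming edge
edge_2_inputs = ['obj_c', 'obj_d']  # two objects on the second incoming edge

# the combiner forms the cross product: four input combinations
combinations = list(itertools.product(edge_1_inputs, edge_2_inputs))
# [('obj_a', 'obj_c'), ('obj_a', 'obj_d'), ('obj_b', 'obj_c'), ('obj_b', 'obj_d')]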

Parameters
  • node – Node to execute on

  • pre_edges – Incoming edges to this node

  • edge_args – Data arguments for each of this edge

  • post_edges – Outgoing edges

  • mode – Execution mode

Returns

None

codeflare.pipelines.Runtime.execute_and_node_inner(node: codeflare.pipelines.Datamodel.AndNode, mode: codeflare.pipelines.Runtime.ExecutionType, Xyref_ptrs)[source]

This is a helper method for executing and nodes, which fires off remote tasks. Unlike the helper for OR nodes, which can fire off on single objects, this method retrieves the list of inputs, unmarshals the pointers to materialize XYRefs, and then passes them along to the and node remote executor.

Parameters
  • node – Node to execute on

  • mode – Mode of execution

  • Xyref_ptrs – Object ref pointers for data input

Returns

codeflare.pipelines.Runtime.execute_or_node(node, pre_edges, edge_args, post_edges, mode: codeflare.pipelines.Runtime.ExecutionType)[source]

Inner method that executes the estimator node, parallelizing at the level of input objects. This defines the strategy of execution for the node: in this case, parallel for each input object. The function takes the edges coming into this node (pre_edges) and the associated arguments (edge_args) and fires off remote tasks for each of the objects (this is defined by the ANY firing semantics). The resulting pointer(s) are then captured and passed to the post_edges.

Parameters
  • node – Node to execute

  • pre_edges – Input edges to the given node

  • edge_args – Data arguments for the edges

  • post_edges – Data arguments for downstream processing

  • mode – Execution mode

Returns

None

codeflare.pipelines.Runtime.execute_pipeline(pipeline: codeflare.pipelines.Datamodel.Pipeline, mode: codeflare.pipelines.Runtime.ExecutionType, pipeline_input: codeflare.pipelines.Datamodel.PipelineInput) → codeflare.pipelines.Datamodel.PipelineOutput[source]

The entry point for a basic pipeline execution. This method takes a pipeline, the input to it, and the execution mode, and runs the pipeline. Based on the parallelism defined by the DAG structure and the input data, the execution of the pipeline will happen in parallel.

In the FIT mode of execution, the pipeline can materialize into several pipelines which can be examined in further detail based on metrics of interest. The method select_pipeline enables selecting a specific pipeline to examine further.

A selected pipeline can be executed in SCORE and PREDICT modes for evaluating the results or saving them for future reuse.

Examples

Executing a pipeline and getting its output is fairly simple:

pipeline_output = rt.execute_pipeline(pipeline, rt.ExecutionType.FIT, pipeline_input)
node_rf_xyrefs = pipeline_output.get_xyrefs(node_rf)

Parameters
  • pipeline – Abstract DAG representation of the pipeline

  • mode – Execution mode

  • pipeline_input – The input to this pipeline

Returns

Pipeline output

codeflare.pipelines.Runtime.get_pipeline_input(pipeline: codeflare.pipelines.Datamodel.Pipeline, pipeline_output: codeflare.pipelines.Datamodel.PipelineOutput, chosen_xyref: codeflare.pipelines.Datamodel.XYRef) → codeflare.pipelines.Datamodel.PipelineInput[source]

Given the output from a pipeline and a chosen output object, this method gets the inputs that were used to generate this output. Combining the input and the selected pipeline, one can then recreate the full provenance (graph and data) needed to execute the selected pipeline.

Note that once the objects are no longer persisted in memory or other persistent stores, it is not possible to get the data.
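
A short sketch combining this with select_pipeline (hedged: chosen_xyref is an output object picked from an executed pipeline, as in the other Runtime examples):

selected_pipeline = rt.select_pipeline(pipeline_output, chosen_xyref)
selected_input = rt.get_pipeline_input(pipeline, pipeline_output, chosen_xyref)

# re-run the selected pipeline on the inputs that originally produced chosen_xyref
rt.execute_pipeline(selected_pipeline, rt.ExecutionType.FIT, selected_input)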

Parameters
  • pipeline – Executed pipeline

  • pipeline_output – Output from the executed pipeline

  • chosen_xyref – Chosen object from the output

Returns

The pipeline input (for the given chosen object)

codeflare.pipelines.Runtime.grid_search_cv(cross_validator: sklearn.model_selection._split.BaseCrossValidator, pipeline: codeflare.pipelines.Datamodel.Pipeline, pipeline_input: codeflare.pipelines.Datamodel.PipelineInput, pipeline_params: codeflare.pipelines.Datamodel.PipelineParam)[source]

A top-level method that does a grid search with cross validation. This method takes a pipeline, the input to it, a set of parameters for the pipeline, and a cross validator, similar to the traditional GridSearchCV of sklearn, and executes the various pipelines and cross validations in parallel.

This method first transforms the input pipeline, expanding it to perform a parameter grid search, and then runs the cross validator in parallel. The goal is to execute the cross validation for each parameter combination in parallel to provide the results.

The results are captured in a dict that maps each pipeline to its corresponding cross validation scores.

Examples

An example of grid search using a parameter grid similar to what SKLearn does:

import statistics
from sklearn.model_selection import KFold
import codeflare.pipelines.Runtime as rt

k = 2
kf = KFold(k)
# pipeline_param as constructed via dm.PipelineParam.from_param_grid(param_grid)
result = rt.grid_search_cv(kf, pipeline, pipeline_input, pipeline_param)

# Results can be examined by iterating over the pipelines, for example to pick
# a best pipeline based on mean scores
best_pipeline = None
best_mean_scores = 0.0

for cv_pipeline, scores in result.items():
    mean = statistics.mean(scores)
    if mean > best_mean_scores:
        best_pipeline = cv_pipeline
        best_mean_scores = mean

Parameters
  • cross_validator – Cross validator for grid search

  • pipeline – Pipeline graph

  • pipeline_input – Input to the pipeline

  • pipeline_params – Parameter space to explore using a grid search approach

Returns

Dict from pipeline to the cross validation scores

codeflare.pipelines.Runtime.save(pipeline_output: codeflare.pipelines.Datamodel.PipelineOutput, xy_ref: codeflare.pipelines.Datamodel.XYRef, filehandle)[source]

Saves a selected pipeline, i.e. the selected pipeline is saved along with the state of its estimators, enabling the end user to load and execute the pipeline in SCORE/PREDICT modes in the future.

Examples

Saving a selected pipeline can be done as follows:

# this pipeline can also be saved
fname = 'random_forest.cfp'
w_fh = open(fname, 'wb')
rt.save(pipeline_output, node_rf_xyrefs[0], w_fh)
w_fh.close()

Parameters
  • pipeline_output – Pipeline output from an executed pipeline

  • xy_ref – The chosen XYRef that will be used to materialize a selected pipeline

  • filehandle – The file handle to save this pipeline to

Returns

None

codeflare.pipelines.Runtime.select_pipeline(pipeline_output: codeflare.pipelines.Datamodel.PipelineOutput, chosen_xyref: codeflare.pipelines.Datamodel.XYRef) → codeflare.pipelines.Datamodel.Pipeline[source]

Pipeline execution results in the materialization of several pipelines; this entry point method enables the end user to select a specific pipeline to examine in further detail. The typical way of examining a pipeline is to select a specific output and then “request” which pipeline generated it.

Internally, the runtime has generated “trackers” that keep a lineage for every input and output and the node that generated it. These are then used to create the appropriate pipeline, which can be scored, predicted, and saved.

Examples

Selecting a pipeline can be done by identifying an output object of interest. One can select the pipeline without going to the output node, i.e. by looking at some internal nodes as well:

# one can examine the output in more detail and select a pipeline of interest
selected_pipeline = rt.select_pipeline(pipeline_output, node_rf_xyrefs[0])

Parameters
  • pipeline_output – Pipeline output from execute pipeline

  • chosen_xyref – The XYref for which the pipeline needs to be selected

Returns

Selected pipeline

CodeFlare Pipelines Exceptions

codeflare.pipelines.Exceptions The exceptions thrown during pipeline creation and execution are defined here.

exception codeflare.pipelines.Exceptions.BasePipelineException[source]

Bases: Exception

Base pipeline exception

exception codeflare.pipelines.Exceptions.PipelineException(message)[source]

Bases: codeflare.pipelines.Exceptions.BasePipelineException

Generic pipeline exception

exception codeflare.pipelines.Exceptions.PipelineNodeNotFoundException(message)[source]

Bases: codeflare.pipelines.Exceptions.BasePipelineException

Exception thrown when a node is not found in a pipeline; this can typically happen when pipelines are not properly constructed.
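
A hedged usage sketch (it is an assumption for illustration that a failed node lookup on a badly constructed pipeline raises this exception):

from codeflare.pipelines.Exceptions import PipelineNodeNotFoundException

try:
    node = pipeline.get_node('nonexistent_node')
except PipelineNodeNotFoundException as e:
    print(f'node lookup failed: {e}')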

exception codeflare.pipelines.Exceptions.PipelineSaveException(message)[source]

Bases: codeflare.pipelines.Exceptions.BasePipelineException

Exception thrown when a pipeline save fails