# Model build and predict

This example walks you through the steps involved in building an ML model from historic data and predicting on new incoming data.

The example provides historic sensor data from wind turbines along with their failures. A model is built from this historic data, and new daily sensor data is then passed through the model to predict failures.
Download the project files here: Reference Project
Building an ML model and predicting on RapidCanvas involves the following steps:

- Import functions
- Authenticate your client
- Create a custom environment
- Create a new project
- Fetch pre-built templates
- Set project variables and create scenarios
  - Set project variables
  - Create relevant scenarios
    - Create a build scenario
    - Create a predict scenario
- Create a build pipeline
  - Add Input Datasets
  - Transform your raw data
    - Create recipe to fill nulls
    - Create recipe to clean data
    - Create recipe for tsfresh features
  - Build ML model
    - Create a recipe to add labels to the dataset
    - Create a recipe to build a random forest model
- Create a predict pipeline
  - Add Input Datasets
  - Transform your raw data
    - Create recipe to fill nulls
    - Create recipe to clean raw data
    - Create recipe to add prediction features
    - Create recipe for tsfresh features
  - Model Prediction
    - Create a recipe for model prediction
- Run scenarios
  - Run predict scenario for model prediction
  - Run build scenario for model building
## Import functions

```ipython3
from utils.rc.client.requests import Requests
from utils.rc.client.auth import AuthClient
from utils.rc.dtos.project import Project
from utils.rc.dtos.dataset import Dataset
from utils.rc.dtos.recipe import Recipe
from utils.rc.dtos.transform import Transform
from utils.rc.dtos.template import Template
from utils.rc.dtos.template import TemplateTransform
from utils.rc.dtos.template import TemplateInput
from utils.rc.dtos.env import Env
from utils.rc.dtos.env import EnvType
from utils.rc.dtos.template_v2 import TemplateV2, TemplateTransformV2
from utils.rc.dtos.global_variable import GlobalVariable
from utils.rc.dtos.scenario import RecipeExpression
from utils.rc.dtos.scenario import Operator
from utils.rc.dtos.scenario import Scenario
from utils.rc.dtos.dataSource import DataSource
from utils.rc.dtos.dataSource import DataSourceType
from utils.rc.dtos.dataSource import GcpConfig

import logging

logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
```
## Authenticate your client
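Before creating any resources, authenticate the Python client against your tenant. The exact call varies by SDK version; the snippet below is a minimal sketch, assuming AuthClient accepts an API token generated from your RapidCanvas account (the token value is a placeholder).

```ipython3
# Minimal sketch, not the confirmed API: authenticate before any project calls.
# "<your-api-token>" is a placeholder; setToken's exact signature may differ
# in your SDK version.
AuthClient.setToken(token="<your-api-token>")
```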
## Create a custom environment

Here are the available custom environments and their usage guidelines:

| Environment type | Cores | Memory |
|------------------|-------|--------|
| SMALL            | 1     | 2 GB   |
| MEDIUM           | 2     | 4 GB   |
| LARGE            | 4     | 8 GB   |
| CPU_LARGE        | 8     | 16 GB  |
| MAX_LARGE        | 12    | 32 GB  |
| EXTRA_MAX_LARGE  | 12    | 48 GB  |
```ipython3
## Environment Creation
env = Env.createEnv(
    name="env_build_predict",
    description="Max large env for running build and predict",
    envType=EnvType.MAX_LARGE,
    requirements=""
)
env.id
```
## Create a Project
Create a new project under your tenant.
```ipython3
project_name = "Build and Predict"
description = "One project for build and predict with 2 pipelines"
icon = "https://rapidcanvas.ai/wp-content/uploads/2022/09/windturbine_med.jpg"

project = Project.create(
    name=project_name,
    description=description,
    icon=icon,
    envId=env.id,
    # createEmpty=True
)
project.id
```
**This has now created a new project named “Build and Predict” under
your tenant. You can check the created project on the RapidCanvas UI by logging in
here:** [RapidCanvas UI](https://staging.dev.rapidcanvas.net/)
## Fetch pre-built templates

You can use pre-built RapidCanvas templates as part of your project. In this section, we fetch the pre-built templates which will be used during the build pipeline.
```ipython3
# This gets all available templates
templates = TemplateV2.get_all()
```
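If you are unsure of the exact template names to pass below, you can inspect the list returned above. A small sketch, assuming each TemplateV2 object exposes a name attribute (consistent with the TemplateV2 constructor used later in this example):

```ipython3
# List the names of all available templates (assumes a .name attribute)
print(sorted(t.name for t in templates))
```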
```ipython3
# Relevant templates for this project are being fetched
fill_null_template = TemplateV2.get_template_by('Fill Null Timeseries')
undersample_timeseries_template = TemplateV2.get_template_by('Undersample Timeseries Data')
tsfresh_template = TemplateV2.get_template_by('Tsfresh Features')
time_to_event_template = TemplateV2.get_template_by('Time To Event')
RandomForest_template = TemplateV2.get_template_by('Random Forest')
```
## Set project variables and create scenarios

### Set project variables

Project variables are stored as key-value pairs at the project level, and a variable can be referenced with the “@variable_name” notation to pass in its value. In this case we create a global variable called mode_global, which determines whether the build pipeline or the predict pipeline runs.
```ipython3
globalVariable = GlobalVariable(
    name="mode_global",
    project_id=project.id,
    type="string",
    value="build"
)
globalVariable.create()
```
### Create relevant scenarios

A scenario is created within a project and allows you to run a pipeline or a recipe only when certain conditions are met. We use scenarios in this example to run either just the build pipeline or just the predict pipeline.
#### Build Scenario

As part of the build scenario, our global variable mode_global is set to “build”, which runs only the build pipeline and skips the predict pipeline. After your first build run, you will typically only re-run your predict pipeline. However, if you have new historic data or want to rebuild the model, you can re-run the build scenario.
```ipython3
build_scenario = project.add_scenario(
    name='build_scenario',
    description='Model Build',
    shared_variables=dict(mode_global="build")
)
```
#### Predict Scenario

As part of the predict scenario, our global variable mode_global is set to “predict”, which runs only the predict pipeline and skips the build pipeline.

In our example, we will run the predict scenario every time we have a new file to predict on. During prediction we use the model that was already built during the build pipeline.
```ipython3
predict_scenario = project.add_scenario(
    name='predict_scenario',
    description='Model Predict',
    shared_variables=dict(mode_global="predict")
)
```
## Create a build pipeline

In this section, we follow all the relevant steps to build an ML model using historic data.
### Add Input Datasets - Build pipeline

As part of the build pipeline, we add two datasets to the project: sensor data and failures data. Sensor data contains all the historic data collected from wind turbine sensors over a given time period. Failures data contains the list of turbine components and their corresponding failure timestamps, along with remarks.
```ipython3
sensorsDataset = project.addDataset(
    dataset_name="sensor_events",
    dataset_description="Sensor data of wind turbines",
    dataset_file_path="data/sensor_edp.csv"
)
labelsDataset = project.addDataset(
    dataset_name="incident_events",
    dataset_description="Labels data of wind turbines",
    dataset_file_path="data/failures_edp.csv"
)
```
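Optionally, preview the uploaded inputs before transforming them. A quick sketch, assuming the input datasets expose the same getData helper used to preview recipe outputs throughout this example:

```ipython3
# Peek at the first rows of each uploaded dataset (getData is assumed to
# work on input datasets just as it does on recipe outputs below)
sensorsDataset.getData(5)
labelsDataset.getData(5)
```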
## Transform your raw data
### Create a recipe to fill nulls

This recipe cleans up the sensor data by identifying any nulls and filling them with the chosen method.

Note that we define a build_mode condition on the first line, before creating the recipe. The condition ensures this recipe runs only when the value of “mode_global” is set to “build”; otherwise the recipe run is skipped. Please note that if this recipe run is skipped, everything downstream of it is skipped as well.
```ipython3
build_mode = RecipeExpression(field='@mode_global', operator=Operator.EQUAL_TO, value='build')

fill_null_recipe = project.addRecipe([sensorsDataset], name='fill_null_recipe', condition=build_mode)

fill_null = Transform()
fill_null.templateId = fill_null_template.id
fill_null.name = 'fill_null'
fill_null.variables = {
    'inputDataset': "sensor_events",
    'columns': '',
    'Group_by': 'Turbine_ID',
    'how': 'ffill',
    'Timestamp': 'Timestamp',
    'outputDataset': 'fill_null_output'
}

fill_null_recipe.add_transform(fill_null)
fill_null_recipe.run()
```
#### Output dataset and review sample

```ipython3
fill_null_dataset = fill_null_recipe.getChildrenDatasets()['fill_null_output']
fill_null_dataset.getData(5)
```
### Create a recipe to clean data

This recipe takes the fill-null output and uses undersample timeseries to clean the sensor dataset.

Note that we have not added any condition to run this only for the build pipeline. The condition is optional at this point because this recipe is connected to the output of the previous recipe, which already has the condition in place. If the condition is satisfied on the first recipe, we expect everything downstream to run; if it is not met, the first recipe is skipped along with everything downstream.
```ipython3
sensor_cleaning_recipe = project.addRecipe([fill_null_dataset], name='sensor_cleaning_recipe')

undersample_timeseries = Transform()
undersample_timeseries.templateId = undersample_timeseries_template.id
undersample_timeseries.name = 'undersample_timeseries'
undersample_timeseries.variables = {
    'inputDataset': "fill_null_output",
    'Col_to_undersample_by': 'Turbine_ID',
    'Timestamp': "Timestamp",
    'Frequency': "D",
    'Resample_type': "MEAN",
    'outputDataset': 'sensor_cleaned'
}

sensor_cleaning_recipe.add_transform(undersample_timeseries)
sensor_cleaning_recipe.run()
```
#### Output dataset and review sample
```ipython3
sensor_cleaned = sensor_cleaning_recipe.getChildrenDatasets()['sensor_cleaned']
sensor_cleaned.getData(5)
```
### Create a recipe for tsfresh features

This recipe takes the cleaned historic sensor data and generates 30-day aggregates for each row of data. This produces all the additional features we need.

Please note that rows for which 30 days of historic data are not available will be dropped at this step.
```ipython3
sensor_tsfresh = project.addRecipe([sensor_cleaned], name='sensor_tsfresh')

tsfresh = Transform()
tsfresh.templateId = tsfresh_template.id
tsfresh.name = 'tsfresh'
tsfresh.variables = {
    'inputDataset': "sensor_cleaned",
    "max_timeshift": 30,
    "min_timeshift": 30,
    "entity": 'Turbine_ID',
    "time": 'Timestamp',
    "large": "True",
    "outputDataset": "sensor_ts_fresh"
}

sensor_tsfresh.add_transform(tsfresh)
sensor_tsfresh.run()
```
#### Output dataset and review sample
```ipython3
sensor_tsfresh_dataset = sensor_tsfresh.getChildrenDatasets()['sensor_ts_fresh']
sensor_tsfresh_dataset.getData(5)
```
## Build ML model

### Create a recipe to add labels to the dataset

As part of the model building step, we first join our feature-enriched dataset with the failures dataset that we uploaded at the start of the build pipeline.
```ipython3
join_time_to_failure_recipe = project.addRecipe([sensor_tsfresh_dataset, labelsDataset], name='join_time_to_failure_recipe')

time_to_failure = Transform()
time_to_failure.templateId = time_to_event_template.id
time_to_failure.name = 'time_to_failure'
time_to_failure.variables = {
    'EventDataset': labelsDataset.name,
    'TimeSeriesDataset': 'sensor_ts_fresh',
    'Eventkey': 'Turbine_ID',
    'TimeSerieskey': 'Turbine_ID',
    'EventTimestamp': 'Timestamp',
    'TimeSeriesTimestamp': 'Timestamp',
    'UnitOfTime': 'days',
    'outputDataset': 'time_to_failure_dataset'
}

join_time_to_failure_recipe.add_transform(time_to_failure)
join_time_to_failure_recipe.run()
```
#### Output dataset and review sample
```ipython3
time_to_failure_dataset = join_time_to_failure_recipe.getChildrenDatasets()['time_to_failure_dataset']
time_to_failure_dataset.getData(5)
```
### Create a recipe to build a random forest model

In this step we build the ML model. Please note that once the model is built, it is automatically stored in the RapidCanvas repository and can be retrieved for prediction in later steps.

Note that this marks the end of the build pipeline.
```ipython3
template = TemplateV2(
    name="LocalRandomForest",
    description="LocalRandomForest",
    project_id=project.id,
    source="CUSTOM",
    status="ACTIVE",
    tags=["Number", "datatype-long"]
)
template_transform = TemplateTransformV2(type="python", params=dict(notebookName="Local-Random-Forest.ipynb"))
template.base_transforms = [template_transform]
template.publish("transforms/Local-Random-Forest.ipynb")
```
```ipython3
randomforest_recipe = project.addRecipe([time_to_failure_dataset], name='LocalRandomForest')

RandomForest = Transform()
RandomForest.templateId = template.id
RandomForest.name = 'RandomForest'
RandomForest.variables = {
    'inputDataset': "time_to_failure_dataset",
    'target': 'time_to_event',
    'train_size': 0.8,
    'model_to_save': 'ml_random_forest_v15'
}

randomforest_recipe.add_transform(RandomForest)
```
```ipython3
randomforest_recipe.run()
```
#### Output dataset and review sample
```ipython3
children = randomforest_recipe.getChildrenDatasets()
```

```ipython3
children['Test_with_prediction'].getData(5)
```

```ipython3
children['Train_with_prediction'].getData(5)
```
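Beyond eyeballing the samples, you may want a quick numeric check of model quality. A hedged sketch, assuming getData returns a pandas DataFrame containing the target column time_to_event along with a prediction column; the name 'prediction' is purely illustrative, so inspect the actual column names in your output first:

```ipython3
import numpy as np

# Hedged sketch: RMSE over a sample of the held-out split.
# 'prediction' is a hypothetical column name; inspect
# children['Test_with_prediction'] and substitute the real one.
test_df = children['Test_with_prediction'].getData(100)
errors = test_df['time_to_event'] - test_df['prediction']
print("Test RMSE (days):", float(np.sqrt((errors ** 2).mean())))
```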
## Model Prediction Pipeline
Now that we have built an ML model, we can start building our pipeline to
utilize the model for predicting on new sensor data.
### Add input dataset - Daily prediction files
This is the new sensor data, which the model has not seen and which will be predicted on.
```ipython3
dailyDataset = project.addDataset(
    dataset_name="daily_events",
    dataset_description="Daily data of wind turbine for prediction",
    dataset_file_path="data/daily_data/data-with-features.csv"
)
```
## Transform your raw data

### Create recipe to fill nulls

We follow the same set of data cleaning steps for the new data as we did during the build pipeline. Do note that we define a predict_mode condition on the first line, before creating the recipe.

The predict_mode condition ensures this recipe runs only when the value of “mode_global” is set to “predict”; otherwise the recipe run is skipped. Please note that if this recipe run is skipped, everything downstream of it is skipped as well.
```ipython3
predict_mode = RecipeExpression(field='@mode_global', operator=Operator.EQUAL_TO, value='predict')

fill_null_recipe_predict = project.addRecipe([dailyDataset], name='fill_null_recipe_predict', condition=predict_mode)

fill_null = Transform()
fill_null.templateId = fill_null_template.id
fill_null.name = 'fill_null'
fill_null.variables = {
    'inputDataset': "daily_events",
    'columns': '',
    'Group_by': 'Turbine_ID',
    'how': 'ffill',
    'Timestamp': 'Timestamp',
    'outputDataset': 'fill_null_output_predict'
}

fill_null_recipe_predict.add_transform(fill_null)
fill_null_recipe_predict.run()
```
#### Output dataset and review sample

```ipython3
fill_null_predict_dataset = fill_null_recipe_predict.getChildrenDatasets()['fill_null_output_predict']
fill_null_predict_dataset.getData(5)
```
### Create recipe to clean data

This recipe takes the fill-null output of the new data and uses undersample timeseries to clean the dataset.

Note that we have not added any condition to run this only for the predict pipeline. The condition is optional at this point because this recipe is connected to the output of the previous recipe, which already has the condition in place. If the condition is satisfied on the first recipe, we expect everything downstream to run; if it is not met, the first recipe is skipped along with everything downstream.
```ipython3
sensor_cleaned_recipe_predict = project.addRecipe([fill_null_predict_dataset], name='sensor_cleaned_recipe_predict')

undersample_timeseries = Transform()
undersample_timeseries.templateId = undersample_timeseries_template.id
undersample_timeseries.name = 'undersample_timeseries'
undersample_timeseries.variables = {
    'inputDataset': "fill_null_output_predict",
    'Col_to_undersample_by': 'Turbine_ID',
    'Timestamp': "Timestamp",
    'Frequency': "D",
    'Resample_type': "MEAN",
    'outputDataset': 'sensor_cleaned_predict'
}

sensor_cleaned_recipe_predict.add_transform(undersample_timeseries)
sensor_cleaned_recipe_predict.run()
```
#### Output dataset and review sample

```ipython3
sensor_cleaned_predict_dataset = sensor_cleaned_recipe_predict.getChildrenDatasets()['sensor_cleaned_predict']
sensor_cleaned_predict_dataset.getData(5)
```
### Create recipe to add prediction features

This step is new to the predict pipeline compared to the build pipeline. During the build pipeline, all the historic data was available for generating aggregates. During the predict pipeline, however, we only have access to that day's data. To generate 30-day aggregates, we need to go back to the historic data and pull the relevant 30 days of history for each of these rows.

This recipe goes to the feature store and pulls the data necessary for generating the 30-day aggregates.

**Do note that this feature is still in beta and might change in the future.**
```ipython3
addFeaturesRecipe = project.addRecipe([sensor_cleaned_predict_dataset], name="addFeatures")
```
```ipython3
template = TemplateV2(
    name="AddFeaturesPredict",
    description="AddFeaturesPredict",
    project_id=project.id,
    source="CUSTOM",
    status="ACTIVE",
    tags=["Number", "datatype-long"]
)
template_transform = TemplateTransformV2(type="python", params=dict(notebookName="Add-Features-Predict.ipynb"))
template.base_transforms = [template_transform]
template.publish("transforms/Add-Features-Predict.ipynb")
```
```ipython3
transform = Transform()
transform.templateId = template.id
transform.name = "addFeatures"
transform.variables = {
    "cleanedDataset": "sensor_cleaned_predict",
    "outputDataset": "addFeaturesOutput"
}
```
```ipython3
addFeaturesRecipe.add_transform(transform)
addFeaturesRecipe.run()
```
#### Output dataset and review sample

```ipython3
added_features_dataset = addFeaturesRecipe.getChildrenDatasets()['addFeaturesOutput']
added_features_dataset.getData(5)
```
### Create recipe for tsfresh features

Now that we have the necessary historic data available, this recipe generates 30-day aggregates for each row of data. This produces all the tsfresh features we need.

Please note that rows for which 30 days of historic data are not available will be dropped at this step.
```ipython3
sensor_tsfresh_predict = project.addRecipe([added_features_dataset], name='sensor_tsfresh_predict')

tsfresh = Transform()
tsfresh.templateId = tsfresh_template.id
tsfresh.name = 'tsfresh'
tsfresh.variables = {
    'inputDataset': "addFeaturesOutput",
    "max_timeshift": 30,
    "min_timeshift": 30,
    "entity": 'Turbine_ID',
    "time": 'Timestamp',
    "large": "True",
    "outputDataset": "sensor_ts_fresh_predict"
}

sensor_tsfresh_predict.add_transform(tsfresh)
sensor_tsfresh_predict.run()
```
#### Output dataset and review sample

```ipython3
sensor_tsfresh_predict_dataset = sensor_tsfresh_predict.getChildrenDatasets()['sensor_ts_fresh_predict']
sensor_tsfresh_predict_dataset.getData(5)
```
### Create a recipe for model prediction

In this step, we pass the feature-enriched daily dataset to our previously stored model. All you need to provide is the model name, and RapidCanvas runs the dataset through it.
```ipython3
prediction_template = TemplateV2(
    name="Model Prediction",
    description="Pick a model to run the prediction on the dataset",
    source="CUSTOM",
    status="ACTIVE",
    tags=["UI", "Aggregation"],
    project_id=project.id
)
prediction_template_transform = TemplateTransformV2(
    type="python", params=dict(notebookName="Prediction.ipynb"))
prediction_template.base_transforms = [prediction_template_transform]
prediction_template.publish("transforms/Prediction.ipynb")
```
```ipython3
predictor_transform = Transform()
predictor_transform.templateId = prediction_template.id
predictor_transform.name = 'predictor'
predictor_transform.variables = {
    'inputDataset': "sensor_ts_fresh_predict",
    "modelName": "ml_random_forest_v15"
}
```
```ipython3
predictor = project.addRecipe([sensor_tsfresh_predict_dataset], name='predictor')
predictor.add_transform(predictor_transform)
predictor.run()
```
#### Output dataset and review sample
```ipython3
predictions = predictor.getChildrenDatasets()['prediction']
predictions.getData(5)
```
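If you need the predictions outside RapidCanvas, you can pull a sample into pandas and save it locally. A minimal sketch, assuming getData returns a pandas DataFrame (as assumed for the previews above):

```ipython3
# Minimal sketch: export a sample of the predictions to a local CSV
predictions_df = predictions.getData(1000)
predictions_df.to_csv("daily_predictions.csv", index=False)
```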
## Run only predict scenario

If you get new datasets on a daily basis, you can update the daily_events dataset and run just the predict scenario, which ensures that only the predict pipeline runs and the build pipeline is skipped.

You can review the same by changing the scenario dropdown to predict_scenario on the canvas view in the [RapidCanvas UI](https://staging.dev.rapidcanvas.net/).
```ipython3
# project.run_scenario(predict_scenario._id)
```
## Run only build scenario

If you want to rebuild the model for any reason, you can run just the build scenario, which ensures the build pipeline runs and the predict pipeline is skipped.

You can review the same by changing the scenario dropdown to build_scenario on the canvas view in the [RapidCanvas UI](https://staging.dev.rapidcanvas.net/).
```ipython3
# project.run_scenario(build_scenario._id)
```