Model build and predict
This example file walks you through the steps involved in building an ML model from historic data and predicting on new incoming data.
The example provides historic sensor data of wind turbines and their failures. A model is built from this historic data, and new daily sensor data is then passed through the model to predict failures.
Download the project files here: Reference Project
Building an ML model and predicting on RapidCanvas involves the following steps:
Import functions
Authenticate your client
Create a custom environment
Create a new project
Fetch pre-built templates
Set project variables and create scenarios
Set project variables
Create relevant scenarios
Create a build scenario
Create a predict scenario
Create a build pipeline
Add Input Datasets
Transform your raw data
Create recipe to fill nulls
Create recipe to clean data
Create recipe for tsfresh features
Build ML model
Create a recipe to add labels to the dataset
Create a recipe to build a random forest model
Create a predict pipeline
Add Input Datasets
Transform your raw data
Create recipe to fill nulls
Create recipe to clean raw data
Create recipe to add prediction features
Create recipe for tsfresh features
Model Prediction
Create a recipe for model prediction
Run scenarios
Run predict scenario for model prediction
Run build scenario for model building
Import functions
from utils.rc.client.requests import Requests
from utils.rc.client.auth import AuthClient
from utils.rc.dtos.project import Project
from utils.rc.dtos.dataset import Dataset
from utils.rc.dtos.recipe import Recipe
from utils.rc.dtos.transform import Transform
from utils.rc.dtos.env import Env
from utils.rc.dtos.env import EnvType
from utils.rc.dtos.template_v2 import TemplateV2, TemplateTransformV2
from utils.rc.dtos.global_variable import GlobalVariable
from utils.rc.dtos.scenario import RecipeExpression
from utils.rc.dtos.scenario import Operator
from utils.rc.dtos.scenario import Scenario
from utils.rc.dtos.dataSource import DataSource
from utils.rc.dtos.dataSource import DataSourceType
from utils.rc.dtos.dataSource import GcpConfig
import logging
from utils.utils.log_util import LogUtil
LogUtil.set_basic_config(format='%(levelname)s:%(message)s', level=logging.INFO)
Authenticate your client
# Requests.setRootHost("https://test.dev.rapidcanvas.net/api/")
AuthClient.setToken()
Create a custom environment
Here are the available custom environments and their usage guidelines.
## Environment Creation
env = Env.createEnv(
name="env_build_predict",
description="Max large env for running build and predict",
envType=EnvType.LARGE,
requirements="numpy==1.21.5 tsfresh==0.20.0"
)
env.id
Create a Project
Create a new project under your tenant
project_name = "Build and Predict"
description = "One project for build and predict with 2 pipelines"
icon = "https://rapidcanvas.ai/wp-content/uploads/2022/09/windturbine_med.jpg"
project = Project.create(
name=project_name,
description=description,
icon=icon,
envId=env.id,
createEmpty=True
)
project.id
This creates a new project named “Build and Predict” under your tenant. You can verify it on the RapidCanvas UI by logging in here: RapidCanvas UI
Fetch pre-built templates
You can use pre-built RapidCanvas templates as part of your project. In this section we fetch the pre-built templates that will be used during the build pipeline.
# This gets all available templates
templates = TemplateV2.get_all()
# Relevant templates for this project are being fetched
fill_null_template = TemplateV2.get_template_by('Fill Null Timeseries')
undersample_timeseries_template = TemplateV2.get_template_by('Undersample Timeseries Data')
tsfresh_template = TemplateV2.get_template_by('Tsfresh Features')
time_to_event_template = TemplateV2.get_template_by('Time To Event')
RandomForest_template = TemplateV2.get_template_by('Random Forest')
Set project variables and create scenarios
Add project variables
Project variables are stored as key-value pairs at the project level, and a variable can be referenced using the “@variable name” notation to pass in its value. In this case we create a global variable called mode_global, which determines whether the build pipeline or the predict pipeline runs.
globalVariable = GlobalVariable(
name="mode_global",
project_id=project.id,
type="string",
value="build"
)
globalVariable.create()
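To illustrate the “@variable name” notation mentioned above, the sketch below shows how the value of mode_global could be referenced from a transform parameter. This is a hypothetical illustration: the parameter name run_mode is made up, and where exactly the “@” reference is accepted may differ in your RapidCanvas version.
# Hypothetical illustration of the "@variable" reference (not part of this project).
# Assumption: a transform parameter accepts "@mode_global" in place of a literal value.
example_transform = Transform()
example_transform.name = 'example_with_project_variable'
example_transform.variables = {
    'run_mode': '@mode_global'  # resolved to "build" or "predict" at run time
}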
Create relevant scenarios
A scenario is created within a project and allows a pipeline or a recipe to run only when certain conditions are met. In this example we use scenarios to run either just the build pipeline or just the predict pipeline.
Build Scenario
As part of the build scenario, our global variable mode_global is set to “build”, which runs only the build pipeline and skips the predict pipeline. After your first build run, you will typically only re-run your predict pipeline. However, if you have new historic data or want to rebuild the model, you can re-run the build scenario.
build_scenario = project.add_scenario(
name='build_scenario',
description='Model Build',
shared_variables=dict(mode_global="build")
)
Predict Scenario
As part of the predict scenario, our global variable mode_global is set to “predict”, which runs only the predict pipeline and skips the build pipeline.
In our example, we run the predict scenario every time we have a new file to predict on. During prediction we use the model that was already built by the build pipeline.
predict_scenario = project.add_scenario(
name='predict_scenario',
description='Model Predict',
shared_variables=dict(mode_global="predict")
)
Create a build pipeline
In this section, we follow all the relevant steps to build an ML model using historic data.
Add Input Datasets - Build pipeline
As part of the build pipeline, we add two datasets to the project: sensor data and failures data. Sensor data contains all the historic data collected from wind turbine sensors for a given time period.
Failures data contains the list of turbine components and their corresponding failure timestamps, along with remarks.
sensorsDataset = project.addDataset(
dataset_name="sensor_events",
dataset_description="Sensor data of wind turbines",
dataset_file_path="data/sensor_edp.csv"
)
labelsDataset = project.addDataset(
dataset_name="incident_events",
dataset_description="Labels data of wind turbines",
dataset_file_path="data/failures_edp.csv"
)
Transform your raw data
Create recipe to fill nulls
This recipe cleans up the sensor data by identifying any nulls and filling them as per the chosen method.
Note that we add a line before the recipe to define a build_mode condition. This condition makes sure the recipe runs only when the value of “mode_global” is set to “build”; otherwise, the recipe run is skipped. Please note that if this recipe run is skipped, everything downstream of it is also skipped.
build_mode = RecipeExpression(field='mode_global', operator=Operator.EQUAL_TO, value='build')
fill_null_recipe = project.addRecipe([sensorsDataset], name='fill_null_recipe', condition=build_mode)
fill_null = Transform()
fill_null.templateId = fill_null_template.id
fill_null.name='fill_null'
fill_null.variables = {
'inputDataset': "sensor_events",
'columns':'',
'Group_by':'Turbine_ID',
'how':'ffill',
'Timestamp': 'Timestamp',
'outputDataset':'fill_null_output'
}
fill_null_recipe.add_transform(fill_null)
fill_null_recipe.run()
fill_null_dataset = fill_null_recipe.getChildrenDatasets()['fill_null_output']
fill_null_dataset.getData(5)
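As an optional sanity check on the fill-null step, you can inspect the returned sample for remaining nulls. This sketch assumes getData() returns a pandas DataFrame and that its argument is the number of rows to fetch.
# Optional sanity check (assumption: getData() returns a pandas DataFrame).
sample = fill_null_dataset.getData(100)
print(sample.isnull().sum())  # forward-filled sensor columns should have few or no remaining nulls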
Create recipe to clean data
This recipe takes the fill-null output and uses undersample timeseries to clean the sensor dataset.
Note that we have not added a condition to run this only for the build pipeline. It is optional at this point because this recipe is connected to the output of the previous recipe, which already has the condition in place. If the condition is satisfied on the first recipe, everything downstream runs as well; if the condition is not met on the first recipe, it is skipped along with everything downstream.
sensor_cleaning_recipe = project.addRecipe([fill_null_dataset], name='sensor_cleaning_recipe')
undersample_timeseries = Transform()
undersample_timeseries.templateId = undersample_timeseries_template.id
undersample_timeseries.name='undersample_timeseries'
undersample_timeseries.variables = {
'inputDataset': "fill_null_output",
'Col_to_undersample_by':'Turbine_ID',
'Timestamp':"Timestamp",
'Frequency': "D",
'Resample_type': "MEAN",
'outputDataset':'sensor_cleaned'
}
sensor_cleaning_recipe.add_transform(undersample_timeseries)
sensor_cleaning_recipe.run()
Output dataset and review sample
sensor_cleaned = sensor_cleaning_recipe.getChildrenDatasets()['sensor_cleaned']
sensor_cleaned.getData(5)
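Since the undersample step resamples to a daily frequency (“D”) per turbine, you can optionally verify that the output contains at most one row per turbine per day. This again assumes getData() returns a pandas DataFrame.
# Optional check: after daily resampling there should be at most one row
# per (Turbine_ID, Timestamp) pair. Assumption: getData() returns a pandas DataFrame.
sample = sensor_cleaned.getData(500)
print(sample.groupby(['Turbine_ID', 'Timestamp']).size().max())  # expect 1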
Create recipe for tsfresh features
This recipe takes the cleaned historic sensor data and generates 30-day aggregates for each row, producing all the additional features we need.
Please note that rows for which 30 days of historic data is not available are dropped at this step.
sensor_tsfresh = project.addRecipe([sensor_cleaned], name='sensor_tsfresh')
tsfresh = Transform()
tsfresh.templateId = tsfresh_template.id
tsfresh.name='tsfresh'
tsfresh.variables = {
'inputDataset': "sensor_cleaned",
"max_timeshift":30,
"min_timeshift":30,
"entity":'Turbine_ID',
"time":'Timestamp',
"large":"True",
"outputDataset": "sensor_ts_fresh"
}
sensor_tsfresh.add_transform(tsfresh)
sensor_tsfresh.run()
Output dataset and review sample
sensor_tsfresh_dataset = sensor_tsfresh.getChildrenDatasets()['sensor_ts_fresh']
sensor_tsfresh_dataset.getData(5)
Build ML model
Create a recipe to add labels to the dataset
As part of the model-building step, we first join the feature-enriched dataset with the failures dataset uploaded at the start of the build pipeline. The Time To Event template labels each row with the time remaining until the next failure (in days), which becomes the model target time_to_event.
join_time_to_failure_recipe=project.addRecipe([sensor_tsfresh_dataset, labelsDataset], name='join_time_to_failure_recipe')
time_to_failure = Transform()
time_to_failure.templateId = time_to_event_template.id
time_to_failure.name='time_to_failure'
time_to_failure.variables = {
'EventDataset':labelsDataset.name,
'TimeSeriesDataset':'sensor_ts_fresh',
'Eventkey':'Turbine_ID',
'TimeSerieskey':'Turbine_ID',
'EventTimestamp':'Timestamp',
'TimeSeriesTimestamp':'Timestamp',
'UnitOfTime':'days',
'outputDataset':'time_to_failure_dataset'
}
join_time_to_failure_recipe.add_transform(time_to_failure)
join_time_to_failure_recipe.run()
Output dataset and review sample
time_to_failure_dataset = join_time_to_failure_recipe.getChildrenDatasets()['time_to_failure_dataset']
time_to_failure_dataset.getData(5)
Create a recipe to build a random forest model
In this step we build the ML model. Please note that once the model is built, it is automatically stored in the RapidCanvas repository and can be retrieved for prediction in later steps.
Note that this marks the end of the build pipeline.
template = TemplateV2(
name="LocalRandomForest", description="LocalRandomForest", project_id=project.id, source="CUSTOM", status="ACTIVE", tags=["Number", "datatype-long"]
)
template_transform = TemplateTransformV2(type = "python", params=dict(notebookName="Local-Random-Forest.ipynb"))
template.base_transforms = [template_transform]
template.publish("transforms/Local-Random-Forest.ipynb")
randomforest_recipe=project.addRecipe([time_to_failure_dataset], 'LocalRandomForest')
RandomForest = Transform()
RandomForest.templateId = template.id
RandomForest.name='RandomForest'
RandomForest.variables = {
'inputDataset': "time_to_failure_dataset",
'target':'time_to_event',
'train_size':0.8,
'model_to_save':'ml_random_forest_v15'
}
randomforest_recipe.add_transform(RandomForest)
randomforest_recipe.run()
Output dataset and review sample
children = randomforest_recipe.getChildrenDatasets()
children['Test_with_prediction'].getData(5)
children['Train_with_prediction'].getData(5)
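Optionally, you can compute a quick error metric on the held-out split to gauge model quality. The snippet below is a sketch: it assumes getData() returns a pandas DataFrame, that its argument is the number of rows to fetch, and that the model's estimate lands in a column named "prediction" (a hypothetical name; check the sample above for the actual column).
# Sketch of a hold-out error check.
# Assumptions: getData() returns a pandas DataFrame and the predicted values
# are in a column named "prediction" (verify against the sample above).
test_df = children['Test_with_prediction'].getData(1000)
mae = (test_df['time_to_event'] - test_df['prediction']).abs().mean()
print(f"Hold-out MAE (days): {mae:.2f}")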
Model Prediction Pipeline
Now that we have built an ML model, we can build the pipeline that uses the model to predict on new sensor data.
Add input dataset - Daily prediction files
This is the new sensor data which has not been used by the model and will be predicted on.
dailyDataset = project.addDataset(
dataset_name="daily_events",
dataset_description="Daily data of wind turbine for prediction",
dataset_file_path="data/daily_data/data-with-features.csv"
)
Transform your raw data
Create recipe to fill nulls
We follow the same data-cleaning steps for the new data as in the build pipeline. Do note that we add a line before the recipe to define a predict_mode condition.
This condition makes sure the recipe runs only when the value of “mode_global” is set to “predict”; otherwise, the recipe run is skipped. Please note that if this recipe run is skipped, everything downstream of it is also skipped.
predict_mode = RecipeExpression(field='mode_global', operator=Operator.EQUAL_TO, value='predict')
fill_null_recipe_predict = project.addRecipe([dailyDataset], name='fill_null_recipe_predict', condition=predict_mode)
fill_null = Transform()
fill_null.templateId = fill_null_template.id
fill_null.name='fill_null'
fill_null.variables = {
'inputDataset': "daily_events",
'columns':'',
'Group_by':'Turbine_ID',
'how':'ffill',
'Timestamp': 'Timestamp',
'outputDataset':'fill_null_output_predict'
}
fill_null_recipe_predict.add_transform(fill_null)
fill_null_recipe_predict.run()
fill_null_predict_dataset = fill_null_recipe_predict.getChildrenDatasets()['fill_null_output_predict']
fill_null_predict_dataset.getData(5)
Create recipe to clean data
This recipe takes the fill-null output of the new data and uses undersample timeseries to clean the dataset.
Note that we have not added a condition to run this only for the predict pipeline. It is optional at this point because this recipe is connected to the output of the previous recipe, which already has the condition in place. If the condition is satisfied on the first recipe, everything downstream runs as well; if the condition is not met on the first recipe, it is skipped along with everything downstream.
sensor_cleaned_recipe_predict = project.addRecipe([fill_null_predict_dataset], name='sensor_cleaned_recipe_predict')
undersample_timeseries = Transform()
undersample_timeseries.templateId = undersample_timeseries_template.id
undersample_timeseries.name='undersample_timeseries'
undersample_timeseries.variables = {
'inputDataset': "fill_null_output_predict",
'Col_to_undersample_by':'Turbine_ID',
'Timestamp':"Timestamp",
'Frequency': "D",
'Resample_type': "MEAN",
'outputDataset':'sensor_cleaned_predict'
}
sensor_cleaned_recipe_predict.add_transform(undersample_timeseries)
sensor_cleaned_recipe_predict.run()
sensor_cleaned_predict_dataset = sensor_cleaned_recipe_predict.getChildrenDatasets()['sensor_cleaned_predict']
sensor_cleaned_predict_dataset.getData(5)
Create recipe to add prediction features
This step is new to the predict pipeline compared to the build pipeline. During the build pipeline, all the historic data was available to generate aggregates; during the predict pipeline, we only have access to that day's data. To generate 30-day aggregates, we need to go back to the historic data and pull the relevant 30 days of history for each of these rows.
This recipe goes to the feature store and pulls the data needed to generate the 30-day aggregates.
Do note that this feature is still in beta and might change in the future.
addFeaturesRecipe = project.addRecipe([sensor_cleaned_predict_dataset], name="addFeatures")
template = TemplateV2(
name="AddFeaturesPredict", description="AddFeaturesPredict", project_id=project.id, source="CUSTOM", status="ACTIVE", tags=["Number", "datatype-long"]
)
template_transform = TemplateTransformV2(type = "python", params=dict(notebookName="Add-Features-Predict.ipynb"))
template.base_transforms = [template_transform]
template.publish("transforms/Add-Features-Predict.ipynb")
transform = Transform()
transform.templateId = template.id
transform.name = "addFeatures"
transform.variables = {
"cleanedDataset": "sensor_cleaned_predict",
"outputDataset": "addFeaturesOutput"
}
addFeaturesRecipe.add_transform(transform)
addFeaturesRecipe.run()
added_features_dataset = addFeaturesRecipe.getChildrenDatasets()['addFeaturesOutput']
added_features_dataset.getData(5)
Create recipe for tsfresh features
Now that the necessary historic data is available, this recipe generates 30-day aggregates for each row, producing all the tsfresh features we need.
Please note that rows for which 30 days of historic data is not available are dropped at this step.
sensor_tsfresh_predict = project.addRecipe([added_features_dataset], name='sensor_tsfresh_predict')
tsfresh = Transform()
tsfresh.templateId = tsfresh_template.id
tsfresh.name='tsfresh'
tsfresh.variables = {
'inputDataset': "addFeaturesOutput",
"max_timeshift":30,
"min_timeshift":30,
"entity":'Turbine_ID',
"time":'Timestamp',
"large":"True",
"outputDataset": "sensor_ts_fresh_predict"
}
sensor_tsfresh_predict.add_transform(tsfresh)
sensor_tsfresh_predict.run()
sensor_tsfresh_predict_dataset = sensor_tsfresh_predict.getChildrenDatasets()['sensor_ts_fresh_predict']
sensor_tsfresh_predict_dataset.getData(5)
Create a recipe for model prediction
In this step, we pass the feature-enriched daily dataset to our previously stored model. All you need to provide is the model name, and RapidCanvas runs the dataset through it.
prediction_template = TemplateV2(
name="Model Prediction", description="Pick a model to run the prediction on the dataset",
source="CUSTOM", status="ACTIVE", tags=["UI", "Aggregation"], project_id=project.id
)
prediction_template_transform = TemplateTransformV2(
type = "python", params=dict(notebookName="Prediction.ipynb"))
prediction_template.base_transforms = [prediction_template_transform]
prediction_template.publish("transforms/Prediction.ipynb")
predictor_transform = Transform()
predictor_transform.templateId = prediction_template.id
predictor_transform.name='predictor'
predictor_transform.variables = {
'inputDataset': "sensor_ts_fresh_predict",
"modelName":"ml_random_forest_v15"
}
predictor = project.addRecipe([sensor_tsfresh_predict_dataset], name='predictor')
predictor.add_transform(predictor_transform)
predictor.run()
Output dataset and review sample
predictions = predictor.getChildrenDatasets()['prediction']
predictions.getData(5)
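If you want to share the results outside RapidCanvas, you can export the returned predictions locally. This sketch assumes getData() returns a pandas DataFrame and that its argument is the number of rows to fetch; the output file name is only an example.
# Optional: save a local copy of the predictions.
# Assumptions: getData() returns a pandas DataFrame; the path below is illustrative.
pred_df = predictions.getData(1000)
pred_df.to_csv("predictions_sample.csv", index=False)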
Run only predict scenario
If you get new datasets on a daily basis, you can update the daily_events dataset and run just the predict scenario, which ensures that only the predict pipeline runs and the build pipeline is skipped.
You can review this by switching the scenario dropdown to predict_scenario in the canvas view of the RapidCanvas UI.
#project.run_scenario(predict_scenario._id)
Run only build scenario
If you want to rebuild the model for any reason, you can run just the build scenario, which ensures the build pipeline runs and the predict pipeline is skipped.
You can review this by switching the scenario dropdown to build_scenario in the canvas view of the RapidCanvas UI.
#project.run_scenario(build_scenario._id)