Model build and predict

This example walks you through the steps involved in building an ML model from historic data and predicting on new incoming data.

The example provides historic sensor data from wind turbines along with their failures. A model is built from this historic data, and new daily sensor data is then passed through the model to predict failures.

Download the project files here: Reference Project

Building an ML model and predicting on RapidCanvas involves the following steps:

  • Import functions

  • Authenticate your client

  • Create a custom environment

  • Create a new project

    • Fetch pre-built templates

  • Set project variables and create scenarios

    • Set project variables

    • Create relevant scenarios

      • Create a build scenario

      • Create a predict scenario

  • Create a build pipeline

    • Add Input Datasets

    • Transform your raw data

      • Create recipe to fill nulls

      • Create recipe to clean data

    • Create recipe for tsfresh features

    • Build ML model

      • Create a recipe to add labels to the dataset

      • Create a recipe to build a random forest model

  • Create a predict pipeline

    • Add Input Datasets

    • Transform your raw data

      • Create recipe to fill nulls

      • Create recipe to clean raw data

    • Create recipe to add prediction features

    • Create recipe for tsfresh features

    • Model Prediction

      • Create a recipe for model prediction

  • Run scenarios

    • Run predict scenario for model prediction

    • Run build scenario for model building

Import functions

from utils.rc.client.requests import Requests
from utils.rc.client.auth import AuthClient

from utils.rc.dtos.project import Project
from utils.rc.dtos.dataset import Dataset
from utils.rc.dtos.recipe import Recipe
from utils.rc.dtos.transform import Transform
from utils.rc.dtos.env import Env
from utils.rc.dtos.env import EnvType
from utils.rc.dtos.template_v2 import TemplateV2, TemplateTransformV2
from utils.rc.dtos.global_variable import GlobalVariable
from utils.rc.dtos.scenario import RecipeExpression
from utils.rc.dtos.scenario import Operator
from utils.rc.dtos.scenario import Scenario

from utils.rc.dtos.dataSource import DataSource
from utils.rc.dtos.dataSource import DataSourceType
from utils.rc.dtos.dataSource import GcpConfig

import logging
from utils.utils.log_util import LogUtil

LogUtil.set_basic_config(format='%(levelname)s:%(message)s', level=logging.INFO)

Authenticate your client

# Requests.setRootHost("https://test.dev.rapidcanvas.net/api/")
AuthClient.setToken()

Create a custom environment

Here are the available custom environments and their usage guidelines:

SMALL: 1 Core, 2GB Memory
MEDIUM: 2 Cores, 4GB Memory
LARGE: 4 Cores, 8GB Memory
CPU_LARGE: 8 Cores, 16GB Memory
MAX_LARGE: 12 Cores, 32GB Memory
EXTRA_MAX_LARGE: 12 Cores, 48GB Memory
## Environment Creation
env = Env.createEnv(
    name="env_build_predict",
    description="Max large env for running build and predict",
    envType=EnvType.LARGE,
    requirements="numpy==1.21.5 tsfresh==0.20.0"
)
env.id

Create a Project

Create a new project under your tenant

project_name = "Build and Predict"
description = "One project for build and predict with 2 pipelines"
icon = "https://rapidcanvas.ai/wp-content/uploads/2022/09/windturbine_med.jpg"
project = Project.create(
    name=project_name,
    description=description,
    icon=icon,
    envId=env.id,
    createEmpty=True
)
project.id

This creates a new project named “Build and Predict” under your tenant. You can verify it on the RapidCanvas UI by logging in here: RapidCanvas UI

Getting Templates

You can utilize pre-built RapidCanvas templates as part of your project. In this section we fetch the pre-built templates that will be used in the pipelines below.

# This gets all available templates
templates = TemplateV2.get_all()
# Relevant templates for this project are being fetched
fill_null_template = TemplateV2.get_template_by('Fill Null Timeseries')
undersample_timeseries_template = TemplateV2.get_template_by('Undersample Timeseries Data')
tsfresh_template = TemplateV2.get_template_by('Tsfresh Features')
time_to_event_template = TemplateV2.get_template_by('Time To Event')
RandomForest_template = TemplateV2.get_template_by('Random Forest')

Set project variables and create scenarios

Add project variables

Project variables are stored as key-value pairs at the project level, and a variable can be referenced with the “@variable name” notation to pass its value. In this case we create a global variable called mode_global which determines whether the build pipeline or the predict pipeline runs.

globalVariable = GlobalVariable(
    name="mode_global",
    project_id=project.id,
    type="string",
    value="build"
)
globalVariable.create()
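
For illustration, a recipe transform could then reference this variable with the “@” notation rather than a hard-coded value. The sketch below is hypothetical; only the variable name mode_global comes from this project.

# Hypothetical transform showing the "@" notation; the current value of
# mode_global ("build" or "predict") is substituted at run time
example_transform = Transform()
example_transform.name = 'example'
example_transform.variables = {
    'inputDataset': 'sensor_events',
    'run_mode': '@mode_global'
}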

Create relevant scenarios

A scenario is created within a project and allows you to run a pipeline or a recipe only when certain conditions are met. In this example we use scenarios to run either just the build pipeline or just the predict pipeline.

Build Scenario

As part of the build scenario, our global variable mode_global is set to “build”, which runs only the build pipeline and skips the predict pipeline. After your first build run, you will typically only re-run your predict pipeline. However, if you have new historic data or want to rebuild the model, you can re-run the build scenario.

build_scenario = project.add_scenario(
    name='build_scenario',
    description='Model Build',
    shared_variables=dict(mode_global="build")
)

Predict Scenario

As part of the predict scenario, our global variable mode_global is set to “predict”, which runs only the predict pipeline and skips the build pipeline.

In our example, we will run the predict scenario every time we have a new file to predict. During prediction we use the model that was built during the build pipeline.

predict_scenario = project.add_scenario(
    name='predict_scenario',
    description='Model Predict',
    shared_variables=dict(mode_global="predict")
)

Create a build pipeline

In this section, we follow all the steps needed to build an ML model from the historic data.

Add Input Datasets - Build pipeline

As part of the build pipeline, we add two datasets to the project: sensor data and failures data. Sensor data contains all the historic data collected from wind turbine sensors over a given time period.

Failures data contains the list of turbine components and their corresponding failure timestamps, along with remarks.

sensorsDataset = project.addDataset(
    dataset_name="sensor_events",
    dataset_description="Sensor data of wind turbines",
    dataset_file_path="data/sensor_edp.csv"
)

labelsDataset = project.addDataset(
    dataset_name="incident_events",
    dataset_description="Labels data of wind turbines",
    dataset_file_path="data/failures_edp.csv"
)

Transform your raw data

Create recipe to fill nulls

This recipe cleans up the sensor data by identifying any nulls and filling them as per the chosen method.

Note that we have added a line before the recipe to define a build_mode condition. This ensures the recipe runs only when the value of “mode_global” is set to “build”; otherwise the recipe run is skipped, along with everything downstream of it.

build_mode = RecipeExpression(field='mode_global', operator=Operator.EQUAL_TO, value='build')
fill_null_recipe = project.addRecipe([sensorsDataset], name='fill_null_recipe', condition=build_mode)
fill_null = Transform()
fill_null.templateId = fill_null_template.id
fill_null.name='fill_null'
fill_null.variables = {
    'inputDataset': "sensor_events",
    'columns':'',
    'Group_by':'Turbine_ID',
    'how':'ffill',
    'Timestamp': 'Timestamp',
    'outputDataset':'fill_null_output'
}
fill_null_recipe.add_transform(fill_null)
fill_null_recipe.run()
fill_null_dataset = fill_null_recipe.getChildrenDatasets()['fill_null_output']
fill_null_dataset.getData(5)
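
As a quick sanity check on the fill step, you can pull a larger sample and count any remaining nulls. A minimal sketch, assuming getData returns a pandas DataFrame (as the previews above suggest):

# Assumes getData returns a pandas DataFrame
sample = fill_null_dataset.getData(100)
print(sample.isnull().sum())  # remaining nulls per column, ideally all zero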

Create recipe to clean data

This recipe takes the fill null output and uses the undersample timeseries template to clean the sensor dataset.

Note that we have not added any condition to run this only for the build pipeline. It is optional at this point because this recipe is connected to the output of the previous recipe, which had the condition in place. If the condition is satisfied on the first recipe, we expect everything downstream to run; if it is not met, the first recipe is skipped along with everything downstream.

sensor_cleaning_recipe = project.addRecipe([fill_null_dataset], name='sensor_cleaning_recipe')
undersample_timeseries = Transform()
undersample_timeseries.templateId = undersample_timeseries_template.id
undersample_timeseries.name='undersample_timeseries'
undersample_timeseries.variables = {
    'inputDataset': "fill_null_output",
    'Col_to_undersample_by':'Turbine_ID',
    'Timestamp':"Timestamp",
    'Frequency': "D",
    'Resample_type': "MEAN",
    'outputDataset':'sensor_cleaned'
}
sensor_cleaning_recipe.add_transform(undersample_timeseries)
sensor_cleaning_recipe.run()

Output dataset and review sample

sensor_cleaned = sensor_cleaning_recipe.getChildrenDatasets()['sensor_cleaned']
sensor_cleaned.getData(5)

Create recipe for tsfresh features

This recipe takes the cleaned historic sensor data and generates 30-day aggregates for each row, producing all the additional features we need.

Please note that rows for which 30 days of historic data are not available will be dropped at this step.

sensor_tsfresh = project.addRecipe([sensor_cleaned], name='sensor_tsfresh')
tsfresh = Transform()
tsfresh.templateId = tsfresh_template.id
tsfresh.name='tsfresh'
tsfresh.variables = {
    'inputDataset': "sensor_cleaned",
    "max_timeshift":30,
    "min_timeshift":30,
    "entity":'Turbine_ID',
    "time":'Timestamp',
    "large":"True",
    "outputDataset": "sensor_ts_fresh"
}
sensor_tsfresh.add_transform(tsfresh)
sensor_tsfresh.run()

Output dataset and review sample

sensor_tsfresh_dataset = sensor_tsfresh.getChildrenDatasets()['sensor_ts_fresh']
sensor_tsfresh_dataset.getData(5)
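
The internals of the Tsfresh Features template are not shown here, but the underlying tsfresh mechanism looks roughly like the sketch below: roll the series into 30-step windows per turbine, then extract aggregate features per window. Column names match this project; the extraction settings are an assumption for illustration.

import pandas as pd
from tsfresh import extract_features
from tsfresh.feature_extraction import MinimalFCParameters
from tsfresh.utilities.dataframe_functions import roll_time_series

# Cleaned sensor data: one row per Turbine_ID per day (assumes a pandas DataFrame)
df = sensor_cleaned.getData(1000)
df['Timestamp'] = pd.to_datetime(df['Timestamp'])

# Build rolling 30-step windows per turbine; with min_timeshift == max_timeshift,
# rows without a full 30-step history are dropped
rolled = roll_time_series(
    df, column_id='Turbine_ID', column_sort='Timestamp',
    max_timeshift=30, min_timeshift=30
)

# Extract aggregates per window (a minimal feature set, for speed)
features = extract_features(
    rolled, column_id='id', column_sort='Timestamp',
    default_fc_parameters=MinimalFCParameters()
)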

Build ML model

Create a recipe to add labels to the dataset

As part of the model building step, we first join our feature-enriched dataset with the failures dataset that we uploaded at the start of the build pipeline.

join_time_to_failure_recipe=project.addRecipe([sensor_tsfresh_dataset, labelsDataset], name='join_time_to_failure_recipe')
time_to_failure = Transform()
time_to_failure.templateId = time_to_event_template.id
time_to_failure.name='time_to_failure'
time_to_failure.variables = {
    'EventDataset':labelsDataset.name,
    'TimeSeriesDataset':'sensor_ts_fresh',
    'Eventkey':'Turbine_ID',
    'TimeSerieskey':'Turbine_ID',
    'EventTimestamp':'Timestamp',
    'TimeSeriesTimestamp':'Timestamp',
    'UnitOfTime':'days',
    'outputDataset':'time_to_failure_dataset'
}
join_time_to_failure_recipe.add_transform(time_to_failure)
join_time_to_failure_recipe.run()

Output dataset and review sample

time_to_failure_dataset = join_time_to_failure_recipe.getChildrenDatasets()['time_to_failure_dataset']
time_to_failure_dataset.getData(5)
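
Conceptually, the Time To Event template attaches to each sensor row the number of days until that turbine's next recorded failure; this becomes the model target time_to_event. A plain-pandas equivalent, purely for intuition (the template's actual implementation is not shown):

import pandas as pd

# Assumes getData returns pandas DataFrames
sensors = sensor_tsfresh_dataset.getData(1000)
failures = labelsDataset.getData(1000)

sensors['Timestamp'] = pd.to_datetime(sensors['Timestamp'])
failures = failures.rename(columns={'Timestamp': 'Failure_Timestamp'})
failures['Failure_Timestamp'] = pd.to_datetime(failures['Failure_Timestamp'])

# For each sensor row, find the same turbine's next failure at or after that row
labeled = pd.merge_asof(
    sensors.sort_values('Timestamp'),
    failures.sort_values('Failure_Timestamp'),
    left_on='Timestamp', right_on='Failure_Timestamp',
    by='Turbine_ID', direction='forward'
)
labeled['time_to_event'] = (labeled['Failure_Timestamp'] - labeled['Timestamp']).dt.days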

Create a recipe to build a random forest model

In this step we build the ML model. Please note that once the model is built, it is automatically stored in the RapidCanvas model repository and can be retrieved for prediction in later steps.

Note that this marks the end of the build pipeline.

template = TemplateV2(
    name="LocalRandomForest", description="LocalRandomForest", project_id=project.id, source="CUSTOM", status="ACTIVE", tags=["Number", "datatype-long"]
)
template_transform = TemplateTransformV2(type = "python", params=dict(notebookName="Local-Random-Forest.ipynb"))
template.base_transforms = [template_transform]
template.publish("transforms/Local-Random-Forest.ipynb")
randomforest_recipe=project.addRecipe([time_to_failure_dataset], 'LocalRandomForest')
RandomForest = Transform()
RandomForest.templateId = template.id
RandomForest.name='RandomForest'
RandomForest.variables = {
    'inputDataset': "time_to_failure_dataset",
    'target':'time_to_event',
    'train_size':0.8,
    'model_to_save':'ml_random_forest_v15'
}
randomforest_recipe.add_transform(RandomForest)
randomforest_recipe.run()
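
The notebook behind this custom template is not reproduced here, but the core of a transform like Local-Random-Forest.ipynb plausibly reduces to standard scikit-learn code along these lines. Variable names mirror the recipe settings above; the persistence call is a stand-in for the RapidCanvas model repository.

import joblib
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

# Assumes getData returns a pandas DataFrame
df = time_to_failure_dataset.getData(10000)

X = df.drop(columns=['time_to_event']).select_dtypes('number')
y = df['time_to_event']
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.8)

model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# The real template saves the model under the name in 'model_to_save';
# joblib stands in for that here
joblib.dump(model, 'ml_random_forest_v15.joblib')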

Output dataset and review sample

children = randomforest_recipe.getChildrenDatasets()
children['Test_with_prediction'].getData(5)
children['Train_with_prediction'].getData(5)

Create a predict pipeline

Now that we have built an ML model, we can start building the pipeline that utilizes the model to predict on new sensor data.

Add input dataset - Daily prediction files

This is the new sensor data which has not been used by the model and will be predicted on.

dailyDataset = project.addDataset(
    dataset_name="daily_events",
    dataset_description="Daily data of wind turbine for prediction",
    dataset_file_path="data/daily_data/data-with-features.csv"
)

Transform your raw data

Create recipe to fill nulls

We follow the same data cleaning steps for the new data as in the build pipeline. Do note that we have added a line before the recipe to define a predict_mode condition.

This ensures the recipe runs only when the value of “mode_global” is set to “predict”; otherwise the recipe run is skipped, along with everything downstream of it.

predict_mode = RecipeExpression(field='mode_global', operator=Operator.EQUAL_TO, value='predict')
fill_null_recipe_predict = project.addRecipe([dailyDataset], name='fill_null_recipe_predict', condition=predict_mode)
fill_null = Transform()
fill_null.templateId = fill_null_template.id
fill_null.name='fill_null'
fill_null.variables = {
    'inputDataset': "daily_events",
    'columns':'',
    'Group_by':'Turbine_ID',
    'how':'ffill',
    'Timestamp': 'Timestamp',
    'outputDataset':'fill_null_output_predict'
}
fill_null_recipe_predict.add_transform(fill_null)
fill_null_recipe_predict.run()
fill_null_predict_dataset = fill_null_recipe_predict.getChildrenDatasets()['fill_null_output_predict']
fill_null_predict_dataset.getData(5)

Create recipe to clean data

This recipe takes the fill null output of the new data and uses the undersample timeseries template to clean the dataset.

Note that we have not added any condition to run this only for the predict pipeline. It is optional at this point because this recipe is connected to the output of the previous recipe, which had the condition in place. If the condition is satisfied on the first recipe, we expect everything downstream to run; if it is not met, the first recipe is skipped along with everything downstream.

sensor_cleaned_recipe_predict = project.addRecipe([fill_null_predict_dataset], name='sensor_cleaned_recipe_predict')
undersample_timeseries = Transform()
undersample_timeseries.templateId = undersample_timeseries_template.id
undersample_timeseries.name='undersample_timeseries'
undersample_timeseries.variables = {
    'inputDataset': "fill_null_output_predict",
    'Col_to_undersample_by':'Turbine_ID',
    'Timestamp':"Timestamp",
    'Frequency': "D",
    'Resample_type': "MEAN",
    'outputDataset':'sensor_cleaned_predict'
}
sensor_cleaned_recipe_predict.add_transform(undersample_timeseries)
sensor_cleaned_recipe_predict.run()
sensor_cleaned_predict_dataset = sensor_cleaned_recipe_predict.getChildrenDatasets()['sensor_cleaned_predict']
sensor_cleaned_predict_dataset.getData(5)

Create recipe to add prediction features

This step is new in the predict pipeline. During the build pipeline, all the historic data was available for generating aggregates. During the predict pipeline, however, we only have access to that day's data. To generate 30-day aggregates we need to pull the relevant 30 days of historic data for each of these rows.

This recipe goes to the feature store and pulls the data needed to generate the 30-day aggregates.

Do note that this feature is still in beta and might change in the future.

addFeaturesRecipe = project.addRecipe([sensor_cleaned_predict_dataset], name="addFeatures")
template = TemplateV2(
    name="AddFeaturesPredict", description="AddFeaturesPredict", project_id=project.id, source="CUSTOM", status="ACTIVE", tags=["Number", "datatype-long"]
)
template_transform = TemplateTransformV2(type = "python", params=dict(notebookName="Add-Features-Predict.ipynb"))
template.base_transforms = [template_transform]
template.publish("transforms/Add-Features-Predict.ipynb")
transform = Transform()
transform.templateId = template.id
transform.name = "addFeatures"
transform.variables = {
    "cleanedDataset": "sensor_cleaned_predict",
    "outputDataset": "addFeaturesOutput"
}
addFeaturesRecipe.add_transform(transform)
addFeaturesRecipe.run()
added_features_dataset = addFeaturesRecipe.getChildrenDatasets()['addFeaturesOutput']
added_features_dataset.getData(5)
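
The Add-Features-Predict.ipynb notebook is project-specific, but the idea it implements can be sketched in pandas: for each new daily batch, look back into the stored historic data and prepend the prior 30 days for the same turbines so tsfresh has a full window. All file paths below are hypothetical.

import pandas as pd

# Hypothetical feature-store extract of the cleaned historic sensor data
history = pd.read_csv('feature_store/sensor_cleaned.csv', parse_dates=['Timestamp'])
daily = pd.read_csv('data/daily_data/new_day.csv', parse_dates=['Timestamp'])

# Pull the 30 days of history immediately preceding the new batch
cutoff = daily['Timestamp'].min()
lookback = history[
    (history['Timestamp'] >= cutoff - pd.Timedelta(days=30))
    & (history['Timestamp'] < cutoff)
]

# Prepend the lookback rows so each new row has a full 30-day window
enriched = pd.concat([lookback, daily], ignore_index=True)
enriched = enriched.sort_values(['Turbine_ID', 'Timestamp'])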

Create recipe for tsfresh features

Now that we have the necessary historic data available, this recipe generates 30-day aggregates for each row, producing all the tsfresh features we need.

Please note that rows for which 30 days of historic data are not available will be dropped at this step.

sensor_tsfresh_predict = project.addRecipe([added_features_dataset], name='sensor_tsfresh_predict')
tsfresh = Transform()
tsfresh.templateId = tsfresh_template.id
tsfresh.name='tsfresh'
tsfresh.variables = {
    'inputDataset': "addFeaturesOutput",
    "max_timeshift":30,
    "min_timeshift":30,
    "entity":'Turbine_ID',
    "time":'Timestamp',
    "large":"True",
    "outputDataset": "sensor_ts_fresh_predict"
}
sensor_tsfresh_predict.add_transform(tsfresh)
sensor_tsfresh_predict.run()
sensor_tsfresh_predict_dataset = sensor_tsfresh_predict.getChildrenDatasets()['sensor_ts_fresh_predict']
sensor_tsfresh_predict_dataset.getData(5)

Create a recipe for model prediction

In this step, we pass the feature-enriched daily dataset to our previously stored model. All you need to provide is the model name, and RapidCanvas runs the dataset through it.

prediction_template = TemplateV2(
    name="Model Prediction", description="Pick a model to run the prediction on the dataset",
    source="CUSTOM", status="ACTIVE", tags=["UI", "Aggregation"], project_id=project.id
)
prediction_template_transform = TemplateTransformV2(
    type = "python", params=dict(notebookName="Prediction.ipynb"))

prediction_template.base_transforms = [prediction_template_transform]
prediction_template.publish("transforms/Prediction.ipynb")
predictor_transform = Transform()
predictor_transform.templateId = prediction_template.id
predictor_transform.name='predictor'
predictor_transform.variables = {
    'inputDataset': "sensor_ts_fresh_predict",
    "modelName":"ml_random_forest_v15"
}
predictor = project.addRecipe([sensor_tsfresh_predict_dataset], name='predictor')
predictor.add_transform(predictor_transform)
predictor.run()

Output dataset and review sample

predictions = predictor.getChildrenDatasets()['prediction']
predictions.getData(5)
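
As with the other custom templates, the notebook is not shown, but a Prediction.ipynb-style transform plausibly reduces to retrieving the stored model by name and scoring the dataset. joblib again stands in for the RapidCanvas model repository.

import joblib

# Hypothetical stand-in for fetching 'ml_random_forest_v15' from the model repository
model = joblib.load('ml_random_forest_v15.joblib')

# Assumes getData returns a pandas DataFrame; feature columns must match training
df = sensor_tsfresh_predict_dataset.getData(1000)
X = df.select_dtypes('number')
df['prediction'] = model.predict(X)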

Run only predict scenario

If you get new data sets on a daily basis, you can update the daily_events dataset and just run the predict scenario, which will ensure that only the predict pipeline runs and the build pipeline is skipped.

You can review this by changing the scenario dropdown to predict_scenario in the canvas view on the RapidCanvas UI.

# Uncomment to run only the predict scenario
# project.run_scenario(predict_scenario._id)

Run only build scenario

If you want to rebuild the model for any reason, you can just run the build scenario, which ensures the build pipeline runs and the predict pipeline is skipped.

You can review this by changing the scenario dropdown to build_scenario in the canvas view on the RapidCanvas UI.

# Uncomment to run only the build scenario
# project.run_scenario(build_scenario._id)