Skip to content

Part 4: Deploy a ML use case with inputs and outputs

Introduction

In Part 3, we have built and run our second ML pipeline to retrieve our trained model from the data store, provide some new data to it as input and retrieve the result as an output of our pipeline execution.

What if we want to let an external user execute our predict pipeline? Or if we want to schedule the execution of the pipeline that trains our model periodically?

We need to deploy one pipeline via an endpoint and another one with a scheduled execution.

This part will show you how to do this with the Craft AI platform:

  • We will first update the code of the predictIris() function so that it can retrieve directly from the data store the trained model and returns the predictions as a json to the user.
  • We will also update the code of the trainIris() function so that it re trains the model on a specific dataset (that could be often updated) and uploads the trained model directly to the datastore.
  • Then, we will see how to create a step and a pipeline that we will deploy on the platform in two different ways, and that could be executed periodicly and by a call.

Prerequisites

Machine Learning application with I/O

Here we will build an application based on what we did on the last part. We will expose our service to external users and schedule periodic executions.

Overview of the use case

  • To get the predictions via and endpoint:

step4_0

  • To retrain the model periodicly (we will focus on this case later):

step4_0_bis

The code we want to execute

We will first focus on the construction of the endpoint the final user will be able to target.

First we have to update our code to retrieve directly the model from the data store without any call to the sdk in the code and to return a file on the data store with the predictions inside. Hence, our file src/part-4-iris-predict.py is as follows:

import joblib
import pandas as pd
import json

def predictIris(input_data: dict, input_model:dict):

   model = joblib.load(input_model['path'])

   input_dataframe = pd.DataFrame.from_dict(input_data, orient="index")
   predictions = model.predict(input_dataframe)

   return {"predictions": predictions.tolist()}

What changed are only how we get the trained model.

model = joblib.load(input_model['path'])

input_model is a dictionary in which the key path refers to the file's path where is located the file on the step environnement.

This input is a file data type.

Don't forget to update your requirements.txt file, containing the list of Python libraries used in our step function:

joblib==xx.xx.xx
pandas==xx.xx.xx

Warning

As for the code, the platform only sees what’s on your repository so don’t forget to push your requirements file on your Git repository.

Step creation with Input and Output

As we did in part 3, we will first declare the inputs and the output. Then, we will use the function sdk.create_step() to create the whole step.

step4_1

Declare Input and Output of our new step

The only difference now is the data type we will assign to input_model. This is now a file that we want to retrieve from the data store. To do so, we define the inputs and output like below:

from craft_ai_sdk.io import Input, Output

prediction_input = Input(
   name="input_data",
   data_type="json"
)

model_input = Input(
   name="input_model",
   data_type="file"
)

prediction_output = Output(
   name="predictions",
   data_type="json"
)

We have just seen the code of the step has been adapted to handle file objects.

Now, we have everything we need to create, as before, the step and the pipeline corresponding to our predictIris() function.

Create your step

Now as in Part 3, it is time to create our step on the platform using the sdk.create_step() function, with our inputs and output:

sdk.create_step(
   step_name="part-4-iris-deployment",
   function_path="src/part-4-iris-predict.py",
   function_name="predictIris",
   description="This function retrieves the trained model and classifies the input data by returning the prediction.",
   inputs=[prediction_input, model_input],
   outputs=[prediction_output],
   container_config={
     "included_folders": ["src"],
     "requirements_path": "requirements.txt",
   },
)

When the step creation is finished, you obtain an output describing your step (including its inputs and outputs) as below:

>> Step "part-4-iris-deployment" created
  Inputs:
    - input_data (json)
    - input_model (file)
  Outputs:
    - predictions (json)
>> Steps creation succeeded
>> {'name': 'part-4-iris-deployment',
 'inputs': [{'name': 'input_data', 'data_type': 'json'}, {'name': 'input_model', 'data_type': 'file'}],
 'outputs': [{'name': 'predictions', 'data_type': 'json'}]}

Now that our step is created in the platform, we can embed it in a pipeline and deploy it.

Create your pipeline

Let’s create our pipeline here with sdk.create_pipeline() as in Part 3:

sdk.create_pipeline(
   pipeline_name="part-4-iris-deployment",
   step_name="part-4-iris-deployment",
)

You quickly obtain this output, which describes the pipeline, its step and its inputs and outputs:

>> Pipeline creation succeeded
>> {'pipeline_name': 'part-4-iris-deployment',
 'created_at': 'xxxx-xx-xxTxx:xx:xx.xxxZ',
 'steps': ['part-4-iris-deployment'],
 'open_inputs': [{'input_name': 'input_data',
   'step_name': 'part-4-iris-deployment',
   'data_type': 'json'}, {'input_name': 'input_model',
   'step_name': 'part-4-iris-deployment',
   'data_type': 'file'}],
 'open_outputs': [{'output_name': 'predictions',
   'step_name': 'part-4-iris-deployment',
   'data_type': 'json'}]}

Success

🎉 You’ve created your second step & pipeline with inputs and output!

Create your deployments with input and output mappings

Here, we want to be able to execute the pipeline, either by launching the execution with an url link or at a certain time, but not by a run anymore.

Let's try the first case.

We want the user to be able to:

  • send the input data directly to the application via an url link
  • retrieve the results directly from the endpoint

We want also to specify the path to the stored model on the data store, so that the service will take this model directly from the data store. The user won't be the one selecting the model used, it's only on the technical side.

Create the endpoint with IO mappings

An endpoint is a publicly accessible URL that launches the execution of the Pipeline.

Without the platform, you would need to write an api with a library like Flask, Fast API or Django and deploy it on a server that you would have to maintain.

step4_2

IO Mappings

When you start a new deployment, the data flow has to be configured with a mapping, that you can create with the sdk.

For our endpoint, we have to define the IO mappings defined on the schema above, like this:

inputs_mapping_endpoint = [
   InputSource(
      step_input_name="input_model",
      datastore_path="get_started/models/iris_knn_model.joblib"
      ),
   InputSource(
      step_input_name="input_data",
      endpoint_input_name="input_data"
      )
]

output_mapping_endpoint = [
   OutputDestination(
      step_output_name="predictions",
      endpoint_output_name="iris_type")
]

Create the endpoint

With the platform you can create an endpoint with a simple call to the sdk.create_deployment() function of the SDK, by choosing the endpoint for the argument execution_rule. You also have to specify a deployment_name, used to refer to the created endpoint and that is further used in its URL.

endpoint = sdk.create_deployment(
   execution_rule="endpoint",
   pipeline_name="part-4-iris-deployment",
   deployment_name="part-4-iris-endpoint",
   inputs_mapping=inputs_mapping_endpoint,
   outputs_mapping=output_mapping_endpoint
)

Target the endpoint

Prepare the input data

Now, our endpoint needs data as input, like we did for last part:

import numpy as np
import pandas as pd
from sklearn import datasets

np.random.seed(0)
indices = np.random.permutation(150)
iris_X, iris_y = datasets.load_iris(return_X_y=True, as_frame=True)
iris_X_test = iris_X.loc[indices[90:120],:]

new_data = iris_X_test.to_dict(orient="index")

We need to encapsulate this dictionary in another one whose key is "input_data" (the name of the input of our step, i.e. the name of the argument of our step’s function). We don't need to define the path to our trained model because it is already defined with the output mapping we have just done.

inputs = {
    "input_data": new_data
}

Call the endpoint with the input data

endpoint_url = sdk.base_environment_url  + "/endpoints/" + endpoint["name"]
endpoint_token = endpoint["endpoint_token"]
request = requests.post(endpoint_url, headers={"Authorization": f"EndpointToken {endpoint_token}"}, json=inputs)
request.json()

The HTTP code 200 indicates that the request has been taken into account. In case of an error, we can expect an error code starting with 4XX or 5XX.

It is a way to execute your deployment. But, obviously, you can execute it in any other way (curl command in bash, Postman…).

Warning

As the request is based on the POST method, note that you can't directly target your endpoint and recieve the output by entering it in your web navigator.

Let's check we can get the predictions as output of the endpoint:

print(request.json()['outputs']['iris_type'])
Moreover, you can check the logs on the UI, by clicking on the Executions tab of your environment, selecting your pipeline and choosing the last execution.

Success

🎉 You’ve created your first deployment and you've just called it!

Retrain the model periodically

Let's imagine that our dataset is frequently updated, for instance we get new labeled iris data every day. In this case we might want to retrain our model by triggering our training pipeline part4-iris-train every day.

The platform can do this automatically using the periodic execution rule in our deployment.

A periodic execution rule allows to schedule a pipeline execution at a certain time. For example, every Monday at a certain time, every month, every 5 minutes etc.

The inputs and output have to be defined, with a constant value or a data store mappings. First we will update our trainIris function so that it produces a file output containing our model, that we will then map to the datastore. You can check the entire updated version of this function in src/part-4-iris-predict.py. The only change is done at the return of the function: .. code:: python

return {"model": {"path": "iris_knn_model.joblib"}}

We can then create the step and pipeline as we are used to.

train_output = Output(
   name="model",
   data_type="file"
)

sdk.create_step(
   step_name="part-4-iristrain",
   function_path="src/part-4-iris-predict.py",
   function_name="trainIris",
   outputs=[train_output],
)

sdk.create_pipeline(
   pipeline_name="part-4-iristrain",
   step_name="part-4-iristrain"
)

Now let's create a deployment that executes our pipeline every 5 minutes. In our case, we will map the prediction output (which is our only I/O) to the datastore on the same path that is used in the pediction endpoint deployment. This way our prediction pipeline will automatically use the latest version of our model for predictions.

Note that the schedule argument takes the CRON syntax (examples here: https://crontab.guru/).

step4_3

Adapt IO mapping

Let's create a deployment that will schedule our pipeline to be executed every 5 minutes with the same IO mappings as in the endpoint, except that the new data is a constant input and not something we define during the execution.

output_mapping_periodic = OutputDestination(
   step_output_name="model",
   datastore_path="get_started/models/iris_knn_model.joblib"
)

Create periodic deployment

And now, below is how we create a deployment that will schedule our pipeline execution:

periodic = sdk.create_deployment(
   execution_rule="periodic",
   pipeline_name="part-4-iristrain",
   deployment_name="part-4-iristrain",
   schedule="*/5 * * * *",
   outputs_mapping=[output_mapping_periodic],

)

Our training pipeline will now be executed every 5 minutes, updating our model with the potential new data. The predict pipeline will then use this updated model automatically.

You can check that you actually have a new execution every 5 minutes using the sdk or via the web interface.

Success

🎉 Congrats! You’ve created your second deployment and planned it to run every 5 minutes!

Conclusion

Success

🎉 After this Get Started, you have learned how to use the basic functionalities of the platform! You know now the entire workflow to create a pipeline and deploy it.

step4_4

You are now able to:

  • Deploy your code through a pipeline in a few lines of code, run it whenever you want and have the logs to analyze the execution.
  • Use the Data Store on the platform to upload and download files, models, images, etc.
  • Execute your pipeline via an endpoint that is accessible from outside with a secured token, or via a periodic execution.
  • Make your inputs flexible: set constant values to avoid users to fill in, let users enter inputs values via the endpoint directly, or use the data store to retrieve or put objects.

If you want to go further

One concept has not been explained to you: the metrics.

If you want to go further and discover this feature, you can read the associated documentation.