Apache Beam RunInference for scikit-learn


This notebook demonstrates the use of the RunInference transform for scikit-learn, also called sklearn. Apache Beam RunInference has implementations of the ModelHandler class prebuilt for scikit-learn. For more information about using RunInference, see Get started with AI/ML pipelines in the Apache Beam documentation.

You can choose the appropriate model handler based on your input data type: SklearnModelHandlerNumpy for NumPy arrays, or SklearnModelHandlerPandas for pandas DataFrames.

With RunInference, these model handlers manage batching, vectorization, and prediction optimization for your scikit-learn pipeline or model.
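As a rough illustration of what batching and vectorization mean here, the following sketch stacks several single-feature examples into one 2-D array and calls predict once, rather than once per element. It uses only NumPy and scikit-learn. The helper name predict_batch is hypothetical and is not part of the Beam API; inside RunInference the model handler does this work for you.

```python
import numpy as np
from sklearn import linear_model

# Train a tiny stand-in model on the 5 times table (y = 5x).
x = np.arange(0, 100, dtype=np.float32).reshape(-1, 1)
model = linear_model.LinearRegression().fit(x, x * 5)


def predict_batch(model, examples):
    """Hypothetical helper: stack the examples into one array and predict once."""
    batch = np.stack(examples, axis=0)  # shape: (num_examples, num_features)
    return model.predict(batch)


examples = [
    np.array([105.0], dtype=np.float32),
    np.array([108.0], dtype=np.float32),
]
predictions = predict_batch(model, examples)
print(predictions.shape)  # one output row per input example
```

A single vectorized call is usually cheaper than many per-element calls, which is why the handlers group elements before predicting.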

This notebook demonstrates the following common RunInference patterns:

  • Generate predictions.
  • Postprocess results after RunInference.
  • Run inference with multiple models in the same pipeline.

The linear regression models used in these samples are trained on data that corresponds to the 5 and 10 times tables; that is, y = 5x and y = 10x, respectively.

Before you begin

Complete the following setup steps:

  1. Install dependencies for Apache Beam.
  2. Authenticate with Google Cloud.
  3. Specify your project and bucket. You use the project and bucket to save and load models.

pip install google-api-core --quiet
pip install google-cloud-pubsub google-cloud-bigquery-storage --quiet
pip install apache-beam[gcp,dataframe] --quiet

About scikit-learn versions

scikit-learn is a build-dependency of Apache Beam. If you need to install a different version of sklearn, use %pip install scikit-learn==<version>.
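Because a pickled scikit-learn model is generally only safe to load with the same scikit-learn version that trained it, it can help to record the installed versions before you train and save anything. The following is a minimal sketch that uses only the standard library. The helper name installed_version is hypothetical.

```python
from importlib import metadata


def installed_version(package_name):
    """Return the installed version string, or None if the package is missing."""
    try:
        return metadata.version(package_name)
    except metadata.PackageNotFoundError:
        return None


# Print the versions that matter for saving and loading the models.
for name in ("scikit-learn", "apache-beam", "numpy"):
    print(f"{name}: {installed_version(name)}")
```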

from google.colab import auth
auth.authenticate_user()
import pickle
from sklearn import linear_model
from typing import Tuple

import numpy as np
import apache_beam as beam

from apache_beam.ml.inference.sklearn_inference import ModelFileType
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.options.pipeline_options import PipelineOptions

# NOTE: If an error occurs, restart your runtime.
import os

# Constants
project = "<PROJECT_ID>" # @param {type:'string'}
bucket = "<BUCKET_NAME>" # @param {type:'string'}

# To avoid warnings, set the project.
os.environ['GOOGLE_CLOUD_PROJECT'] = project

Create the data and the scikit-learn model

This section demonstrates the following steps:

  1. Create the data to train the scikit-learn linear regression model.
  2. Train the linear regression model.
  3. Save the scikit-learn model using pickle.

In this example, you create two models: one trained on the 5 times table and one trained on the 10 times table.

# Input data to train the sklearn model for the 5 times table.
x = np.arange(0, 100, dtype=np.float32).reshape(-1, 1)
y = (x * 5).reshape(-1, 1)

def train_and_save_model(x, y, model_file_name):
  """Fits a linear regression on (x, y) and pickles it to model_file_name."""
  regression = linear_model.LinearRegression()
  regression.fit(x, y)

  with open(model_file_name, 'wb') as f:
    pickle.dump(regression, f)

five_times_model_filename = 'sklearn_5x_model.pkl'
train_and_save_model(x, y, five_times_model_filename)

# Change y to be 10 times x, and train a model for the 10 times table.
ten_times_model_filename = 'sklearn_10x_model.pkl'
y = (x * 10).reshape(-1, 1)
train_and_save_model(x, y, ten_times_model_filename)
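Before wiring a saved model into a pipeline, it can be worth confirming that the file loads and that the model learned the expected relationship. The sketch below repeats a compact train-and-save step in a temporary directory so that it runs on its own; the temporary path is a placeholder and not the file used elsewhere in this notebook.

```python
import os
import pickle
import tempfile

import numpy as np
from sklearn import linear_model

# Train and pickle a 10 times table model in a temporary directory.
x = np.arange(0, 100, dtype=np.float32).reshape(-1, 1)
y = (x * 10).reshape(-1, 1)
model_path = os.path.join(tempfile.mkdtemp(), "sklearn_10x_model.pkl")
with open(model_path, "wb") as f:
    pickle.dump(linear_model.LinearRegression().fit(x, y), f)

# Reload the file and check that the model learned roughly y = 10x.
with open(model_path, "rb") as f:
    reloaded = pickle.load(f)

slope = float(reloaded.coef_[0][0])
prediction = float(reloaded.predict(np.array([[105.0]], dtype=np.float32))[0][0])
print(f"slope ~ {slope:.3f}, prediction for 105 ~ {prediction:.1f}")
```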

Create a scikit-learn RunInference pipeline

This section demonstrates how to do the following:

  1. Define a scikit-learn model handler that accepts an array_like object as input.
  2. Read the data from BigQuery.
  3. Use the scikit-learn trained model and the scikit-learn RunInference transform on unkeyed data.

%pip install --upgrade google-cloud-bigquery --quiet
!gcloud config set project $project
Updated property [core/project].
# Populated BigQuery table

from google.cloud import bigquery

client = bigquery.Client(project=project)

# Make sure the dataset_id is unique in your project.
dataset_id = '{project}.maths'.format(project=project)
dataset = bigquery.Dataset(dataset_id)

# Modify the location based on your project configuration.
dataset.location = 'US'
dataset = client.create_dataset(dataset, exists_ok=True)

# Table name in the BigQuery dataset.
table_name = 'maths_problems_1'

query = """
    CREATE OR REPLACE TABLE
      {project}.maths.{table} (
        key STRING OPTIONS(description="A unique key for the maths problem"),
        value FLOAT64 OPTIONS(description="Our maths problem")
      );
    INSERT INTO maths.{table}
    VALUES
      ("first_example", 105.00),
      ("second_example", 108.00),
      ("third_example", 1000.00),
      ("fourth_example", 1013.00)
""".format(project=project, table=table_name)

create_job = client.query(query)
create_job.result()
 <google.cloud.bigquery.table._EmptyRowIterator at 0x7f97abb4e850> 
sklearn_model_handler = SklearnModelHandlerNumpy(model_uri=five_times_model_filename)

pipeline_options = PipelineOptions().from_dictionary(
    {'temp_location': f'gs://{bucket}/tmp'})

# Define the BigQuery table specification.
table_name = 'maths_problems_1'
table_spec = f'{project}:maths.{table_name}'

with beam.Pipeline(options=pipeline_options) as p:
  (
      p
      | "ReadFromBQ" >> beam.io.ReadFromBigQuery(table=table_spec)
      | "ExtractInputs" >> beam.Map(lambda x: [x['value']])
      | "RunInferenceSklearn" >> RunInference(model_handler=sklearn_model_handler)
      | beam.Map(print)
  )
PredictionResult(example=[1000.0], inference=array([5000.]))
PredictionResult(example=[1013.0], inference=array([5065.]))
PredictionResult(example=[108.0], inference=array([540.]))
PredictionResult(example=[105.0], inference=array([525.]))

Use sklearn RunInference on keyed inputs

This section demonstrates how to do the following:

  1. Wrap the SklearnModelHandlerNumpy object in a KeyedModelHandler to handle keyed data.
  2. Read the data from BigQuery.
  3. Use the sklearn trained model and the sklearn RunInference transform on keyed data.

sklearn_model_handler = SklearnModelHandlerNumpy(model_uri=five_times_model_filename)

keyed_sklearn_model_handler = KeyedModelHandler(sklearn_model_handler)

pipeline_options = PipelineOptions().from_dictionary(
    {'temp_location': f'gs://{bucket}/tmp'})

with beam.Pipeline(options=pipeline_options) as p:
  (
      p
      | "ReadFromBQ" >> beam.io.ReadFromBigQuery(table=table_spec)
      | "ExtractInputs" >> beam.Map(lambda x: (x['key'], [x['value']]))
      | "RunInferenceSklearn" >> RunInference(model_handler=keyed_sklearn_model_handler)
      | beam.Map(print)
  )
('third_example', PredictionResult(example=[1000.0], inference=array([5000.])))
('fourth_example', PredictionResult(example=[1013.0], inference=array([5065.])))
('second_example', PredictionResult(example=[108.0], inference=array([540.])))
('first_example', PredictionResult(example=[105.0], inference=array([525.])))
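Conceptually, handling keyed data means setting the keys aside, predicting on the values, and then pairing each prediction with its original key. The sketch below shows that idea with plain NumPy and scikit-learn. The helper predict_keyed is hypothetical and is not how KeyedModelHandler is necessarily implemented; it only illustrates why the keys in the output above line up with their inputs.

```python
import numpy as np
from sklearn import linear_model

# Train a tiny stand-in model on the 5 times table (y = 5x).
x = np.arange(0, 100, dtype=np.float32).reshape(-1, 1)
model = linear_model.LinearRegression().fit(x, x * 5)


def predict_keyed(model, keyed_batch):
    """Hypothetical helper: predict on the values and reattach each key."""
    keys, examples = zip(*keyed_batch)
    predictions = model.predict(np.stack(examples, axis=0))
    return list(zip(keys, predictions))


keyed_batch = [("first_example", [105.0]), ("second_example", [108.0])]
keyed_predictions = predict_keyed(model, keyed_batch)
for key, prediction in keyed_predictions:
    print(key, prediction)
```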

Run multiple models

This code creates a pipeline that applies two RunInference transforms, each with a different model, to the same input and then combines their outputs.

def format_output(run_inference_output) -> str:
  """Takes input from RunInference for scikit-learn and extracts the output."""
  key, prediction_result = run_inference_output
  example = prediction_result.example[0]
  prediction = prediction_result.inference[0]
  return f"key = {key}, example = {example} -> predictions {prediction}"

five_times_model_handler = KeyedModelHandler(
    SklearnModelHandlerNumpy(model_uri=five_times_model_filename))
ten_times_model_handler = KeyedModelHandler(
    SklearnModelHandlerNumpy(model_uri=ten_times_model_filename))

pipeline_options = PipelineOptions().from_dictionary(
    {'temp_location': f'gs://{bucket}/tmp'})

with beam.Pipeline(options=pipeline_options) as p:
  inputs = (
      p
      | "ReadFromBQ" >> beam.io.ReadFromBigQuery(table=table_spec))
  five_times = (
      inputs
      | "Extract For 5" >> beam.Map(
          lambda x: ('{} {}'.format(x['key'], '* 5'), [x['value']]))
      | "5 times" >> RunInference(model_handler=five_times_model_handler))
  ten_times = (
      inputs
      | "Extract For 10" >> beam.Map(
          lambda x: ('{} {}'.format(x['key'], '* 10'), [x['value']]))
      | "10 times" >> RunInference(model_handler=ten_times_model_handler))
  _ = (
      (five_times, ten_times)
      | "Flattened" >> beam.Flatten()
      | "format output" >> beam.Map(format_output)
      | "Print" >> beam.Map(print))
key = third_example * 10, example = 1000.0 -> predictions 10000.0
key = fourth_example * 10, example = 1013.0 -> predictions 10130.0
key = second_example * 10, example = 108.0 -> predictions 1080.0
key = first_example * 10, example = 105.0 -> predictions 1050.0
key = third_example * 5, example = 1000.0 -> predictions 5000.0
key = fourth_example * 5, example = 1013.0 -> predictions 5065.0
key = second_example * 5, example = 108.0 -> predictions 540.0
key = first_example * 5, example = 105.0 -> predictions 525.0
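If you want structured records rather than formatted strings, the postprocessing step could instead return a dictionary of plain Python values, which is easier to serialize or write to a sink. The following is a minimal sketch. The to_row function and the FakePredictionResult class are hypothetical; the class is only a stand-in that exposes the same example and inference fields the notebook reads from PredictionResult, so that the sketch runs without Beam.

```python
from typing import Any, Dict, NamedTuple, Tuple

import numpy as np


class FakePredictionResult(NamedTuple):
    """Stand-in with the two fields this notebook reads from PredictionResult."""
    example: Any
    inference: Any


def to_row(run_inference_output: Tuple[str, Any]) -> Dict[str, Any]:
    """Convert a keyed prediction into a dictionary of plain Python values."""
    key, prediction_result = run_inference_output
    return {
        "key": key,
        "input": float(prediction_result.example[0]),
        "prediction": float(prediction_result.inference[0]),
    }


sample = ("first_example * 5", FakePredictionResult([105.0], np.array([525.0])))
row = to_row(sample)
print(row)
```

In the pipeline, a function like this could be applied with beam.Map in place of format_output.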