1. Introduction

This notebook runs, interactively, the individual components of a TensorFlow Extended (TFX) pipeline. TFX is an end-to-end platform for deploying ML models to production. In TFX you define a DAG made of a series of components, many of which are provided by TFX itself. The components implemented here are the main ones used from data ingestion through to serving the model in production. Keras and TensorFlow 2 are used.

You can use the standard TFX components and the dependencies between them (that is, which component follows which), or define the dependencies yourself with add_downstream_node / add_upstream_node (available on every component, e.g. on the Trainer component). Note that some dependencies are implicit, e.g. the Pusher component always comes after Trainer.
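
As a rough illustration of how a set of upstream dependencies fixes the execution order of a DAG, here is a minimal sketch in plain Python. It does not use the TFX API: the dependency map is a hypothetical one that mirrors the data flow used later in this notebook, and the standard-library graphlib module (Python 3.9+) stands in for the orchestrator.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: component -> set of upstream components.
# The edges mirror the data flow used later in this notebook.
upstream = {
    "ExampleGen": set(),
    "StatisticsGen": {"ExampleGen"},
    "SchemaGen": {"StatisticsGen"},
    "ExampleValidator": {"StatisticsGen", "SchemaGen"},
    "Transform": {"ExampleGen", "SchemaGen"},
    "Trainer": {"Transform", "SchemaGen"},
    "Evaluator": {"ExampleGen", "Trainer"},
    "Pusher": {"Trainer", "Evaluator"},
}

# static_order() yields every component after all of its upstream components.
execution_order = list(TopologicalSorter(upstream).static_order())
print(execution_order)
```

Several valid orders exist for this graph; the only guarantee is that each component appears after everything it depends on.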

You can read about the history of TFX here.

TFX ships standard components together with the Python libraries they rely on, all already implemented and ready to use.

Execution: TFX can run on orchestrators such as Apache Airflow, Kubeflow Pipelines or Apache Beam. In this notebook the orchestrator is the notebook itself, since we work in an interactive environment.

Metadata: in a production environment, metadata is stored through the ML Metadata (MLMD) API. MLMD stores the metadata properties in a database such as MySQL or SQLite, and the content in persistent storage such as a hard disk. In the interactive notebook, both the properties and the content are stored in a temporary directory (such as /tmp) created for the notebook session.
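
The split between properties in a database and content on disk can be sketched with the standard library alone. This is not the MLMD API: the table layout, the publish helper and the file names are all hypothetical and only illustrate the idea.

```python
import os
import sqlite3
import tempfile

# Hypothetical layout: one table of artifact properties, payloads on disk.
root = tempfile.mkdtemp(prefix="metadata-sketch-")
db = sqlite3.connect(os.path.join(root, "metadata.sqlite"))
db.execute("CREATE TABLE artifact (id INTEGER PRIMARY KEY, type TEXT, uri TEXT)")

def publish(artifact_type, filename, payload):
    """Write the payload to disk and record its properties in SQLite."""
    uri = os.path.join(root, artifact_type, filename)
    os.makedirs(os.path.dirname(uri), exist_ok=True)
    with open(uri, "wb") as f:
        f.write(payload)
    cursor = db.execute(
        "INSERT INTO artifact (type, uri) VALUES (?, ?)", (artifact_type, uri))
    db.commit()
    return cursor.lastrowid

artifact_id = publish("Examples", "data.bin", b"serialized examples")
row = db.execute(
    "SELECT type, uri FROM artifact WHERE id = ?", (artifact_id,)).fetchone()
print(row)
```

The database row only holds a type and a URI; the bytes themselves live in the file the URI points to.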

2. Setup

We import the required packages, including the standard TFX components.

# OPTIONAL: only when running on Colab
!pip3 install tfx
# OPTIONAL: restart the Colab runtime after installing TFX
try:
    from google.colab import auth as google_auth
    google_auth.authenticate_user()
except ModuleNotFoundError:
    # Not running on Colab, so there is nothing to authenticate.
    pass
# OPTIONAL: when using AI Platform Notebooks
#!gcloud auth login
import os
import pprint
import tempfile
import urllib

import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()

import tfx
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input


%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip
WARNING:absl:RuntimeParameter is only supported on Cloud-based DAG runner currently.

We check the TF and TFX versions:

print('TensorFlow version: {}'.format(tf.__version__))
print('TFX version: {}'.format(tfx.__version__))
TensorFlow version: 2.4.1
TFX version: 0.27.0
# Set up logging.
absl.logging.set_verbosity(absl.logging.INFO)

We create an InteractiveContext, which lets the notebook act as the orchestrator and run the TFX components in the notebook itself. InteractiveContext, released in 2019, is a quicker way to use and debug TFX in a notebook.

That is, in this notebook we instantiate the components one by one and run each of them with InteractiveContext.run(), so they execute in the notebook. In a production system, the components are instead defined inside the Pipeline class and run by the orchestrator (see the Building a TFX Pipeline guide).

# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext. Calls to InteractiveContext are no-ops outside of the
# notebook.
context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /var/folders/sr/1jd3j8k5791g_37qk2vdcc_80069vh/T/tfx-interactive-2021-04-03T16_45_34.079859-3hgwznh2 as root for pipeline outputs.
WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /var/folders/sr/1jd3j8k5791g_37qk2vdcc_80069vh/T/tfx-interactive-2021-04-03T16_45_34.079859-3hgwznh2/metadata.sqlite.

3. Loading the data

We download the Stack Overflow posts dataset and convert it to TFRecord format before starting the pipeline:

from tensorflow.keras import utils
import pathlib
from pathlib import Path

def create_tf_example(text, label):

    tf_example = tf.train.Example(features=tf.train.Features(feature={
        'text': tf.train.Feature(bytes_list=tf.train.BytesList(value=[text.encode('utf-8')])),
        'label':tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
        #'Label': tf.train.Feature(bytes_list=tf.train.BytesList(value=[label.encode('utf-8')])),
    }))
    return tf_example

data_url = 'https://storage.googleapis.com/download.tensorflow.org/data/stack_overflow_16k.tar.gz'
dataset = utils.get_file(
    'stack_overflow_16k.tar.gz',
    data_url,
    untar=True,
    cache_dir='stack_overflow',
    cache_subdir='')
dataset_dir = pathlib.Path(dataset).parent

# Write the training data as TFRecords
labels_dict = {'java':0, 'python':1, 'csharp':2, 'javascript':3}
with tf.io.TFRecordWriter(str(dataset_dir/'train/stackoverflow-train.tfrecords')) as writer:
    for train_directory in [dataset_dir/'train/java', dataset_dir/'train/python', dataset_dir/'train/csharp', dataset_dir/'train/javascript']:
        print("Generating: TRAIN ",train_directory.parts[-1], " ", len(list(train_directory.glob('*'))))
        for file in Path(train_directory).iterdir():
            # Skip anything that is not a regular file so stale data is never reused
            if not file.is_file():
                continue
            # The `with` block closes the file; no explicit close() is needed
            with open(file, "r") as myfile:
                data = myfile.read().replace("\n", " ")
            # data: text; label: int64, according to labels_dict
            example = create_tf_example(data, labels_dict[train_directory.parts[-1]])
            writer.write(example.SerializeToString())
Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/stack_overflow_16k.tar.gz
6053888/6053168 [==============================] - 0s 0us/step
Generating: TRAIN  java   2000
Generating: TRAIN  python   2000
Generating: TRAIN  csharp   2000
Generating: TRAIN  javascript   2000

4. Interactive execution of the individual TFX components

In the following cells, we run the TFX components one at a time and visualize some of their output artifacts.

ExampleGen

The ExampleGen component is usually the first one in a TFX pipeline. It generates data for other components such as SchemaGen, StatisticsGen and Transform. It can read from CSV, TFRecord and BigQuery, and a custom executor could be written to read from Avro or Parquet. It can perform the train/eval split itself or import one that already exists.

Functions:

  1. Splits the data into training and evaluation sets (by default, 2/3 training + 1/3 evaluation)
  2. Converts the data to tf.Example format
  3. Copies the data into the _tfx_root directory so that other components can access it
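
The splitting step in the list above can be pictured as hashing each record into weighted buckets. The sketch below is plain Python and only illustrates the idea: assign_split is a hypothetical helper, and md5 is a stand-in for whatever fingerprint function ExampleGen actually uses.

```python
import hashlib

def assign_split(record: bytes, buckets=(("train", 2), ("eval", 1))) -> str:
    """Map a record to a split by hashing it into weighted buckets."""
    total = sum(weight for _, weight in buckets)
    # A stable hash makes the assignment deterministic across runs.
    bucket = int(hashlib.md5(record).hexdigest(), 16) % total
    for name, weight in buckets:
        if bucket < weight:
            return name
        bucket -= weight
    raise AssertionError("unreachable: bucket is always below total")

records = [f"post number {i}".encode("utf-8") for i in range(3000)]
counts = {"train": 0, "eval": 0}
for record in records:
    counts[assign_split(record)] += 1
print(counts)
```

Because the split depends only on the record's hash, the same record always lands in the same split, and the proportions approach the bucket weights as the dataset grows.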

ExampleGen takes as input the path to your data source. In our case, this is path_to_tfrecord_dir, the directory that contains the TFRecord file written in the previous section.

from tfx.components.example_gen.import_example_gen.component import ImportExampleGen
from tfx.proto import example_gen_pb2


path_to_tfrecord_dir = str(dataset_dir/'train')

# Output 2 splits: train:eval=3:1.
output = example_gen_pb2.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=3),
                 example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))
example_gen = ImportExampleGen(input_base=path_to_tfrecord_dir, output_config=output)
context.run(example_gen)
INFO:absl:Running driver for ImportExampleGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:Running executor for ImportExampleGen
INFO:absl:Generating examples.
WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.
INFO:absl:Reading input TFRecord data /tmp/.keras/train/*.
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Running publisher for ImportExampleGen
INFO:absl:MetadataStore with DB connection initialized
ExecutionResult at 0x10e255c70
.execution_id 1
.component
.component.inputs {}
.component.outputs
['examples']

Output artifacts of ExampleGen. The component produces an examples artifact that contains two splits, one for training and one for evaluation:

artifact = example_gen.outputs['examples'].get()[0]
print(artifact.split_names, artifact.uri)

Let's look at the first three examples:

# Get the URI of the output artifact representing the training examples, which is a directory
train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Iterate over the first 3 records and decode them.
for tfrecord in dataset.take(3):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  pp.pprint(example)

Once ExampleGen has finished ingesting the data, we move on to analyzing it.

StatisticsGen

The StatisticsGen component computes statistics over your dataset, both for analysis and for use by downstream components in the pipeline. It uses the TensorFlow Data Validation library.

StatisticsGen takes as input the data ingested by ExampleGen.

statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen)
context.show(statistics_gen.outputs['statistics'])

Output artifacts of StatisticsGen. Charts other than the one shown can also be generated.
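
To give a feel for the kind of per-feature summary involved, here is a minimal sketch in plain Python. It does not call TensorFlow Data Validation: the describe helper and the five-row dataset are hypothetical, and the real component computes a much richer set of statistics.

```python
from collections import Counter
from statistics import mean

# Hypothetical miniature dataset with the same two features as the notebook.
examples = [
    {"text": "how do i reverse a list", "label": 1},
    {"text": "null pointer exception in main", "label": 0},
    {"text": "linq query returns nothing", "label": 2},
    {"text": "undefined is not a function", "label": 3},
    {"text": "list comprehension with condition", "label": 1},
]

def describe(rows):
    """Compute a few per-feature summary statistics."""
    lengths = [len(row["text"]) for row in rows]
    labels = Counter(row["label"] for row in rows)
    return {
        "num_examples": len(rows),
        "text_avg_length": mean(lengths),
        "text_unique": len({row["text"] for row in rows}),
        "label_min": min(labels),
        "label_max": max(labels),
        "label_counts": dict(labels),
    }

stats = describe(examples)
print(stats)
```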

SchemaGen

The SchemaGen component generates a schema for your data. A schema defines the data types, ranges and properties of the features in your dataset. It uses the TensorFlow Data Validation library.

Note: the generated schema is best-effort and only tries to infer basic properties of the data. You are expected to review it and modify it as needed.

SchemaGen takes as input the statistics generated by StatisticsGen, looking at the data split used by default.

IMPORTANT: the desired behavior of infer_schema(..., infer_feature_shape=True) is to infer feature shapes for FixedLenFeatures while representing VarLenFeatures as SparseFeatures in the schema.

schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True)
context.run(schema_gen)

Output artifact of SchemaGen, which is the schema displayed as a table:

context.show(schema_gen.outputs['schema'])

Each feature in your dataset shows up as a row in the schema table, alongside its properties. The schema also captures all the values that a categorical feature takes on, denoted as its domain.

To learn more about schemas, see the SchemaGen documentation.
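
What "best-effort inference" means can be sketched in a few lines of plain Python. This is not how TensorFlow Data Validation is implemented: infer_schema here is a hypothetical helper that only records a type per feature and whether the feature appears in every row.

```python
def infer_schema(rows):
    """Infer a type for each feature and whether it appears in every row."""
    seen = {}
    for row in rows:
        for name, value in row.items():
            kind = "INT" if isinstance(value, int) else "BYTES"
            entry = seen.setdefault(name, {"type": kind, "count": 0})
            if entry["type"] != kind:
                raise ValueError(f"feature {name!r} has mixed types")
            entry["count"] += 1
    return {
        name: {"type": entry["type"], "required": entry["count"] == len(rows)}
        for name, entry in seen.items()
    }

# Hypothetical rows shaped like the notebook's examples.
rows = [
    {"text": "how do i reverse a list", "label": 1},
    {"text": "null pointer exception in main", "label": 0},
    {"text": "undefined is not a function", "label": 3},
]
schema = infer_schema(rows)
print(schema)
```

A schema inferred this way only reflects the rows it has seen, which is why it should be reviewed before it is used to guard future data.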

ExampleValidator

The ExampleValidator component detects anomalies in the data, based on the schema generated earlier. It uses the TensorFlow Data Validation library.

ExampleValidator takes as input the statistics from StatisticsGen and the schema from SchemaGen.

example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
context.run(example_validator)

Output artifact of ExampleValidator.

context.show(example_validator.outputs['anomalies'])

In the anomalies table, we can see that there are no anomalies. This is what we'd expect, since this is the first dataset that we've analyzed and the schema is tailored to it. You should review this schema -- anything unexpected means an anomaly in the data. Once reviewed, the schema can be used to guard future data, and anomalies produced here can be used to debug model performance, understand how your data evolves over time, and identify data errors.
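
How a reviewed schema can guard future data is sketched below in plain Python. The real component compares statistics against the schema rather than individual rows; SCHEMA and find_anomalies are hypothetical names used only for illustration.

```python
# Hypothetical schema in the same spirit as the one SchemaGen produces.
SCHEMA = {
    "text": {"type": str, "required": True},
    "label": {"type": int, "required": True},
}

def find_anomalies(rows, schema):
    """Return human-readable descriptions of rows that violate the schema."""
    anomalies = []
    for index, row in enumerate(rows):
        for name, spec in schema.items():
            if name not in row:
                if spec["required"]:
                    anomalies.append(f"row {index}: missing feature {name!r}")
            elif not isinstance(row[name], spec["type"]):
                anomalies.append(f"row {index}: {name!r} has unexpected type")
        for name in sorted(row.keys() - schema.keys()):
            anomalies.append(f"row {index}: new feature {name!r} not in schema")
    return anomalies

clean = [{"text": "how do i reverse a list", "label": 1}]
drifted = [{"text": "segfault on exit", "label": "cpp"}, {"text": "no label"}]
print(find_anomalies(clean, SCHEMA))
print(find_anomalies(drifted, SCHEMA))
```

Data that matches the schema yields an empty list, which corresponds to the empty anomalies table shown above.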

Transform

The Transform component performs feature engineering for both training and serving. It uses the TensorFlow Transform library.

Transform takes as input the data from ExampleGen, the schema from SchemaGen (mind the infer_feature_shape parameter), and the Python code that performs the transformation.

Let's see an example of user-defined Transform code below (for an introduction to the TensorFlow Transform APIs, see the tutorial). First, we define a few constants for feature engineering:

Note: the %%writefile cell saves its contents to a .py file, which is then loaded when the Transform component is called.

_stackoverflow_constants_module_file = 'stackoverflow_constants.py'
%%writefile {_stackoverflow_constants_module_file}

_FEATURE_KEY = 'text'
_LABEL_KEY = 'label'

_DROPOUT_RATE = 0.2
_EMBEDDING_UNITS = 64
_EVAL_BATCH_SIZE = 5
_HIDDEN_UNITS = 64
_LEARNING_RATE = 1e-4
_L2_REGULARIZER=0.01
_LSTM_UNITS = 64
_VOCAB_SIZE = 8000
_MAX_LEN = 400
_TRAIN_BATCH_SIZE = 10
_NUM_CLASSES = 4
_NUM_FILTERS=200
_FILTER_SIZE=4

We write the preprocessing_fn function, which is the one the Transform component will call.

_stackoverflow_transform_module_file = 'stackoverflow_transform.py'
%%writefile {_stackoverflow_transform_module_file}

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from typing import List, Text

import stackoverflow_constants

import absl
import tensorflow as tf
from tensorflow import keras
import tensorflow_transform as tft

from tfx.components.trainer.fn_args_utils import FnArgs

_FEATURE_KEY = stackoverflow_constants._FEATURE_KEY
_LABEL_KEY = stackoverflow_constants._LABEL_KEY

_DROPOUT_RATE = stackoverflow_constants._DROPOUT_RATE
_EMBEDDING_UNITS = stackoverflow_constants._EMBEDDING_UNITS
_EVAL_BATCH_SIZE = stackoverflow_constants._EVAL_BATCH_SIZE
_HIDDEN_UNITS = stackoverflow_constants._HIDDEN_UNITS
_LEARNING_RATE = stackoverflow_constants._LEARNING_RATE
_L2_REGULARIZER = stackoverflow_constants._L2_REGULARIZER
_LSTM_UNITS = stackoverflow_constants._LSTM_UNITS
_VOCAB_SIZE = stackoverflow_constants._VOCAB_SIZE
_MAX_LEN = stackoverflow_constants._MAX_LEN
_TRAIN_BATCH_SIZE = stackoverflow_constants._TRAIN_BATCH_SIZE
_NUM_CLASSES = stackoverflow_constants._NUM_CLASSES
_NUM_FILTERS= stackoverflow_constants._NUM_FILTERS
_FILTER_SIZE= stackoverflow_constants._FILTER_SIZE

def _transformed_name(key, is_input=False):
  return key + ('_xf_input' if is_input else '_xf')

def _tokenize_text(text):
  print(text)
  text_sparse = tf.strings.split(tf.reshape(text, [-1])).to_sparse()
  # tft.apply_vocabulary doesn't reserve 0 for oov words. In order to comply
  # with convention and use mask_zero in keras.embedding layer, set oov value
  # to _VOCAB_SIZE and padding value to -1. Then add 1 to all the tokens.
  text_indices = tft.compute_and_apply_vocabulary(
      text_sparse, default_value=_VOCAB_SIZE, top_k=_VOCAB_SIZE)
  dense = tf.sparse.to_dense(text_indices, default_value=-1)
  # TFX transform expects the transform result to be FixedLenFeature.
  padding_config = [[0, 0], [0, _MAX_LEN]]
  dense = tf.pad(dense, padding_config, 'CONSTANT', -1)
  padded = tf.slice(dense, [0, 0], [-1, _MAX_LEN])
  padded += 1
  return padded


# TFX Transform will call this function.
def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.
  Args:
    inputs: map from feature keys to raw not-yet-transformed features.
  Returns:
    Map from string feature key to transformed feature operations.
  """
  return {
      _transformed_name(_LABEL_KEY):
          inputs[_LABEL_KEY],
      _transformed_name(_FEATURE_KEY, True):
          _tokenize_text(inputs[_FEATURE_KEY])
  }

We run the Transform component, which transforms the data.

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath(_stackoverflow_transform_module_file))

context.run(transform)

Output artifacts of Transform. It produces two artifacts:

  • transform_graph is the graph that can perform the preprocessing operations (this graph will be included in the serving and evaluation models).
  • transformed_examples represents the preprocessed training and evaluation data.
transform.outputs

Let's look at the transform_graph artifact, which points to a directory containing three subdirectories.

train_uri = transform.outputs['transform_graph'].get()[0].uri
os.listdir(train_uri)
  1. The transformed_metadata subdirectory contains the schema of the preprocessed data.
  2. The transform_fn subdirectory contains the preprocessing graph.
  3. The metadata subdirectory contains the schema of the original data.

Let's look at the first three transformed examples.

# Get the URI of the output artifact representing the transformed examples, which is a directory
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Iterate over the first 3 records and decode them.
for tfrecord in dataset.take(3):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  pp.pprint(example)

After tokenizing with the Transform component, the next step is training the model.
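
The index arithmetic inside _tokenize_text is easy to misread, so here is a plain-Python sketch of the same steps on a toy vocabulary. It does not use TensorFlow Transform: build_vocabulary and encode are hypothetical helpers, and the two constants are deliberately tiny stand-ins for _VOCAB_SIZE and _MAX_LEN.

```python
from collections import Counter

VOCAB_SIZE = 4   # stands in for _VOCAB_SIZE (8000 in the notebook)
MAX_LEN = 6      # stands in for _MAX_LEN (400 in the notebook)

def build_vocabulary(texts, top_k):
    """Map the top_k most frequent tokens to indices 0..top_k-1."""
    counts = Counter(token for text in texts for token in text.split())
    return {token: i for i, (token, _) in enumerate(counts.most_common(top_k))}

def encode(text, vocab):
    """Mirror the index arithmetic of _tokenize_text for a single string."""
    # Out-of-vocabulary tokens get the index VOCAB_SIZE.
    ids = [vocab.get(token, VOCAB_SIZE) for token in text.split()]
    # Pad with -1 and cut to MAX_LEN.
    ids = (ids + [-1] * MAX_LEN)[:MAX_LEN]
    # Shift by one so that padding becomes 0.
    return [i + 1 for i in ids]

texts = ["a b c d e", "a b c d", "a b c", "a b", "a"]
vocab = build_vocabulary(texts, VOCAB_SIZE)
print(vocab)
print(encode("a b zzz", vocab))
```

After the shift, padding is 0, known tokens occupy 1 to VOCAB_SIZE, and out-of-vocabulary tokens are VOCAB_SIZE + 1, which is why the Embedding layer in the trainer module is sized _VOCAB_SIZE + 2.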

Trainer

The Trainer component trains a model defined in TensorFlow. The default Trainer supports the Estimator API; to use the Keras API, you need to select the generic Trainer by passing custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor) to the Trainer constructor.

Trainer takes as input the schema from SchemaGen, the transformed data and the graph from Transform, the training parameters, and the Python code that trains the model.

Let's see an example of user-defined model code below (for an introduction to the TensorFlow Keras APIs, see the tutorial):

_stackoverflow_trainer_module_file = 'stackoverflow_trainer.py'
%%writefile {_stackoverflow_trainer_module_file}

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from typing import List, Text

import absl
import tensorflow as tf
from tensorflow import keras
import tensorflow_transform as tft

from tfx.components.trainer.fn_args_utils import FnArgs

import stackoverflow_constants

_FEATURE_KEY = stackoverflow_constants._FEATURE_KEY
_LABEL_KEY = stackoverflow_constants._LABEL_KEY

_DROPOUT_RATE = stackoverflow_constants._DROPOUT_RATE
_EMBEDDING_UNITS = stackoverflow_constants._EMBEDDING_UNITS
_EVAL_BATCH_SIZE = stackoverflow_constants._EVAL_BATCH_SIZE
_HIDDEN_UNITS = stackoverflow_constants._HIDDEN_UNITS
_LEARNING_RATE = stackoverflow_constants._LEARNING_RATE
_L2_REGULARIZER = stackoverflow_constants._L2_REGULARIZER
_LSTM_UNITS = stackoverflow_constants._LSTM_UNITS
_VOCAB_SIZE = stackoverflow_constants._VOCAB_SIZE
_MAX_LEN = stackoverflow_constants._MAX_LEN
_TRAIN_BATCH_SIZE = stackoverflow_constants._TRAIN_BATCH_SIZE
_NUM_CLASSES = stackoverflow_constants._NUM_CLASSES
_NUM_FILTERS= stackoverflow_constants._NUM_FILTERS
_FILTER_SIZE= stackoverflow_constants._FILTER_SIZE

def _transformed_name(key, is_input=False):
  return key + ('_xf_input' if is_input else '_xf')

def _gzip_reader_fn(filenames):
  return tf.data.TFRecordDataset(filenames, compression_type='GZIP')

def _input_fn(file_pattern: List[Text],
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:

  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())

  dataset = tf.data.experimental.make_batched_features_dataset(
      file_pattern=file_pattern,
      batch_size=batch_size,
      features=transformed_feature_spec,
      reader=_gzip_reader_fn,
      label_key=_transformed_name(_LABEL_KEY))

  return dataset


def _build_keras_model() -> keras.Model:

  model = keras.Sequential([
      keras.layers.Embedding(_VOCAB_SIZE + 2,_EMBEDDING_UNITS,name=_transformed_name(_FEATURE_KEY)),
      #keras.layers.Bidirectional(keras.layers.LSTM(_LSTM_UNITS, dropout=_DROPOUT_RATE)),
      keras.layers.Reshape((_MAX_LEN, _EMBEDDING_UNITS, 1)),
      keras.layers.Conv2D(_NUM_FILTERS,(_FILTER_SIZE,_EMBEDDING_UNITS),activation='relu',kernel_regularizer=keras.regularizers.l2(_L2_REGULARIZER)),
      keras.layers.Flatten(),
      keras.layers.Dropout(_DROPOUT_RATE),
      keras.layers.Dense(_HIDDEN_UNITS, activation='relu'),
      keras.layers.Dense(_NUM_CLASSES, activation ='softmax')
  ])

  model.compile(
      loss='sparse_categorical_crossentropy',
      optimizer=keras.optimizers.Adam(_LEARNING_RATE),
      metrics=['sparse_categorical_accuracy'])



  model.summary(print_fn=absl.logging.info)
  return model


def _get_serve_tf_examples_fn(model, tf_transform_output):

  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Returns the output to be used in the serving signature."""
    feature_spec = tf_transform_output.raw_feature_spec()
    feature_spec.pop(_LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
    transformed_features = model.tft_layer(parsed_features)
    return model(transformed_features)

  return serve_tf_examples_fn


# TFX Trainer will call this function.
def run_fn(fn_args: FnArgs):

  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(
      fn_args.train_files, tf_transform_output, batch_size=_TRAIN_BATCH_SIZE)

  eval_dataset = _input_fn(
      fn_args.eval_files, tf_transform_output, batch_size=_EVAL_BATCH_SIZE)

  mirrored_strategy = tf.distribute.MirroredStrategy()
  with mirrored_strategy.scope():
    model = _build_keras_model()


  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }

  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

We pass the code to the Trainer component to train the model:

trainer = Trainer(
    module_file=os.path.abspath(_stackoverflow_trainer_module_file),
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=3000),
    eval_args=trainer_pb2.EvalArgs(num_steps=3000))
context.run(trainer)
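
As a sanity check on the architecture defined in _build_keras_model, the layer shapes and parameter counts can be worked out by hand. The arithmetic below is a sketch that assumes the Keras defaults for Conv2D ('valid' padding, stride 1, one bias per filter) and uses the constants from stackoverflow_constants.py; it does not build the model.

```python
# Constants copied from stackoverflow_constants.py.
VOCAB_SIZE, MAX_LEN, EMBEDDING_UNITS = 8000, 400, 64
NUM_FILTERS, FILTER_SIZE, HIDDEN_UNITS, NUM_CLASSES = 200, 4, 64, 4

# Embedding table: (VOCAB_SIZE + 2) rows of EMBEDDING_UNITS weights.
embedding_params = (VOCAB_SIZE + 2) * EMBEDDING_UNITS

# Conv2D with a (FILTER_SIZE, EMBEDDING_UNITS) kernel over one input channel.
# The kernel spans the full embedding width, so with 'valid' padding the
# output shape is (MAX_LEN - FILTER_SIZE + 1, 1, NUM_FILTERS).
conv_params = FILTER_SIZE * EMBEDDING_UNITS * 1 * NUM_FILTERS + NUM_FILTERS
conv_positions = MAX_LEN - FILTER_SIZE + 1
flattened = conv_positions * 1 * NUM_FILTERS

# Two Dense layers: weights plus one bias per unit.
hidden_params = flattened * HIDDEN_UNITS + HIDDEN_UNITS
output_params = HIDDEN_UNITS * NUM_CLASSES + NUM_CLASSES

total = embedding_params + conv_params + hidden_params + output_params
print(flattened, total)
```

Most of the weights sit in the first Dense layer, because Flatten hands it 79,400 inputs. These figures can be compared with the model.summary() output logged during training.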

Analyze Training with TensorBoard

Take a peek at the trainer artifact. It points to a directory containing the model subdirectories.

model_artifact_dir = trainer.outputs['model'].get()[0].uri
pp.pprint(os.listdir(model_artifact_dir))
model_dir = os.path.join(model_artifact_dir, 'serving_model_dir')
pp.pprint(os.listdir(model_dir))

Optionally, we can connect TensorBoard to the Trainer to analyze our model's training curves.

#model_run_artifact_dir = trainer.outputs['model_run'].get()[0].uri

#%load_ext tensorboard
#%tensorboard --logdir {model_run_artifact_dir}

Evaluator

The Evaluator component computes metrics over the evaluation set. It uses the TensorFlow Model Analysis library. Evaluator can also validate whether a new model is better than the previous one and, if so, promote (bless) it. This is useful in a production pipeline where models are trained and validated daily. In this notebook we train only one model, so it is the only one that can be promoted, provided it meets the value threshold configured below.

Evaluator takes as input the data from ExampleGen, the model from Trainer, and a slicing configuration. The slicing configuration lets you slice your metrics on feature values. The configuration below uses a single empty SlicingSpec, so the metrics are computed over the entire evaluation set:

print(example_gen.outputs['examples'])
eval_config = tfma.EvalConfig(
      model_specs=[tfma.ModelSpec(label_key='label')],
      slicing_specs=[tfma.SlicingSpec()],
      metrics_specs=[
          tfma.MetricsSpec(metrics=[
              tfma.MetricConfig(
                  class_name='SparseCategoricalAccuracy',
                  threshold=tfma.MetricThreshold(
                      value_threshold=tfma.GenericValueThreshold(
                          # The model is not blessed unless its accuracy is at least 0.8
                          lower_bound={'value': 0.8}),
                      change_threshold=tfma.GenericChangeThreshold(
                          direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                          absolute={'value': -1e-2})))
          ])
      ]
      )

We pass the configuration above to the Evaluator component:

from tfx.types import standard_artifacts
from tfx.types import channel

# Use TFMA to compute evaluation statistics over features of a model and
# validate them against a baseline.

# The model resolver is only required if performing model validation in addition
# to evaluation. In this case we validate against the latest blessed model. If
# no model has been blessed before (as in this case) the evaluator will make our
# candidate the first blessed model.
model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))
context.run(model_resolver)

evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    # Change threshold will be ignored if there is no baseline (first run).
    eval_config=eval_config)
context.run(evaluator)

Now let's examine the output artifacts of Evaluator.

evaluator.outputs

Using the evaluation output we can show the default visualization of global metrics on the entire evaluation set.

context.show(evaluator.outputs['evaluation'])

To see the visualization for sliced evaluation metrics, we can directly call the TensorFlow Model Analysis library.

import tensorflow_model_analysis as tfma

# Get the TFMA output result path and load the result.
PATH_TO_RESULT = evaluator.outputs['evaluation'].get()[0].uri
tfma_result = tfma.load_eval_result(PATH_TO_RESULT)

# Show data sliced along feature column trip_start_hour.
#tfma.view.render_slicing_metrics(
#    tfma_result, slicing_column='trip_start_hour')

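The commented-out call above would show the same metrics computed at every value of a slicing column instead of on the entire evaluation set. It is left disabled because this dataset has no trip_start_hour feature and the eval_config above defines no feature slices.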

TensorFlow Model Analysis supports many other visualizations, such as Fairness Indicators and plotting a time series of model performance. To learn more, see the tutorial.

By adding thresholds to the configuration, we can also see the validation output. The presence of a blessing artifact indicates that our model passed validation. If this is the first validation there is no baseline, so the change threshold is ignored and the candidate model only has to meet the value threshold.

blessing_uri = evaluator.outputs['blessing'].get()[0].uri
!ls -l {blessing_uri}

Now we can also verify the success by loading the validation result record:

PATH_TO_RESULT = evaluator.outputs['evaluation'].get()[0].uri
print(tfma.load_validation_result(PATH_TO_RESULT))
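
The two thresholds set in eval_config can be read as a simple decision rule. The sketch below is a simplified reading in plain Python, not the TFMA implementation: is_blessed is a hypothetical helper, and the exact treatment of boundary values (strict versus inclusive comparisons) is an assumption.

```python
from typing import Optional

LOWER_BOUND = 0.8        # value_threshold lower_bound from eval_config
ABSOLUTE_CHANGE = -1e-2  # change_threshold absolute from eval_config

def is_blessed(candidate: float, baseline: Optional[float]) -> bool:
    """Simplified check for a metric where higher is better."""
    if candidate < LOWER_BOUND:
        return False
    if baseline is None:
        # First run: there is no baseline, so the change threshold is skipped.
        return True
    # The candidate may be at most 0.01 worse than the baseline.
    return candidate - baseline > ABSOLUTE_CHANGE

print(is_blessed(0.85, None))
print(is_blessed(0.85, 0.90))
print(is_blessed(0.895, 0.90))
```

A negative absolute change threshold tolerates a small regression: a candidate at 0.895 still passes against a 0.90 baseline, while one at 0.85 does not.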

Pusher

The Pusher component is usually the last one in a TFX pipeline. It checks whether the model has passed validation and, if so, exports it to the _serving_model_dir directory.

_serving_model_dir = os.path.join("./", "my_serving_model")


pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=_serving_model_dir)))
context.run(pusher)

Output artifacts of Pusher.

pusher.outputs

In particular, Pusher can export the model in SavedModel format:

push_uri = pusher.outputs.model_push.get()[0].uri
model = tf.saved_model.load(push_uri)

for item in model.signatures.items():
  pp.pprint(item)
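
The "check the blessing, then copy" behavior can be sketched with the standard library. This is not the Pusher implementation: push_if_blessed, the placeholder model file and the timestamp-named version folder are all assumptions made for illustration.

```python
import os
import shutil
import tempfile
import time

def push_if_blessed(model_dir: str, base_directory: str, blessed: bool):
    """Copy model_dir into a new version folder when the model is blessed."""
    if not blessed:
        return None
    # Assumption: a timestamp names the version folder, a common convention
    # for serving directories.
    destination = os.path.join(base_directory, str(int(time.time())))
    shutil.copytree(model_dir, destination)
    return destination

# Hypothetical stand-ins for the Trainer output and the serving directory.
model_dir = tempfile.mkdtemp(prefix="trained-model-")
with open(os.path.join(model_dir, "saved_model.pb"), "wb") as f:
    f.write(b"placeholder")
serving_dir = tempfile.mkdtemp(prefix="my_serving_model-")

print(push_if_blessed(model_dir, serving_dir, blessed=False))
pushed = push_if_blessed(model_dir, serving_dir, blessed=True)
print(os.listdir(pushed))
```

An unblessed model leaves the serving directory untouched, so whatever is already being served stays in place.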

This concludes the TFX pipeline.