TFX: an example with a Stack Overflow dataset using InteractiveContext
A simple TFX example on a dataset of Stack Overflow posts, run with InteractiveContext.
- 1. Introduction
- 2. Setup
- 3. Data loading
- 4. Interactive execution of the individual TFX components
1. Introduction
This notebook runs the individual components of a TensorFlow Extended (TFX) pipeline interactively. TFX is an end-to-end platform for deploying ML models in production. In TFX you define a DAG with a series of components, many of which are provided by TFX itself. The components implemented here are the main ones used from ingestion to putting the model into serving in production. Keras and TensorFlow 2 are used.
You can use the standard TFX components with the dependencies between them (that is, which component follows which), or the developer can define the dependencies with add_downstream_node / add_upstream_node (available on every component, e.g. on the Trainer component). Note that some dependencies are implicit, e.g. the Pusher component always runs after Trainer.
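For illustration, a minimal hedged sketch of declaring an explicit dependency between two components; the names example_gen and statistics_gen refer to components instantiated later in this notebook, so the calls are left commented out:
# statistics_gen.add_upstream_node(example_gen)    # run statistics_gen after example_gen
# example_gen.add_downstream_node(statistics_gen)  # equivalent, declared from the other side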
You can read about the history of TFX here.
TFX provides standard components, along with the Python libraries needed to use them, already implemented and ready to use.
Execution: TFX supports running on orchestrators such as Apache Airflow, Kubeflow Pipelines or Apache Beam. In this notebook the orchestrator is the notebook itself, since we use an interactive environment.
Metadata: in a production environment, metadata is stored via the ML Metadata (MLMD) API. MLMD stores the metadata properties in a database such as MySQL or SQLite, and the metadata payloads in a persistent store such as a hard disk. In an interactive notebook, both the properties and the payloads of the metadata are stored in the notebook's /tmp directory.
# OPTIONAL: if using Colab
!pip3 install tfx
# OPTIONAL: restart the Colab runtime after installing TFX
from google.colab import auth as google_auth
google_auth.authenticate_user()
# OPTIONAL: if using AI Platform Notebooks
#!gcloud auth login
import os
import pprint
import tempfile
import urllib
import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()
import tfx
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input
%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip
Let's check the TF and TFX versions:
print('TensorFlow version: {}'.format(tf.__version__))
print('TFX version: {}'.format(tfx.__version__))
# Set up logging.
absl.logging.set_verbosity(absl.logging.INFO)
We activate InteractiveContext, which lets the notebook act as the orchestrator and run the TFX components inside the notebook itself. InteractiveContext, released in 2019, is a quicker way to use TFX from a notebook and to debug.
That is, in this notebook we instantiate the components one by one, run each with InteractiveContext.run(), and they execute in the notebook. In a production system, the components must be defined inside the Pipeline class and run on an orchestrator (see the Building a TFX Pipeline Guide).
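For contrast, here is a minimal sketch of how the same components could be assembled for a production orchestrator, using the pipeline and metadata modules imported above. The pipeline name, paths and component list are illustrative assumptions, and the code is left commented out because the components are only instantiated later in this notebook.
# _pipeline = pipeline.Pipeline(
#     pipeline_name='stackoverflow_pipeline',        # illustrative name
#     pipeline_root='/tmp/tfx_pipeline_root',        # where artifacts are written
#     components=[example_gen, statistics_gen, schema_gen],  # ...and the rest
#     metadata_connection_config=metadata.sqlite_metadata_connection_config(
#         '/tmp/tfx_metadata.db'))                   # MLMD backing store
# # A runner (e.g. Beam's BeamDagRunner) would then execute `_pipeline`.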
# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext. Calls to InteractiveContext are no-ops outside of the
# notebook.
context = InteractiveContext()
from tensorflow.keras import utils
import pathlib
from pathlib import Path
# Helper: wrap a (text, label) pair into a serializable tf.train.Example proto.
def create_tf_example(text, label):
tf_example = tf.train.Example(features=tf.train.Features(feature={
'text': tf.train.Feature(bytes_list=tf.train.BytesList(value=[text.encode('utf-8')])),
'label':tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
#'Label': tf.train.Feature(bytes_list=tf.train.BytesList(value=[label.encode('utf-8')])),
}))
return tf_example
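As a quick illustrative sanity check (not in the original notebook), we can serialize one example and parse it back:
# Illustrative: round-trip one example through serialization.
ex = create_tf_example("how to reverse a list", 1)  # 1 == 'python' in labels_dict below
parsed = tf.train.Example.FromString(ex.SerializeToString())
print(parsed.features.feature['label'].int64_list.value)  # -> [1]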
data_url = 'https://storage.googleapis.com/download.tensorflow.org/data/stack_overflow_16k.tar.gz'
dataset = utils.get_file(
'stack_overflow_16k.tar.gz',
data_url,
untar=True,
cache_dir='stack_overflow',
cache_subdir='')
dataset_dir = pathlib.Path(dataset).parent
# Read the raw training text files and write them out as a single TFRecord file
labels_dict = {'java': 0, 'python': 1, 'csharp': 2, 'javascript': 3}
with tf.io.TFRecordWriter(str(dataset_dir/'train/stackoverflow-train.tfrecords')) as writer:
    for train_directory in [dataset_dir/'train/java', dataset_dir/'train/python',
                            dataset_dir/'train/csharp', dataset_dir/'train/javascript']:
        print("Generating: TRAIN ", train_directory.parts[-1], " ",
              len(list(train_directory.glob('*'))))
        for file in Path(train_directory).iterdir():
            if file.is_file():
                with open(file, "r") as myfile:
                    data = myfile.read().replace("\n", " ")
                # data: text; label: int64 id taken from labels_dict
                example = create_tf_example(data, labels_dict[str(train_directory.parts[-1])])
                writer.write(example.SerializeToString())
ExampleGen
The ExampleGen component is usually the first component in a TFX pipeline. It generates data for downstream components such as SchemaGen, StatisticsGen and Transform. It can read from CSV, TFRecord and BigQuery, and a custom executor could be written to read from Avro or Parquet. It can split the data into train/eval itself, or import an existing split.
Functions:
- Splits the data into training and evaluation sets (by default, 2/3 training + 1/3 evaluation)
- Converts the data into tf.Example format
- Copies the data into the _tfx_root directory so other components can access it
ExampleGen takes as input the path to your data source. In our case, this is the directory containing the TFRecord file we just wrote, so we use ImportExampleGen.
from tfx.components.example_gen.import_example_gen.component import ImportExampleGen
from tfx.proto import example_gen_pb2
path_to_tfrecord_dir = str(dataset_dir/'train')
# Output 2 splits: train:eval=3:1.
output = example_gen_pb2.Output(
split_config=example_gen_pb2.SplitConfig(splits=[
example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=3),
example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
]))
example_gen = ImportExampleGen(input_base=path_to_tfrecord_dir, output_config=output)
context.run(example_gen)
Output artifacts of ExampleGen. This component generates two artifacts: the training examples and the evaluation examples:
artifact = example_gen.outputs['examples'].get()[0]
print(artifact.split_names, artifact.uri)
Let's look at the first three examples:
# Get the URI of the output artifact representing the training examples, which is a directory
train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'train')
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
# Iterate over the first 3 records and decode them.
for tfrecord in dataset.take(3):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
pp.pprint(example)
Once ExampleGen has finished ingesting the data, we move on to data analysis.
StatisticsGen
The StatisticsGen component computes statistics over your dataset, both for analysis and for use by downstream components in the pipeline. It uses the TensorFlow Data Validation library.
StatisticsGen takes as input the data ingested by ExampleGen.
statistics_gen = StatisticsGen(
examples=example_gen.outputs['examples'])
context.run(statistics_gen)
context.show(statistics_gen.outputs['statistics'])
Output artifacts of StatisticsGen. Visualizations other than the one shown here can also be generated.
SchemaGen
The SchemaGen component generates a schema for your data. A schema defines the data types, ranges and properties of the features in your dataset. It uses the TensorFlow Data Validation library.
The generated schema is best-effort: it only tries to infer basic properties of the data. You are expected to review it and modify it as needed.
SchemaGen takes as input the statistics generated by StatisticsGen, looking at the default data split.
IMPORTANT: the desired behavior of infer_schema(..., infer_feature_shape=True) is to infer feature shapes for FixedLenFeatures, while parsing VarLenFeatures as SparseFeatures in the schema.
schema_gen = SchemaGen(
statistics=statistics_gen.outputs['statistics'],
infer_feature_shape=True)
context.run(schema_gen)
Output artifact of SchemaGen, which is the schema displayed as a table:
context.show(schema_gen.outputs['schema'])
Each feature in your dataset shows up as a row in the schema table, alongside its properties. The schema also captures all the values that a categorical feature takes on, denoted as its domain.
To learn more about schemas, see the SchemaGen documentation.
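Since the schema is meant to be reviewed by hand, here is a hedged sketch of loading it for curation with the TensorFlow Data Validation library (assuming tensorflow_data_validation is installed; schema.pbtxt is the standard file name in the SchemaGen output directory):
# Hedged sketch: load the generated schema for manual review/curation.
import tensorflow_data_validation as tfdv
schema_uri = schema_gen.outputs['schema'].get()[0].uri
schema = tfdv.load_schema_text(os.path.join(schema_uri, 'schema.pbtxt'))
# ...edit feature properties as needed, then persist the curated version:
# tfdv.write_schema_text(schema, 'curated_schema.pbtxt')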
ExampleValidator
The ExampleValidator component detects anomalies in the data, based on the schema generated earlier. It uses the TensorFlow Data Validation library.
ExampleValidator takes as input the output of StatisticsGen (statistics) and the output of SchemaGen (schema).
example_validator = ExampleValidator(
statistics=statistics_gen.outputs['statistics'],
schema=schema_gen.outputs['schema'])
context.run(example_validator)
Output artifact of ExampleValidator.
context.show(example_validator.outputs['anomalies'])
In the anomalies table, we can see that there are no anomalies. This is what we'd expect, since this is the first dataset we've analyzed and the schema is tailored to it. You should review this schema: anything unexpected means an anomaly in the data. Once reviewed, the schema can be used to guard future data, and anomalies produced here can be used to debug model performance, understand how your data evolves over time, and identify data errors.
Transform
The Transform component performs feature engineering for both training and serving. It uses the TensorFlow Transform library.
Transform takes as input the data from ExampleGen, the schema from SchemaGen (mind the infer_feature_shape parameter), and the Python code that performs the transformation.
Let's see an example of user-defined Transform code below (for an introduction to the TensorFlow Transform APIs, see the tutorial). First, we define a few constants for feature engineering:
Note: the %%writefile cell magic writes the cell contents to a .py file, which is then loaded when the Transform component is called.
_stackoverflow_constants_module_file = 'stackoverflow_constants.py'
%%writefile {_stackoverflow_constants_module_file}
_FEATURE_KEY = 'text'
_LABEL_KEY = 'label'
_DROPOUT_RATE = 0.2
_EMBEDDING_UNITS = 64
_EVAL_BATCH_SIZE = 5
_HIDDEN_UNITS = 64
_LEARNING_RATE = 1e-4
_L2_REGULARIZER=0.01
_LSTM_UNITS = 64
_VOCAB_SIZE = 8000
_MAX_LEN = 400
_TRAIN_BATCH_SIZE = 10
_NUM_CLASSES = 4
_NUM_FILTERS=200
_FILTER_SIZE=4
We now write the preprocessing_fn function that the Transform component will call.
_stackoverflow_transform_module_file = 'stackoverflow_transform.py'
%%writefile {_stackoverflow_transform_module_file}
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from typing import List, Text
import stackoverflow_constants
import absl
import tensorflow as tf
from tensorflow import keras
import tensorflow_transform as tft
from tfx.components.trainer.fn_args_utils import FnArgs
_FEATURE_KEY = stackoverflow_constants._FEATURE_KEY
_LABEL_KEY = stackoverflow_constants._LABEL_KEY
_DROPOUT_RATE = stackoverflow_constants._DROPOUT_RATE
_EMBEDDING_UNITS = stackoverflow_constants._EMBEDDING_UNITS
_EVAL_BATCH_SIZE = stackoverflow_constants._EVAL_BATCH_SIZE
_HIDDEN_UNITS = stackoverflow_constants._HIDDEN_UNITS
_LEARNING_RATE = stackoverflow_constants._LEARNING_RATE
_L2_REGULARIZER = stackoverflow_constants._L2_REGULARIZER
_LSTM_UNITS = stackoverflow_constants._LSTM_UNITS
_VOCAB_SIZE = stackoverflow_constants._VOCAB_SIZE
_MAX_LEN = stackoverflow_constants._MAX_LEN
_TRAIN_BATCH_SIZE = stackoverflow_constants._TRAIN_BATCH_SIZE
_NUM_CLASSES = stackoverflow_constants._NUM_CLASSES
_NUM_FILTERS= stackoverflow_constants._NUM_FILTERS
_FILTER_SIZE= stackoverflow_constants._FILTER_SIZE
def _transformed_name(key, is_input=False):
return key + ('_xf_input' if is_input else '_xf')
def _tokenize_text(text):
text_sparse = tf.strings.split(tf.reshape(text, [-1])).to_sparse()
# tft.apply_vocabulary doesn't reserve 0 for oov words. In order to comply
# with convention and use mask_zero in keras.embedding layer, set oov value
# to _VOCAB_SIZE and padding value to -1. Then add 1 to all the tokens.
text_indices = tft.compute_and_apply_vocabulary(
text_sparse, default_value=_VOCAB_SIZE, top_k=_VOCAB_SIZE)
dense = tf.sparse.to_dense(text_indices, default_value=-1)
# TFX transform expects the transform result to be FixedLenFeature.
padding_config = [[0, 0], [0, _MAX_LEN]]
dense = tf.pad(dense, padding_config, 'CONSTANT', -1)
padded = tf.slice(dense, [0, 0], [-1, _MAX_LEN])
padded += 1
return padded
# TFX Transform will call this function.
def preprocessing_fn(inputs):
"""tf.transform's callback function for preprocessing inputs.
Args:
inputs: map from feature keys to raw not-yet-transformed features.
Returns:
Map from string feature key to transformed feature operations.
"""
return {
_transformed_name(_LABEL_KEY):
inputs[_LABEL_KEY],
_transformed_name(_FEATURE_KEY, True):
_tokenize_text(inputs[_FEATURE_KEY])
}
We run the Transform component to transform the data.
transform = Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
module_file=os.path.abspath(_stackoverflow_transform_module_file))
context.run(transform)
Output artifacts of Transform. It generates two artifacts:
- transform_graph: the graph that can perform the preprocessing operations (this graph will be included in the serving and evaluation models).
- transformed_examples: the preprocessed training and evaluation data.
transform.outputs
Let's look at the transform_graph artifact, which points to a directory containing three subdirectories.
train_uri = transform.outputs['transform_graph'].get()[0].uri
os.listdir(train_uri)
- The transformed_metadata subdirectory contains the schema of the preprocessed data.
- The transform_fn subdirectory contains the actual preprocessing graph.
- The metadata subdirectory contains the schema of the original data.
We can peek inside one of these, as shown below.
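For example (illustrative):
# Peek at the subdirectory holding the schema of the preprocessed data.
os.listdir(os.path.join(train_uri, 'transformed_metadata'))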
Let's look at the first three transformed examples.
# Get the URI of the output artifact representing the transformed examples, which is a directory
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'train')
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
# Iterate over the first 3 records and decode them.
for tfrecord in dataset.take(3):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
pp.pprint(example)
After tokenizing with the Transform component, the next step is model training.
Trainer
The Trainer component trains a model defined in TensorFlow. By default Trainer supports the Estimator API; to use the Keras API, you need to select the generic trainer by setting custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor) in Trainer's constructor.
Trainer takes as input the schema from SchemaGen, the transformed data and transform graph from Transform, training parameters, and the Python code that trains the model.
Let's see an example of user-defined model code below (for an introduction to the TensorFlow Keras APIs, see the tutorial):
_stackoverflow_trainer_module_file = 'stackoverflow_trainer.py'
%%writefile {_stackoverflow_trainer_module_file}
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from typing import List, Text
import absl
import tensorflow as tf
from tensorflow import keras
import tensorflow_transform as tft
from tfx.components.trainer.fn_args_utils import FnArgs
import stackoverflow_constants
_FEATURE_KEY = stackoverflow_constants._FEATURE_KEY
_LABEL_KEY = stackoverflow_constants._LABEL_KEY
_DROPOUT_RATE = stackoverflow_constants._DROPOUT_RATE
_EMBEDDING_UNITS = stackoverflow_constants._EMBEDDING_UNITS
_EVAL_BATCH_SIZE = stackoverflow_constants._EVAL_BATCH_SIZE
_HIDDEN_UNITS = stackoverflow_constants._HIDDEN_UNITS
_LEARNING_RATE = stackoverflow_constants._LEARNING_RATE
_L2_REGULARIZER = stackoverflow_constants._L2_REGULARIZER
_LSTM_UNITS = stackoverflow_constants._LSTM_UNITS
_VOCAB_SIZE = stackoverflow_constants._VOCAB_SIZE
_MAX_LEN = stackoverflow_constants._MAX_LEN
_TRAIN_BATCH_SIZE = stackoverflow_constants._TRAIN_BATCH_SIZE
_NUM_CLASSES = stackoverflow_constants._NUM_CLASSES
_NUM_FILTERS= stackoverflow_constants._NUM_FILTERS
_FILTER_SIZE= stackoverflow_constants._FILTER_SIZE
def _transformed_name(key, is_input=False):
return key + ('_xf_input' if is_input else '_xf')
def _gzip_reader_fn(filenames):
return tf.data.TFRecordDataset(filenames, compression_type='GZIP')
def _input_fn(file_pattern: List[Text],
tf_transform_output: tft.TFTransformOutput,
batch_size: int = 200) -> tf.data.Dataset:
transformed_feature_spec = (
tf_transform_output.transformed_feature_spec().copy())
dataset = tf.data.experimental.make_batched_features_dataset(
file_pattern=file_pattern,
batch_size=batch_size,
features=transformed_feature_spec,
reader=_gzip_reader_fn,
label_key=_transformed_name(_LABEL_KEY))
return dataset
def _build_keras_model() -> keras.Model:
model = keras.Sequential([
keras.layers.Embedding(_VOCAB_SIZE + 2,_EMBEDDING_UNITS,name=_transformed_name(_FEATURE_KEY)),
#keras.layers.Bidirectional(keras.layers.LSTM(_LSTM_UNITS, dropout=_DROPOUT_RATE)),
keras.layers.Reshape((_MAX_LEN, _EMBEDDING_UNITS, 1)),
keras.layers.Conv2D(_NUM_FILTERS,(_FILTER_SIZE,_EMBEDDING_UNITS),activation='relu',kernel_regularizer=keras.regularizers.l2(_L2_REGULARIZER)),
keras.layers.Flatten(),
keras.layers.Dropout(_DROPOUT_RATE),
keras.layers.Dense(_HIDDEN_UNITS, activation='relu'),
keras.layers.Dense(_NUM_CLASSES, activation ='softmax')
])
model.compile(
loss='sparse_categorical_crossentropy',
optimizer=keras.optimizers.Adam(_LEARNING_RATE),
metrics=['sparse_categorical_accuracy'])
model.summary(print_fn=absl.logging.info)
return model
def _get_serve_tf_examples_fn(model, tf_transform_output):
model.tft_layer = tf_transform_output.transform_features_layer()
@tf.function
def serve_tf_examples_fn(serialized_tf_examples):
"""Returns the output to be used in the serving signature."""
feature_spec = tf_transform_output.raw_feature_spec()
feature_spec.pop(_LABEL_KEY)
parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
transformed_features = model.tft_layer(parsed_features)
return model(transformed_features)
return serve_tf_examples_fn
# TFX Trainer will call this function.
def run_fn(fn_args: FnArgs):
tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
train_dataset = _input_fn(
fn_args.train_files, tf_transform_output, batch_size=_TRAIN_BATCH_SIZE)
eval_dataset = _input_fn(
fn_args.eval_files, tf_transform_output, batch_size=_EVAL_BATCH_SIZE)
mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
model = _build_keras_model()
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps)
signatures = {
'serving_default':
_get_serve_tf_examples_fn(model,
tf_transform_output).get_concrete_function(
tf.TensorSpec(
shape=[None],
dtype=tf.string,
name='examples')),
}
model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
We pass the code to the Trainer component and train:
trainer = Trainer(
module_file=os.path.abspath(_stackoverflow_trainer_module_file),
custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
examples=transform.outputs['transformed_examples'],
transform_graph=transform.outputs['transform_graph'],
schema=schema_gen.outputs['schema'],
train_args=trainer_pb2.TrainArgs(num_steps=3000),
eval_args=trainer_pb2.EvalArgs(num_steps=3000))
context.run(trainer)
model_artifact_dir = trainer.outputs['model'].get()[0].uri
pp.pprint(os.listdir(model_artifact_dir))
model_dir = os.path.join(model_artifact_dir, 'serving_model_dir')
pp.pprint(os.listdir(model_dir))
Optionally, we can connect TensorBoard to the Trainer to analyze our model's training curves.
#model_run_artifact_dir = trainer.outputs['model_run'].get()[0].uri
#%load_ext tensorboard
#%tensorboard --logdir {model_run_artifact_dir}
Evaluator
The Evaluator component computes metrics over the evaluation set. It uses the TensorFlow Model Analysis library. Evaluator also validates whether a new model is better than the previous one and, if so, promotes ("blesses") it. This is useful in a production pipeline where models are trained and validated daily. In this notebook we only train one model, so it is the one promoted in any case.
Evaluator takes as input the data from ExampleGen, the model from Trainer, and a slicing configuration. The slicing configuration lets you slice your metrics on feature values (e.g., how does the model perform on posts from each language tag?). See an example of this configuration below:
print(example_gen.outputs['examples'])
eval_config = tfma.EvalConfig(
model_specs=[tfma.ModelSpec(label_key='label')],
slicing_specs=[tfma.SlicingSpec()],
metrics_specs=[
tfma.MetricsSpec(metrics=[
tfma.MetricConfig(
class_name='SparseCategoricalAccuracy',
threshold=tfma.MetricThreshold(
value_threshold=tfma.GenericValueThreshold(
                        # Require an accuracy of at least 0.8 for the model to be blessed
lower_bound={'value': 0.8}),
change_threshold=tfma.GenericChangeThreshold(
direction=tfma.MetricDirection.HIGHER_IS_BETTER,
absolute={'value': -1e-2})))
])
]
)
We pass the above configuration to the Evaluator component:
from tfx.types import standard_artifacts
from tfx.types import channel
# Use TFMA to compute evaluation statistics over features of a model and
# validate them against a baseline.
# The model resolver is only required if performing model validation in addition
# to evaluation. In this case we validate against the latest blessed model. If
# no model has been blessed before (as in this case) the evaluator will make our
# candidate the first blessed model.
model_resolver = ResolverNode(
instance_name='latest_blessed_model_resolver',
resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
model=Channel(type=Model),
model_blessing=Channel(type=ModelBlessing))
context.run(model_resolver)
evaluator = Evaluator(
examples=example_gen.outputs['examples'],
model=trainer.outputs['model'],
baseline_model=model_resolver.outputs['model'],
# Change threshold will be ignored if there is no baseline (first run).
eval_config=eval_config)
context.run(evaluator)
Now let's examine the output artifacts of Evaluator.
evaluator.outputs
Using the evaluation output we can show the default visualization of global metrics on the entire evaluation set.
context.show(evaluator.outputs['evaluation'])
To see the visualization for sliced evaluation metrics, we can directly call the TensorFlow Model Analysis library.
import tensorflow_model_analysis as tfma
# Get the TFMA output result path and load the result.
PATH_TO_RESULT = evaluator.outputs['evaluation'].get()[0].uri
tfma_result = tfma.load_eval_result(PATH_TO_RESULT)
# Example kept from the taxi tutorial, where a 'trip_start_hour' column exists:
#tfma.view.render_slicing_metrics(
#    tfma_result, slicing_column='trip_start_hour')
A slicing visualization like this one shows the same metrics computed at every value of the chosen feature instead of on the entire evaluation set; our Stack Overflow dataset has no natural slicing column, so the call above is left commented out.
TensorFlow Model Analysis supports many other visualizations, such as Fairness Indicators and plotting a time series of model performance. To learn more, see the tutorial.
By adding thresholds to the configuration, we can also inspect the validation output. The presence of a blessing artifact indicates that our model passed validation. Since this is the first validation, the candidate model is automatically blessed.
blessing_uri = evaluator.outputs['blessing'].get()[0].uri
!ls -l {blessing_uri}
We can also verify the success by loading the validation result record:
PATH_TO_RESULT = evaluator.outputs['evaluation'].get()[0].uri
print(tfma.load_validation_result(PATH_TO_RESULT))
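Pusher
The Pusher component usually sits at the end of a TFX pipeline: it checks whether the model passed validation (the blessing produced by Evaluator) and, if so, exports it to a serving destination. Here we push the model to a local filesystem directory.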
_serving_model_dir = os.path.join("./", "my_serving_model")
pusher = Pusher(
model=trainer.outputs['model'],
model_blessing=evaluator.outputs['blessing'],
push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory=_serving_model_dir)))
context.run(pusher)
Output artifacts of Pusher.
pusher.outputs
In particular, Pusher exports the model in SavedModel format:
push_uri = pusher.outputs['model_push'].get()[0].uri
model = tf.saved_model.load(push_uri)
for item in model.signatures.items():
pp.pprint(item)
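As a final illustrative check (an addition of this write-up, not in the original notebook), we can send one serialized tf.train.Example through the loaded serving signature, which serve_tf_examples_fn defined to accept serialized examples:
# Illustrative: run one prediction through the serving signature.
infer = model.signatures['serving_default']
raw = create_tf_example("how do I read a file line by line", 0).SerializeToString()
print(infer(examples=tf.constant([raw])))  # softmax scores over the 4 classes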
This concludes the TFX pipeline.