tft.experimental.ptransform_analyzer  |  TFX  |  TensorFlow

tft.experimental.ptransform_analyzer


Applies a user-provided PTransform over the whole dataset.

tft.experimental.ptransform_analyzer(
    inputs: Collection[tf.Tensor],
    ptransform: Union[_BeamPTransform, tft.experimental.CacheablePTransformAnalyzer],
    output_dtypes: Collection[tf.dtypes.DType],
    output_shapes: Collection[List[int]],
    output_asset_default_values: Optional[Collection[Optional[bytes]]] = None,
    name: Optional[str] = None
)

Note that in order for asset files to be copied correctly, any outputs that represent asset filenames must be added to the tf.GraphKeys.ASSET_FILEPATHS collection by the caller if using Transform's APIs in compat v1 mode.

Example:

class MeanPerKey(beam.PTransform):
  def expand(
      self, pcoll: beam.PCollection[Tuple[np.ndarray, np.ndarray]]
  ) -> Tuple[beam.PCollection[np.ndarray], beam.PCollection[np.ndarray]]:

    def extract_output(key_value_pairs):
      keys, values = zip(*key_value_pairs)
      return [beam.TaggedOutput('keys', keys),
              beam.TaggedOutput('values', values)]

    return tuple(
        pcoll
        | 'ZipAndFlatten' >> beam.FlatMap(lambda batches: list(zip(*batches)))
        | 'MeanPerKey' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
        | 'ToList' >> beam.combiners.ToList()
        | 'Extract' >> beam.FlatMap(extract_output).with_outputs(
            'keys', 'values'))

def preprocessing_fn(inputs):
  outputs = tft.experimental.ptransform_analyzer(
      inputs=[inputs['s'], inputs['x']],
      ptransform=MeanPerKey(),
      output_dtypes=[tf.string, tf.float32],
      output_shapes=[[2], [2]])
  (keys, means) = outputs
  mean_a = tf.reshape(tf.gather(means, tf.where(keys == 'a')), [])
  return {'x/mean_a': inputs['x'] / mean_a}

raw_data = [dict(x=1, s='a'), dict(x=8, s='b'), dict(x=3, s='a')]
feature_spec = dict(
    x=tf.io.FixedLenFeature([], tf.float32),
    s=tf.io.FixedLenFeature([], tf.string))
raw_data_metadata = tft.DatasetMetadata.from_feature_spec(feature_spec)
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
  transformed_dataset, transform_fn = (
      (raw_data, raw_data_metadata)
      | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset
transformed_data
# [{'x/mean_a': 0.5}, {'x/mean_a': 4.0}, {'x/mean_a': 1.5}]
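Outside of Beam, the element structure the user PTransform consumes can be sketched in plain NumPy: each element of the input PCollection is a tuple holding one batch per input tensor, in the order given to inputs=, and the 'ZipAndFlatten' step above turns one such tuple into per-row (key, value) pairs. A minimal sketch (the variable names here are illustrative, not part of the API):

```python
import numpy as np

# One element of the analyzer's input PCollection: a tuple with one
# batch of values per input tensor. Here the analyzer was called with
# inputs=[inputs['s'], inputs['x']], so the tuple is (s_batch, x_batch).
batch = (np.array([b'a', b'b', b'a']),   # batch of 's' values
         np.array([1.0, 8.0, 3.0]))      # batch of 'x' values

# The 'ZipAndFlatten' step emits one (key, value) pair per row,
# which CombinePerKey then aggregates.
pairs = list(zip(*batch))
# pairs compares equal to [(b'a', 1.0), (b'b', 8.0), (b'a', 3.0)]
```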

Args
inputs An ordered collection of input Tensors.
ptransform A Beam PTransform that accepts a Beam PCollection in which each element is a tuple of ndarrays. Each element of the tuple contains a batch of values for the corresponding input tensor of the analyzer and preserves its shape and dtype. The PTransform returns a PCollection, or a tuple of PCollections, each containing a single element which is an ndarray or a list of primitive types. The contents of these output PCollections must be consistent with the given values of output_dtypes and output_shapes. The PTransform may inherit from tft_beam.experimental.PTransformAnalyzer if access to a temp base directory is needed. Alternatively, it may be an instance of tft.experimental.CacheablePTransformAnalyzer in order to enable caching for this analyzer, when analyzer cache is enabled for the pipeline.
output_dtypes An ordered collection of TensorFlow dtypes of the output of the analyzer.
output_shapes An ordered collection of shapes of the output of the analyzer. Must have the same length as output_dtypes.
output_asset_default_values (Optional) An ordered collection of optional bytes aligned with output_dtypes/output_shapes. Every item in this collection which is not None indicates that the output is a TF asset path, and its value would be used as the default value of this asset file prior to analysis.
name (Optional) Similar to a TF op name. Used to define a unique scope for this analyzer, which can be used for debugging info.
Returns
A list of output Tensors. These will have dtype and shape as specified by output_dtypes and output_shapes.
Raises
ValueError If output_dtypes and output_shapes have different lengths.
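As a sanity check, the expected output of the example above can be reproduced in plain Python, mirroring what MeanPerKey computes: group x by s, take the mean per key, then divide each x by the mean for key 'a'. This is just the example's arithmetic, not a substitute for running the pipeline:

```python
from collections import defaultdict

raw_data = [dict(x=1, s='a'), dict(x=8, s='b'), dict(x=3, s='a')]

# MeanPerKey: mean of x for each distinct value of s.
sums = defaultdict(lambda: [0.0, 0])
for row in raw_data:
    sums[row['s']][0] += row['x']
    sums[row['s']][1] += 1
means = {k: total / n for k, (total, n) in sums.items()}
# means == {'a': 2.0, 'b': 8.0}

# preprocessing_fn: divide every x by the mean for key 'a'.
transformed = [{'x/mean_a': row['x'] / means['a']} for row in raw_data]
# transformed == [{'x/mean_a': 0.5}, {'x/mean_a': 4.0}, {'x/mean_a': 1.5}]
```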