
Running OpenPose on Inferentia#

Note: this tutorial runs on tensorflow-neuron 1.x only#

Introduction:#

In this tutorial we compile and deploy an OpenPose model for Inferentia. This Jupyter notebook should run on an inf1.6xlarge instance for compilation and inference; it is the inference part of the tutorial that requires an inf1.6xlarge, not the compilation itself. For simplicity we run the whole tutorial on a single instance, but in a real-life scenario the compilation can be done on a compute-optimized c5.4xlarge instance and the deployment on an instance from the inf1 family.

In this tutorial we provide two main sections:

  1. Compile the OpenPose model on an inf1.6xlarge instance.
  2. Run inference with the same compiled model on the inf1.6xlarge instance.

Verify that this Jupyter notebook is running the Python kernel environment that was set up according to the TensorFlow Installation Guide. You can select the kernel from the “Kernel -> Change Kernel” menu at the top of this Jupyter notebook page.
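
If you are unsure which environment the kernel is using, a quick check (hypothetical, not part of the original tutorial) is to print the interpreter path from within the notebook:

import sys
print(sys.executable)  # should point into the Python environment where tensorflow-neuron is installed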

Acknowledgement:#

Many thanks to ildoonet for providing the pretrained model as well as the image preprocessing/pose estimation infrastructure.

Download the TensorFlow pose net frozen graph#

!wget -c --tries=2 $( wget -q -O - http://www.mediafire.com/file/qlzzr20mpocnpa3/graph_opt.pb | grep -o 'http*://download[^"]*' | tail -n 1 ) -O graph_opt.pb
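
If the download is interrupted, the command can simply be re-run (wget -c resumes partial downloads). An optional sanity check (a hypothetical snippet, not from the original tutorial) confirms that a non-empty graph_opt.pb was written:

import os
# The frozen graph should exist and be non-trivially sized before compilation.
assert os.path.isfile('graph_opt.pb') and os.path.getsize('graph_opt.pb') > 0
print('graph_opt.pb: %.1f MB' % (os.path.getsize('graph_opt.pb') / 1e6))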

!pip install tensorflow_neuron==1.15.5.2.8.9.0 --extra-index-url=https://pip.repos.neuron.amazonaws.com/
!pip install neuron_cc==1.13.5.0 --extra-index-url=https://pip.repos.neuron.amazonaws.com
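
After installation, a minimal import check (a sketch, assuming the two packages above installed cleanly) confirms that the Neuron-integrated TensorFlow 1.x stack is visible to this kernel:

import tensorflow as tf
import tensorflow.neuron as tfn  # registers the Neuron integration for TF 1.x
print('TensorFlow version:', tf.__version__)  # expect a 1.15.x build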

Compile#

Compile the pose net frozen graph into AWS Neuron compatible form. The network input image resolution is adjustable with the argument --net_resolution (e.g., --net_resolution=656x368). The compiled model can accept input of arbitrary batch size at runtime.

""" Usage: python convert_graph_opt.py /path/to/graph_opt.pb /path/to/graph_opt_neuron.pb """ #import argparse import numpy as np import tensorflow as tf from tensorflow.core.framework.tensor_shape_pb2 import TensorShapeProto import tensorflow.neuron as tfn

def compile():
    #parser = argparse.ArgumentParser()
    #parser.add_argument('input_pb_path', help='Input serialized GraphDef protobuf')
    #parser.add_argument('output_pb_path', help='Output serialized GraphDef protobuf')
    #parser.add_argument('--net_resolution', default='656x368', help='Network resolution in WxH format, e.g., --net_resolution=656x368')
    #parser.add_argument('--debug_verify', action='store_true')
    #args = parser.parse_args()

    # The command-line arguments above are hardcoded here so the function can run
    # directly inside this notebook.
    input_pb_path = './graph_opt.pb'
    net_resolution = '656x368'
    output_pb_path = './graph_opt_neuron_' + net_resolution + '.pb'

    debug_verify = 'store_true'  # any truthy value enables the numerical verification below
    dim_w, dim_h = net_resolution.split('x')
    dim_w = int(dim_w)
    dim_h = int(dim_h)
    graph_def = tf.GraphDef()
    with open(input_pb_path, 'rb') as f:
        graph_def.ParseFromString(f.read())

    if debug_verify:
        np.random.seed(0)
        feed_dict = {'image:0': np.random.rand(1, dim_h, dim_w, 3)}
        output_name = 'Openpose/concat_stage7:0'
        with tf.Session(graph=tf.Graph()) as sess:
            tf.import_graph_def(graph_def, name='')
            result_reference = sess.run(output_name, feed_dict)

    preprocessing_ops = {'preprocess_divide', 'preprocess_divide/y', 'preprocess_subtract', 'preprocess_subtract/y'}
    graph_def = nhwc_to_nchw(graph_def, preprocessing_ops)
    graph_def = inline_float32_to_float16(graph_def, preprocessing_ops)
    with tf.Session(graph=tf.Graph()) as sess:
        tf.import_graph_def(graph_def, name='')
        no_fuse_ops = preprocessing_ops.union({'Openpose/concat_stage7'})
        infer_graph = tfn.graph_util.inference_graph_from_session(
            sess, shape_feed_dict={'image:0': [1, dim_h, dim_w, 3]}, output_tensors=['Openpose/concat_stage7:0'],
            no_fuse_ops=no_fuse_ops, dynamic_batch_size=True,
        )
    with open(output_pb_path, 'wb') as f:
        f.write(infer_graph.as_graph_def().SerializeToString())

    if debug_verify:
        with tf.Session(graph=infer_graph) as sess:
            result_compiled = sess.run(output_name, feed_dict)
        np.testing.assert_allclose(result_compiled, result_reference, rtol=1e-2, atol=1e-3)

def inline_float32_to_float16(graph_def, preprocessing_ops):
    # Convert float32 constants and op attributes to float16, then insert Cast nodes
    # so that the graph still accepts a float32 input and produces a float32 output.
    float32_enum = tf.float32.as_datatype_enum
    float16_enum = tf.float16.as_datatype_enum
    graph = tf.Graph()
    with graph.as_default():
        tf.import_graph_def(graph_def, name='')
    graph_def = graph.as_graph_def()
    for node in graph_def.node:
        if node.name in preprocessing_ops or node.op == 'Placeholder':
            cast_input_node_name = node.name
            continue
        if node.op == 'Const':
            if node.attr['dtype'].type == float32_enum:
                node.attr['dtype'].type = float16_enum
                tensor_def = node.attr['value'].tensor
                tensor_def.dtype = float16_enum
                if tensor_def.tensor_content:
                    const_np = np.frombuffer(tensor_def.tensor_content, dtype=np.float32).astype(np.float16)
                    tensor_def.tensor_content = const_np.tobytes()
                elif len(tensor_def.float_val):
                    const_np = np.array(tensor_def.float_val).astype(np.float16).view(np.uint16)
                    tensor_def.float_val[:] = []
                    tensor_def.half_val[:] = list(const_np)
                else:
                    raise NotImplementedError
        elif 'T' in node.attr and node.attr['T'].type == float32_enum:
            node.attr['T'].type = float16_enum
    for node in graph_def.node:
        if node.name == cast_input_node_name:
            node.name = '{}_PreCastFloat32ToFlot16'.format(node.name)
            input_node = node
            break
    cast_input_node = _gen_cast_node_def(cast_input_node_name, tf.float16, input_node)

    output_node = graph_def.node[-1]
    cast_output_node_name = output_node.name
    output_node.name = '{}_PreCastFloat16ToFlot32'.format(output_node.name)
    cast_output_node = _gen_cast_node_def(cast_output_node_name, tf.float32, output_node)

    preprocessing_ops.add(input_node.name)
    new_graph_def = tf.GraphDef()
    new_graph_def.node.extend(graph_def.node)
    new_graph_def.node.append(cast_input_node)
    new_graph_def.node.append(cast_output_node)
    graph = tf.Graph()
    with graph.as_default():
        tf.import_graph_def(new_graph_def, name='')
    return graph.as_graph_def()

def nhwc_to_nchw(graph_def, preprocessing_ops):
    # Switch Conv2D/BiasAdd/MaxPool nodes to NCHW data format and insert Transpose
    # nodes so that the graph still takes NHWC input and returns NHWC output.
    graph = tf.Graph()
    with graph.as_default():
        tf.import_graph_def(graph_def, name='')
    graph_def = graph.as_graph_def()
    node_name_to_node = {node.name: node for node in graph_def.node}
    for node in graph_def.node:
        if node.name in preprocessing_ops or node.op == 'Placeholder':
            transpose_input_node_name = node.name
            continue
        if node.op == 'Conv2D':
            node.attr['data_format'].s = b'NCHW'
            strides = node.attr['strides'].list.i
            strides[:] = [strides[0], strides[3], strides[1], strides[2]]
        elif node.op == 'BiasAdd':
            if node.name != 'probs/BiasAdd':
                node.attr['data_format'].s = b'NCHW'
        elif node.op == 'MaxPool':
            node.attr['data_format'].s = b'NCHW'
            ksize = node.attr['ksize'].list.i
            ksize[:] = [ksize[0], ksize[3], ksize[1], ksize[2]]
            strides = node.attr['strides'].list.i
            strides[:] = [strides[0], strides[3], strides[1], strides[2]]
        elif node.op in {'Concat', 'ConcatV2'}:
            node_axes = node_name_to_node[node.input[-1]]
            node_axes.attr['value'].tensor.int_val[:] = [1]
    for node in graph_def.node:
        if node.name == transpose_input_node_name:
            node.name = '{}_PreTransposeNHWC2NCHW'.format(node.name)
            input_node = node
            break
    transpose_input_node, transpose_input_perm_node = _gen_transpose_def(transpose_input_node_name, [0, 3, 1, 2], input_node)

    output_node = graph_def.node[-1]
    transpose_output_node_name = output_node.name
    output_node.name = '{}_PreTransposeNCHW2NHWC'.format(output_node.name)
    transpose_output_node, transpose_output_perm_node = _gen_transpose_def(transpose_output_node_name, [0, 2, 3, 1], output_node)

    preprocessing_ops.add(input_node.name)
    preprocessing_ops.add(transpose_input_perm_node.name)
    new_graph_def = tf.GraphDef()
    new_graph_def.node.extend(graph_def.node)
    new_graph_def.node.append(transpose_input_perm_node)
    new_graph_def.node.append(transpose_input_node)
    new_graph_def.node.append(transpose_output_perm_node)
    new_graph_def.node.append(transpose_output_node)
    graph = tf.Graph()
    with graph.as_default():
        tf.import_graph_def(new_graph_def, name='')
    return graph.as_graph_def()

def _gen_cast_node_def(name, target_dtype, input_node):
    cast_node = tf.NodeDef(name=name, op='Cast')
    cast_node.input.append(input_node.name)
    cast_node.attr['DstT'].type = target_dtype.as_datatype_enum
    cast_node.attr['SrcT'].type = input_node.attr['T'].type
    cast_node.attr['Truncate'].b = False
    return cast_node

def _gen_transpose_def(name, perm, input_node):
    perm_node = tf.NodeDef(name='{}/perm'.format(name), op='Const')
    perm_node.attr['dtype'].type = tf.int32.as_datatype_enum
    tensor_def = perm_node.attr['value'].tensor
    tensor_def.dtype = tf.int32.as_datatype_enum
    tensor_def.tensor_shape.dim.append(TensorShapeProto.Dim(size=4))
    tensor_def.tensor_content = np.array(perm, dtype=np.int32).tobytes()
    transpose_node = tf.NodeDef(name=name, op='Transpose')
    transpose_node.input.append(input_node.name)
    transpose_node.input.append(perm_node.name)
    transpose_node.attr['T'].type = input_node.attr['T'].type
    transpose_node.attr['Tperm'].type = tf.int32.as_datatype_enum
    return transpose_node, perm_node

compile()

Sample output will look like the following:

WARNING:tensorflow:From :47: inference_graph_from_session (from tensorflow_neuron.python.graph_util) is deprecated and will be removed in a future version.

Instructions for updating:

Please refer to AWS documentation on Neuron integrated TensorFlow 2.0.

INFO:tensorflow:Froze 0 variables.

INFO:tensorflow:Converted 0 variables to const ops.

INFO:tensorflow:fusing subgraph {subgraph neuron_op_ed41d2deb8c54255 with input tensors ["<tf.Tensor 'preprocess_subtract0/_0:0' shape=(1, 3, 368, 656) dtype=float16>"], output tensors ["<tf.Tensor 'Openpose/concat_stage7_PreCastFloat16ToFlot32:0' shape=(1, 46, 82, 57) dtype=float16>"]} with neuron-cc

INFO:tensorflow:Number of operations in TensorFlow session: 474

INFO:tensorflow:Number of operations after tf.neuron optimizations: 474

INFO:tensorflow:Number of operations placed on Neuron runtime: 465
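
As an optional check (a sketch, not part of the original tutorial), you can inspect the written protobuf and count the fused subgraphs; tensorflow-neuron 1.x represents each fused subgraph as a single NeuronOp node, so at least one should be present after a successful compilation:

import tensorflow as tf

compiled_graph_def = tf.GraphDef()
with open('./graph_opt_neuron_656x368.pb', 'rb') as f:
    compiled_graph_def.ParseFromString(f.read())
op_types = [node.op for node in compiled_graph_def.node]
print('NeuronOp count:', op_types.count('NeuronOp'))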

Deploy#

We use the same instance to deploy the model. If you deploy on a different instance, launch an inf1 deployment instance and copy the AWS Neuron optimized TensorFlow frozen graph graph_opt_neuron_656x368.pb to it. The smallest instance type, inf1.xlarge, is sufficient to serve the compiled model, although the performance measurement below is configured for the 16 NeuronCores of an inf1.6xlarge.

Your graph_opt_neuron_656x368.pb can now be plugged into ildoonet's pose-estimation infrastructure seamlessly if you have tensorflow-neuron installed. At runtime, please ensure that the image resolution is the same as the compile-time image resolution, i.e., 656x368.
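
For a standalone smoke test outside the ildoonet code base, the sketch below loads the compiled frozen graph and runs one dummy inference at the compile-time resolution; it only uses tensor names that already appear in this tutorial, and because the model was compiled with dynamic_batch_size=True the leading batch dimension may be larger than 1:

import numpy as np
import tensorflow as tf
import tensorflow.neuron  # needed so the session can execute the fused NeuronOp

graph_def = tf.GraphDef()
with open('./graph_opt_neuron_656x368.pb', 'rb') as f:
    graph_def.ParseFromString(f.read())

with tf.Session(graph=tf.Graph()) as sess:
    tf.import_graph_def(graph_def, name='')
    # Height 368 and width 656 must match the compile-time --net_resolution.
    dummy_image = np.zeros([1, 368, 656, 3], dtype=np.float32)
    heatmaps_pafs = sess.run('Openpose/concat_stage7:0', {'image:0': dummy_image})
    print(heatmaps_pafs.shape)  # (1, 46, 82, 57), as in the compilation log above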

Measure performance on the compiled frozen graph using dummy inputs.

""" Copyright (C) 2020, Amazon.com. All Rights Reserved """ import os import atexit import time import math import json from collections import OrderedDict, Counter from contextlib import contextmanager, ContextDecorator from functools import wraps from tensorflow.python.client import session from tensorflow.python.platform import tf_logging as logging

class measure_performance(ContextDecorator):
    """Convenient tool for performance measurements.

    Can be applied to tensorflow session.run, tf-serving unary gRPC calls, or a given
    custom function.

    Usage:

    To generate a performance report for the entire Python or gRPC-client process,
    insert the following function call before running inferences:

        tfn.measure_performance()

    Then a latency/throughput report will be generated when the process terminates.

    Alternatively, it is possible to use tfn.measure_performance programmatically as a
    context manager. Performance measurement will be done for all inferences happening
    under this context. The report will be displayed as an INFO level log when exiting
    the context. It is also possible to obtain a JSON format report in Python. For example:

        with tfn.measure_performance() as perf:
            ...  # run some inferences

        report_json = perf.report()
        report_full_json = perf.report(verbosity=1)
    """

def __init__(self, func=None, window_size=1):
    self.perf_tracker = PerformanceTracker(window_size)
    atexit.register(self.perf_tracker.report)
    self._original_run = session.Session.run
    self._original_grpc_call = None
    if callable(func):
        self.perf_tracker.register_func(self._track_performance(func))
    else:
        session.Session.run = self._track_performance(session.Session.run)
        try:
            import grpc
            from tensorflow_serving.apis import prediction_service_pb2_grpc
            dummy_stub = prediction_service_pb2_grpc.PredictionServiceStub(grpc.insecure_channel(''))
            self._grpc_callable_type = type(dummy_stub.Predict)
            self._original_grpc_call = self._grpc_callable_type.__call__
        except ImportError:
            pass
        if callable(self._original_grpc_call):
            self._grpc_callable_type.__call__ = self._track_performance(
                grpc._channel._UnaryUnaryMultiCallable.__call__
            )

def __enter__(self):
    return self.perf_tracker

def __exit__(self, *exc):
    atexit.unregister(self.perf_tracker.report)
    self.perf_tracker.report()
    session.Session.run = self._original_run
    if self._original_grpc_call is not None:
        self._grpc_callable_type.__call__ = self._original_grpc_call
    return False

def _track_performance(self, func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        self.perf_tracker.add_timestamps(start, end)
        return result
    return wrapper

class PerformanceTracker(ContextDecorator):

description = (
    "Latency unit: second. Throughput unit: number of batched inferences per second. "
    "Reported throughput is a lower bound of the actual throughput as inferences "
    "spanning across window boundaries are not counted towards any of the windows. "
    "'Quiet' periods (i. e., window buckets where the inference function is not called) "
    "are not counted towards the reported average throughput."
)

def __init__(self, window_size):
    self.window_size = window_size
    self.timestamps_list = []
    self._func = None

def __call__(self, *args, **kwargs):
    return self._func(*args, **kwargs)

def register_func(self, func):
    self._func = func

def add_timestamps(self, start, end):
    self.timestamps_list.append([start, end])

def report(self, verbosity=0):
    if self.timestamps_list:
        latency_list = [end - start for start, end in self.timestamps_list]
        latency_json = {
            'p50': percentile(latency_list, 50),
            'p90': percentile(latency_list, 90),
            'p99': percentile(latency_list, 99),
            'p100': percentile(latency_list, 100),
        }
        bucketed_timestamps = [self._get_bucket(start, end) for start, end in self.timestamps_list]
        counted_buckets = Counter(item for item in bucketed_timestamps if item is not None)
        bucket_throughputs = [(key, value / self.window_size) for key, value in sorted(counted_buckets.items())]
        busy_throughputs = list(OrderedDict((key, value) for key, value in bucket_throughputs).values())
        throughput_json = {
            'peak': max(busy_throughputs),
            'median': percentile(busy_throughputs, 50),
            'average': sum(busy_throughputs) / len(busy_throughputs),
        }
        if verbosity > 0:
            throughput_json['trend'] = busy_throughputs
        report_json = {
            'pid': os.getpid(),
            'throughput': throughput_json,
            'latency': latency_json,
            'description': PerformanceTracker.description,
        }
        with _logging_show_info():
            logging.info('performance report:\n{}'.format(json.dumps(report_json, indent=4)))
        return report_json

def _get_bucket(self, start, end):
    bucketed_start = math.floor(start / self.window_size) * self.window_size
    bucketed_end = math.ceil(end / self.window_size) * self.window_size
    if bucketed_end - bucketed_start == self.window_size:
        return bucketed_start
    else:
        return None

def percentile(number_list, percent):
    pos_float = len(number_list) * percent / 100
    max_pos = len(number_list) - 1
    pos_floor = min(math.floor(pos_float), max_pos)
    pos_ceil = min(math.ceil(pos_float), max_pos)
    number_list = sorted(number_list)
    return number_list[pos_ceil] if pos_float - pos_floor > 0.5 else number_list[pos_floor]

@contextmanager
def _logging_show_info():
    try:
        verbosity = logging.get_verbosity()
        logging.set_verbosity(logging.INFO)
        yield
    finally:
        logging.set_verbosity(verbosity)

""" Below are the inputs for compiled frozen graph

pb_path is a /path/graph_opt_neuron_656x368.pb num_thread = 8 ( Number of threads that work on each tensorflow session ) batch_size =1 ( batch_size ) net_resolution ,default=656x368 num_inferences = 200 """ import os from concurrent import futures import numpy as np import tensorflow as tf import tensorflow.neuron as tfn

def run_with_dummy(sess, dummy_feed_dict, num_inferences):
    for _ in range(num_inferences):
        sess.run('Openpose/concat_stage7:0', dummy_feed_dict)

def main():
    NUM_NEURON_CORES = 16
    pb_path = './graph_opt_neuron_656x368.pb'
    num_thread = 8
    batch_size = 1
    net_resolution = '656x368'
    num_inferences = 200
    dim_w, dim_h = net_resolution.split('x')
    dim_w = int(dim_w)
    dim_h = int(dim_h)
    graph_def = tf.GraphDef()
    with open(pb_path, 'rb') as f:
        graph_def.ParseFromString(f.read())

    # Tag the fused subgraph so the Neuron runtime replicates it across all 16
    # NeuronCores of the inf1.6xlarge instance.
    graph_def = tfn.graph_util.tag_multicore(graph_def, NUM_NEURON_CORES)

    with tfn.measure_performance() as perf:
        with tf.Session(graph=tf.Graph()) as sess:
            tf.import_graph_def(graph_def, name='')
            input_name = 'image:0'
            input_shape = sess.graph.get_tensor_by_name(input_name).shape.as_list()
            input_shape[0] = batch_size
            input_shape[1] = dim_h
            input_shape[2] = dim_w
            dummy_feed_dict = {input_name: np.zeros(input_shape).astype(np.float32)}
            with futures.ThreadPoolExecutor(max_workers=num_thread) as executor:
                fut_list = [executor.submit(run_with_dummy, sess, dummy_feed_dict, num_inferences) for _ in range(num_thread)]
                res_list = [fut.result() for fut in fut_list]

main()

Sample output will look like the following:

INFO:tensorflow:performance report:
{
    "pid": 17713,
    "throughput": {
        "peak": 66.0,
        "median": 64.0,
        "average": 61.56521739130435
    },
    "latency": {
        "p50": 0.1106414794921875,
        "p90": 0.11212301254272461,
        "p99": 0.11337876319885254,
        "p100": 7.08282732963562
    },
    "description": "Latency unit: second. Throughput unit: number of batched inferences per second. Reported throughput is a lower bound of the actual throughput as inferences spanning across window boundaries are not counted towards any of the windows. 'Quiet' periods (i. e., window buckets where the inference function is not called) are not counted towards the reported average throughput."
}
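
As a rough consistency check on this report: at the p50 latency of about 0.11 s, each of the 8 worker threads completes roughly 1 / 0.11 ≈ 9 batched inferences per second, or about 72 per second in aggregate. The reported peak of 66 and average of about 62 sit just below that estimate because window-spanning inferences are discarded from the throughput buckets and the first call pays a one-time model-load cost, which likely accounts for the 7-second p100 latency.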