【推荐算法工程师技术栈系列】分布式&数据库–tensorflow

目录

TensorFlow 高阶API

Dataset(tf.data)

源码定义:tensorflow/python/data/ops/dataset_ops.py
用户向导:Dataset Input Pipeline

Dataset可以用来表示输入管道元素集合(张量的嵌套结构)和“逻辑计划“对这些元素的转换操作。在Dataset中元素可以是向量,元组或字典等形式。
另外,Dataset需要配合另外一个类Iterator进行使用,Iterator对象是一个迭代器,可以对Dataset中的元素进行迭代提取。

api说明
from_generator(generator,output_types,output_shapes=None)Creates a Dataset whose elements are generated by generator.
from_tensor_slices(tensors)创建dataset,其元素是给定张量的切片的元素
from_tensors(tensors)创建一个Dataset包含给定张量的单个元素。
interleave(map_func,cycle_length,block_length=1)对数据集进行map转换并合并插入结果
list_files(file_pattern,shuffle=None)A dataset of all files matching a pattern.
range(*args)Creates a Dataset of a step-separated range of values.
skip(count)Creates a Dataset that skips count elements from this dataset.
take(count)Creates a Dataset with at most count elements from this dataset.
zip(datasets)Creates a Dataset by zipping together the given datasets.
prefetch(buffer_size)创建dataset,在请求输入数据集之前从输入数据集中预提取元素
apply(transformation_func)Apply a transformation function to this dataset.
cache(filename=‘‘)Caches the elements in this dataset.
concatenate(dataset)Creates a Dataset by concatenating given dataset with this dataset.
filter(predicate)Filters this dataset according to predicate.
map(map_func,num_parallel_calls=None)Maps map_func across this dataset.
flat_map(map_func)Maps map_func across this dataset and flattens the result.
batch(batch_size)将数据集的连续元素合成批次
padded_batch(batch_size,padded_shapes,padding_values=None)将数据集的连续元素组合到填充批次中,此转换将输入数据集的多个连续元素组合为单个元素。
repeat(count=None)Repeats this dataset count times.
shard(num_shards,index)将Dataset分割成num_shards个子数据集。这个函数在分布式训练中非常有用,它允许每个设备读取唯一子集。
shuffle(buffer_size,seed=None,reshuffle_each_iteration=None)随机混洗数据集的元素。
make_initializable_iterator(shared_name=None)Creates an Iterator for enumerating the elements of this dataset.
make_one_shot_iterator()创建Iterator用于枚举此数据集的元素。(可自动初始化)

Iterator
源码定义:tensorflow/python/data/ops/iterator_ops.py
用户向导:Dataset Input Pipeline > Iterating over datasets

api说明
get_next(name=None)In graph mode, you should typically call this method once and use its result as the input to another computation.

Estimator(tf.estimator)

源码定义:tensorflow/python/estimator/estimator.py
用户向导:Regression Examples

Estimator对象包装(wraps)由model_fn指定的模型,model_fn由给定输入和其他一些参数,返回需要进行训练、计算,或预测的操作.
所有输出(检查点,事件文件等)都被写入model_dir或其子目录.如果model_dir未设置,则使用临时目录.
可以通过RunConfig对象(包含了有关执行环境的信息)传递config参数.它被传递给model_fn,如果model_fn有一个名为“config”的参数(和输入函数以相同的方式).Estimator使得模型可配置化,并且还使用其一些字段来控制内部,特别是关于检查点的控制.
该params参数包含hyperparameter,如果model_fn有一个名为“PARAMS”的参数,并且以相同的方式传递给输入函数,则将它传递给model_fn. Estimator只是沿着参数传递,并不检查它.因此,params的结构完全取决于开发人员.
不能在子类中重写任何Estimator方法(其构造函数强制执行此操作).子类应使用model_fn来配置基类,并且可以添加实现专门功能的方法.

api说明
init(model_fn, model_dir=None, config=None, params=None)Estimator的构造方法,返回EstimatorSpec实例
evaluate(input_fn,steps=None,hooks=None,checkpoint_path=None,name=None)返回一个包含按name为键的model_fn中指定的计算指标的词典。
export_savedmodel(export_dir_base, serving_input_receiver_fn)将推理图作为SavedModel导出到给定的目录中.该方法通过首先调用serving_input_receiver_fn来获取特征Tensors来构建一个新图,然后调用这个Estimator的model_fn来基于这些特征生成模型图.它在新的会话中将给定的检查点恢复到该图中.最后它会在给定的export_dir_base下面创建一个时间戳导出目录,并在其中写入一个SavedModel,其中包含从此会话保存的单个MetaGraphDef.
get_variable_names()返回此模型中所有变量名称的列表.
get_variable_value(name)返回由名称给出的变量的值. 返回值类型:Numpy数组
latest_checkpoint()查找model_dir中最新保存的检查点文件的文件名.
predict(self,input_fn,predict_keys=None)对给定的features产生预测,返回predictions字典张量的计算值.
train(input_fn,hooks=None,steps=None,max_steps=None,saving_listeners=None)训练给定训练数据input_fn的模型.

FeatureColumns(tf.feature_column)

源码定义:tensorflow/python/feature_column/feature_column.py
用户向导:feature_columns
feature_columns2

Feature cloumns是原始数据和Estimator模型之间的桥梁,它们被用来把各种形式的原始数据转换为模型能够使用的格式

api返回值用法
tf.feature_column.bucketized_column(source_column,boundaries)Represents discretized dense input.Bucketized column用来把numeric column的值按照提供的边界(boundaries)离散化为多个值
tf.feature_column.categorical_column_with_hash_bucket(key,hash_bucket_size,dtype=tf.string)Represents sparse feature where ids are set by hashing.用户指定类别的总数,通过hash的方式来得到最终的类别ID
tf.feature_column.categorical_column_with_identity(key,num_buckets,default_value=None)A _CategoricalColumn that returns identity values.与Bucketized column类似,Categorical identity column用单个唯一值表示bucket。
tf.feature_column.categorical_column_with_vocabulary_file(key,vocabulary_file)A _CategoricalColumn with a vocabulary file.类别特征做one-hot编码,字典通过文件给出
tf.feature_column.categorical_column_with_vocabulary_list(key,vocabulary_list)A _CategoricalColumn with in-memory vocabulary.类别特征做one-hot编码,字典通过list给出
tf.feature_column.crossed_column(keys,hash_bucket_size,hash_key=None)Returns a column for performing crosses of categorical features.对keys的笛卡尔积执行hash操作,再把hash的结果对hash_bucket_size取模得到最终的结果
tf.feature_column.embedding_column(categorical_column,dimension,combiner=‘mean‘)_DenseColumn that converts from sparse, categorical input.把原始特征映射为一个低维稠密的实数向量
tf.feature_column.indicator_column(categorical_column)Represents multi-hot representation of given categorical column.把categorical column得到的稀疏tensor转换为one-hot或者multi-hot形式的稠密tensor
tf.feature_column.input_layer(features,feature_columns)Returns a dense Tensor as input layer based on given feature_columns.
tf.feature_column.linear_model(features,feature_columns)Returns a linear prediction Tensor based on given feature_columns.
tf.feature_column.make_parse_example_spec(feature_columns)Creates parsing spec dictionary from input feature_columns.
tf.feature_column.numeric_column(key,shape=(1,),default_value=None,dtype=tf.float32,normalizer_fn=None)Represents real valued or numerical features.
tf.feature_column.shared_embedding_columns(categorical_columns,dimension,combiner=‘mean‘)List of dense columns that convert from sparse, categorical input.把多个特征共享相同的embeding映射空间
tf.feature_column.weighted_categorical_column(categorical_column,weight_feature_key,dtype=tf.float32)给一个类别特征赋予一定的权重,给一个类别特征赋予一定的权重

tf.nn

激活函数op,loss部分抽象

源码定义:tensorflow/python/ops/gen_nn_ops.py
用户向导:Neural Network

api说明
tf.nn.relu(...)Computes rectified linear: max(features, 0).
tf.nn.sigmoid(...)Computes sigmoid of x element-wise.
tf.nn.bias_add(...)
tf.nn.tanh(...)Computes hyperbolic tangent of x element-wise.
tf.nn.dropout()
tf.nn.softmax()
tf.nn.sigmoid_cross_entropy_with_logits(...)Computes sigmoid cross entropy given logits.
tf.nn.top_k(...)Finds values and indices of the k largest entries for the last dimension.
tf.nn.embedding_lookup(params,ids,partition_strategy=‘mod‘)按照ids查找params里面的vector然后输出
tf.nn.embedding_lookup_sparse(params,sp_ids,sp_weights)Looks up ids in a list of embedding tensors,Computes embeddings for the given ids and weights.
tf.nn.zero_fraction(value,name=None)Returns the fraction of zeros in value.
tf.nn.nce_loss()负采样的NCE loss实现

tf.layers

网络模块抽象

api说明
average_pooling1d(...)Average Pooling layer for 1D inputs.
average_pooling2d(...)Average pooling layer for 2D inputs (e.g. images).
average_pooling3d(...)Average pooling layer for 3D inputs (e.g. volumes).
batch_normalization(...)Functional interface for the batch normalization layer.
conv1d(...)Functional interface for 1D convolution layer (e.g. temporal convolution).
conv2d(...)Functional interface for the 2D convolution layer.
conv2d_transpose(...)Functional interface for transposed 2D convolution layer.
conv3d(...)Functional interface for the 3D convolution layer.
conv3d_transpose(...)Functional interface for transposed 3D convolution layer.
dense(...)Functional interface for the densely-connected layer.
dropout(...)Applies Dropout to the input.
flatten(...)Flattens an input tensor while preserving the batch axis (axis 0).
max_pooling1d(...)Max Pooling layer for 1D inputs.
max_pooling2d(...)Max pooling layer for 2D inputs (e.g. images).
max_pooling3d(...)Max pooling layer for 3D inputs (e.g. volumes).
separable_conv1d(...)Functional interface for the depthwise separable 1D convolution layer.
separable_conv2d(...)Functional interface for the depthwise separable 2D convolution layer.

tf.train

tf.train provides a set of classes and functions that help train models.

用户向导:Training

api说明
tf.train.OptimizerThe Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.GradientDescentOptimizerThe Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.AdadeltaOptimizerThe Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.AdagradOptimizerThe Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.AdamOptimizerThe Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.SyncReplicasOptimizerThe Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.SessionRunHookTraining Hooks,Hook to extend calls to MonitoredSession.run().
tf.train.SessionRunArgsTraining Hooks,Represents arguments to be added to a Session.run() call.
tf.train.global_stepsess每执行完一个batch,就给global_step加1,用来统计目前执行的batch数
tf.train.basic_train_loop
tf.train.get_global_step()返回global_step作为name的tensor
tf.train.get_or_create_global_step()Returns and create (if necessary) the global step tensor.
tf.train.write_graph
tf.train.string_input_producer输出字符串到一个输入管道队列
tf.train.Saver(max_to_keep=5)保存模型
tf.train.latest_checkpoint(checkpoint_dir,latest_filename=None)Finds the filename of latest saved checkpoint file.
tf.train.Int64ListInt64List
tf.train.FloatListFloatList
tf.train.FeaturesFeatures
tf.train.ExampleExample
tf.train.batch(tensors,batch_size)Creates batches of tensors in tensors.
tf.trainable_variables()
tf.train.ScaffoldScaffold

tf.linalg

TensorFlow 的线性代数库。
w3cschool TensorFlow官方文档-linalg

api说明
adjoint(...)转置最后两个维度和共轭张量matrix.
band_part(...)复制张量设置每个最内层矩阵中心带外的所有内容
cholesky(...)计算一个或多个方阵的Cholesky分解.
cholesky_solve(...)A X = RHS给出的Cholesky因子分解,求解线性方程组.
det(...)计算一个或多个方阵的行列式.
diag(...)返回具有给定批处理对角线值的批处理对角线张量.
diag_part(...)返回批处理张量的批处理对角线部分.
eigh(...)计算了一批自共轭矩阵的特征分解.
eigvalsh(...)计算一个或多个自共轭矩阵的特征值.
einsum(...)任意维度的张量之间的广义收缩.
expm(...)计算一个或多个方阵的矩阵指数.
eye(...)构造一个单位矩阵或批矩阵.
inv(...)计算一个或多个平方可逆矩阵或它们的倒数
logdet(...)计算hermitian正定矩阵的行列式的对数.
logm(...)计算一个或多个方阵的矩阵对数:
lstsq(...)解决一个或多个线性最小二乘问题.
norm(...)计算向量,矩阵和张量的范数.(不赞成的参数)
qr(...)计算一个或多个矩阵的QR分解.
set_diag(...)返回具有新批处理对角线值的批处理矩阵张量.
slogdet(...)计算行列式的绝对值的符号和日志
solve(...)求解线性方程组.
svd(...)计算一个或多个矩阵的奇异值分解.
tensordot(...)a和b沿指定轴的张量收缩.
trace(...)计算张量x的轨迹.
transpose(...)转置张量a的最后两个维度.
triangular_solve(...)求解具有上三角矩阵或下三角矩阵的线性方程组.

checkpoint(模型保存与恢复)

Estimator 将一个检查点保存到 model_dir 中后,每次调用 Estimator 的 train、eval 或 predict 方法时,都会发生下列情况:

a) Estimator 通过运行 model_fn() 构建模型图。(要详细了解 model_fn(),请参阅创建自定义 Estimator。)

b) Estimator 根据最近写入的检查点中存储的数据来初始化新模型的权重。
换言之,一旦存在检查点,TensorFlow 就会在您每次调用 train()、evaluate() 或 predict() 时重建模型。

Tensorflow Serving

官方例子

half_plus_two的例子

# Download the TensorFlow Serving Docker image and repodocker pull tensorflow/servinggit clone https://github.com/tensorflow/serving# Location of demo modelsTESTDATA="$(pwd)/serving/tensorflow_serving/servables/tensorflow/testdata"# Start TensorFlow Serving container and open the REST API port# docker run -t 表示是否允许伪TTY# docker run --rm表示如果实例已经存在,则先remove掉该实例再重新启动新实例# docker -p设置端口映射# docker -v设置磁盘映射# docker -e设置环境变量docker run -t --rm -p 8501:8501 -v "$TESTDATA/saved_model_half_plus_two_cpu:/models/half_plus_two" -e MODEL_NAME=half_plus_two tensorflow/serving &# Query the model using the predict APIcurl -d '{"instances": [1.0, 2.0, 5.0]}' -X POST http://localhost:8501/v1/models/half_plus_two:predict# Returns => { "predictions": [2.5, 3.0, 4.5] }

serving镜像提供了两种调用方式:gRPC和HTTP请求。gRPC默认端口是8500,HTTP请求的默认端口是8501,serving镜像中的程序会自动加载镜像内/models下的模型,通过MODEL_NAME指定/models下的哪个模型。

创建自定义镜像

docker run --rm -p 8501:8501 --mount type=bind,source=${model_dir},target=/models/dpp_model -e MODEL_NAME=dpp_model -t tensorflow/serving &

官方文档

架构

TensorFlow Serving可抽象为一些组件构成,每个组件实现了不同的API任务,其中最重要的是Servable, Loader, Source, 和 Manager,我们看一下组件之间是如何交互的。

Source

TF Serving发现磁盘上的模型文件,该模型服务的生命周期就开始了。Source组件负责发现模型文件,找出需要加载的新模型。实际上,该组件监控文件系统,当发现一个新的模型版本,就为该模型创建一个Loader。

Loader

Loader需要知道模型的相关信息,包括如何加载模型如何估算模型需要的资源,包括需要请求的RAM、GPU内存。Loader带一个指针,连接到磁盘上存储的模型,其中包含加载模型需要的相关元数据。不过记住,Loader现在还不允许加载模型。

Manager

Manager收到待加载模型版本,开始模型服务流程。此处有两种可能性,第一种情况是模型首次推送部署,Manager先确保模型需要的资源可用,一旦获取相应的资源,Manager赋予Loader权限去加载模型。
第二种情况是为已上线模型部署一个新版本。Manager会先查询Version Policy插件,决定加载新模型的流程如何进行。
具体来说,当加载新模型时,可选择保持 (1) 可用性(the Availability Preserving Policy)或 (2) 资源(the Resource Preserving Policy)。

Servable

最后,当用户请求模型的句柄,Manager返回句柄给Servable。

Servables 是 TensorFlow Serving 中最核心的抽象,是客户端用于执行计算 (例如:查找或推断) 的底层对象。

部署服务

模型导出

将TensorFlow构建的模型用作服务,首先需要确保导出为正确的格式,可以采用TensorFlow提供的SavedModel类。TensorFlow Saver提供模型checkpoint磁盘文件的保存/恢复。事实上SavedModel封装了TensorFlow Saver,对于模型服务是一种标准的导出方法。
SignatureDefs定义了一组TensorFlow支持的计算签名,便于在计算图中找到适合的输入输出张量。简单的说,使用这些计算签名,可以准确指定特定的输入输出节点。目前有3个服务API: 分类、预测和回归。每个签名定义关联一个RPC API。分类SignatureDef用于分类RPC API,预测SignatureDef用于RPC API等等。对于分类SignatureDef,需要一个输入张量(接收数据)以及可能的输出张量: 类别或得分。回归SignatureDef需要一个输入张量以及另一个输出张量。最后预测SignatureDef需要一个可变长度的输入输出张量。

提供tensorflow模型

tf.saved_model API

with sess.graph.as_default() as graph: builder = tf.saved_model.builder.SavedModelBuilder(saved_model_dir) # 1.8版本,1.12之后调整为tf.saved_model.predict_signature_def signature = tf.saved_model.signature_def_utils.predict_signature_def(inputs={'image': in_image}, outputs={'prediction': graph.get_tensor_by_name('final_result:0')},) builder.add_meta_graph_and_variables(sess=sess, tags=["serve"], signature_def_map={'predict':signature, tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:signature }) builder.save()

tf.estimator API

def export_savedmodel(self, export_dir, serving_input_receiver_fn=None, as_text=False, checkpoint_path=None): if serving_input_receiver_fn is None and self.feature_cfgs is None: raise ValueError('Both serving_input_receiver_fn and feature_cfgs are none.') if serving_input_receiver_fn is None: feature_spec = self.feature_cfgs.to_feature_spec(exclude_targets=True) serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec) # 自己封装的estimator savedmodel_path = self.estimator.export_savedmodel( export_dir_base=export_dir, serving_input_receiver_fn=serving_input_receiver_fn, as_text=as_text, checkpoint_path=checkpoint_path ) return savedmodel_path

其中self.feature_cfgs.to_feature_spec函数得到如下结果:

{'follow_seq': VarLenFeature(dtype=tf.int64), 'uid': VarLenFeature(dtype=tf.int64)}

说明了输入】

TensorFlow Serving ModelServer 用于发现新导出的模型,并启动 gRPC 用于提供模型服务。

API请求(predict)

TensorFlow ModelServer通过host:port接受下面这种RESTful API请求:
POST http://host:port/ :
URI: /v1/models/ \({MODEL_NAME}[/versions/\){MODEL_VERSION}]
VERB: classify|regress|predict

其中“/versions/${MODEL_VERSION}”是可选的,如果省略,则使用最新的版本。

该API基本遵循gRPC版本的PredictionService API。

请求URL的示例:

http://host:port/v1/models/iris:classifyhttp://host:port/v1/models/mnist/versions/314:predict

请求格式

预测API的请求体必须是如下格式的JSON对象:

{ // (Optional) Serving signature to use. // If unspecifed default serving signature is used. "signature_name": <string>, // Input Tensors in row ("instances") or columnar ("inputs") format. // A request can have either of them but NOT both. "instances": <value>|<(nested)list>|<list-of-objects> "inputs": <value>|<(nested)list>|<object>}

以行的形式说明输入的tensor

{ "instances": [ { "tag": "foo", "signal": [1, 2, 3, 4, 5], "sensor": [[1, 2], [3, 4]] }, { "tag": "bar", "signal": [3, 4, 1, 2, 5]], "sensor": [[4, 5], [6, 8]] } ]}

以列的形式说明输入的tensor

{ "inputs": { "tag": ["foo", "bar"], "signal": [[1, 2, 3, 4, 5], [3, 4, 1, 2, 5]], "sensor": [[[1, 2], [3, 4]], [[4, 5], [6, 8]]] }}

返回格式

行形式

{ "predictions": <value>|<(nested)list>|<list-of-objects>}

如果模型的输出只包含一个命名的tensor,我们省略名字和predictions key map,直接使用标量或者值的list。如果模型输出多个命名的tensor,我们输出对象list,和上面提到的行形式输入类似。

列形式

{ "outputs": <value>|<(nested)list>|<object>}

如果模型的输出只包含一个命名的tensor,我们省略名字和outputs key map,直接使用标量或者值的list。如果模型输出多个命名的tensor,我们输出对象,其每个key都和输出的tensor名对应,和上面提到的列形式输入类似。

API demo

$ curl -d '{"instances": [1.0,2.0,5.0]}' -X POST http://localhost:8501/v1/models/half_plus_three:predict{ "predictions": [3.5, 4.0, 5.5]}

[译]TensorFlow Serving RESTful API

源码理解

tensorflow-serving源码理解

开发中遇到的其他问题记录

实现numpy中的切片赋值

# 初始化一个3*4的tensor和大小为3的listcis = tf.zeros([3, 4)cis_list = [tf.zeros([4]) for _ in range(3)]# 替换list的一个元素cis_list[k] = eis# 重新堆叠成一个tensor,列切片赋值axis=1?cis = tf.stack(cis_list)

附录

相关文章