博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
大数据下基于Tensorflow框架的深度学习示例教程
阅读量:6608 次
发布时间:2019-06-24

本文共 13701 字,大约阅读时间需要 45 分钟。

hot3.png

近几年,信息时代的快速发展产生了海量数据,诞生了无数前沿的大数据技术与应用。在当今大数据时代的产业界,商业决策日益基于数据的分析作出。当数据膨胀到一定规模时,基于机器学习对海量复杂数据的分析更能产生较好的价值,而深度学习在大数据场景下更能揭示数据内部的逻辑关系。本文就以大数据作为场景,通过自底向上的教程详述在大数据架构体系中如何应用深度学习这一技术。大数据架构中采用的是hadoop系统以及Kerberos安全认证,深度学习采用的是分布式的Tensorflow架构,hadoop解决了大数据的存储问题,而分布式Tensorflow解决了大数据训练的问题。本教程是我们团队在开发基于深度学习的实时欺诈预警服务时,部署深度学习这一模块时总结出的经验,感兴趣的欢迎深入交流。

安装Tensorflow

我们安装Tensorflow选择的是Centos7,因为Tensorflow需要使用GNU发布的1.5版本的libc库,Centos6系统并不适用该版本库而被抛弃。对于如何联网在线安装Tensorflow,有比较详尽的教程。本教程着重讲一下网上资料较少的离线安装方式,系统的安装更需要在意的是各软件版本的一致性,下面教程也是解决了很多版本不一致的问题后给出的一个方案。首先我们先将整个系统搭建起来吧。

1.安装编程语言Python3.5:在下载软件并解压后执行如下安装命令:

./configure make make testsudo make install

2.安装基于Python的科学计算包python-numpy:在下载软件并解压后执行如下安装命令:

python setup.py install

3.安装Python模块管理的工具wheel:在下载软件后执行如下安装命令:

pip install wheel-0.30.0a0-py2.py3-none-any.whl

4.安装自动下载、构建、安装和管理 python 模块的工具setuptools:在下载软件并解压后执行如下安装命令:

python setup.py install

5.安装Python开发包python-devel:在下载软件后执行如下安装命令:

sudo rpm -i --nodeps python3-devel-3.5.2-4.fc25.x86_64.rpm

6.安装Python包安装管理工具six:在下载软件后执行如下安装命令:

sudo pip install six-1.10.0-py2.py3-none-any.whl

7.安装Java 开发环境JDK8:在下载软件并解压后执行如下移动命令:

mv java1.8 /usr/local/software/jdk

设置JDK的环境变量,编辑文件 .bashrc,加入下面内容

export JAVA_HOME=/usr/local/software/jdkexport JRE_HOME=${JAVA_HOME}/jreexport CLASSPATH=$CLASSPATH:${JAVA_HOME}/lib:${JRE_HOME}/libexport PATH=$PATH:${JAVA_HOME}/bin

进行Java版本的切换,选择对应的版本

sudo update-alternatives --config javasudo update-alternatives --config javac

8.安装Bazel:Bazel是一个类似于Make的工具,是Google为其内部软件开发的特点量身定制的工具,构建Tensorflow项目。在下载后执行如下安装命令:

chmod +x bazel-0.4.3-installer-linux-x86_64.sh./bazel-0.4.3-installer-linux-x86_64.sh –user

9.安装Tensorflow:在下载软件后执行如下安装命令:

pip install --upgrade tensorflow-0.12.1-cp35-cp35m-linux_x86_64.whl

Tensorflow访问HDFS的部署

1.首先安装Hadoop客户端,在下载后执行下面解压移动命令:

tar zxvf hadoop-2.6.0.tar.gzmv hadoop-2.6.0.tar.gz /usr/local/software/Hadoop

进行环境变量的配置/etc/profile,加入如下内容

export PATH=$PATH:/usr/local/software/hadoop/binexport LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/jre/lib/amd64/serverexport HADOOP_HOME=/usr/local/software/hadoopexport HADOOP_HDFS_HOME=/usr/local/software/hadoop

配置完后进行配置更新source /etc/profile

2.其次,安装完客户端后,配置自己的hadoop集群环境文件。

Tensorflow与Kerberos验证的部署

在Tesorflow0.12版本中已经支持了Kerberos验证,本机只要配置好Kerberos文件即可使用。该文中不详述Kerberos的配置内容,罗列一下相关的配置流程。

  • 首先在/etc/krb5.conf文件中进行服务器跟验证策略的配置;
  • 然后在Kerberos服务端生成一个用户文件传至本机;
  • 最后进行Kerberos客户端的权限认证并设置定时任务。

大数据场景下基于分布式Tensorflow的深度学习示例

一、进行数据格式的转换

本文的示例是做的MNIST数据的识别模型,为了更好的读取数据更好的利用内存,我们将本地GZ文件转换成Tensorflow的内定标准格式TFRecord,然后再将转换后的文件上传到HDFS存储。在实际应用中,我们实际利用Spark做了大规模格式转换的处理程序。我们对本地数据处理的相应的转换代码为:

from __future__ import absolute_importfrom __future__ import divisionfrom __future__ import print_functionimport argparseimport osimport tensorflow as tffrom tensorflow.contrib.learn.python.learn.datasets import mnistSOURCE_URL = 'http://yann.lecun.com/exdb/mnist/'TRAIN_IMAGES = 'train-images-idx3-ubyte.gz'  # MNIST filenamesTRAIN_LABELS = 'train-labels-idx1-ubyte.gz'TEST_IMAGES = 't10k-images-idx3-ubyte.gz'TEST_LABELS = 't10k-labels-idx1-ubyte.gz'FLAGS = Nonedef _int64_feature(value):  return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))def _bytes_feature(value):  return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))def convert_to(data_set, name):  images = data_set.images  labels = data_set.labels  num_examples = data_set.num_examples  if images.shape[0] != num_examples:    raise ValueError('Images size %d does not match label size %d.' %                     (images.shape[0], num_examples))  rows = images.shape[1]  cols = images.shape[2]  depth = images.shape[3]  filename = os.path.join(FLAGS.directory, name + '.tfrecords')  print('Writing', filename)  writer = tf.python_io.TFRecordWriter(filename)  for index in range(num_examples):    image_raw = images[index].tostring()    example = tf.train.Example(features=tf.train.Features(feature={        'height': _int64_feature(rows),        'width': _int64_feature(cols),        'depth': _int64_feature(depth),        'label': _int64_feature(int(labels[index])),        'image_raw': _bytes_feature(image_raw)}))    writer.write(example.SerializeToString())  writer.close()def main(argv):  # Get the data.  data_sets = mnist.read_data_sets(FLAGS.directory,                                   dtype=tf.uint8,                                   reshape=False,                                   validation_size=FLAGS.validation_size)  # Convert to Examples and write the result to TFRecords.  convert_to(data_sets.train, 'train')  convert_to(data_sets.validation, 'validation')  convert_to(data_sets.test, 'test')if __name__ == '__main__':  parser = argparse.ArgumentParser()  parser.add_argument(      '--directory',      type=str,      default='/tmp/data',      help='Directory to download data files and write the converted result'  )  parser.add_argument(      '--validation_size',      type=int,      default=5000,      help="""\      Number of examples to separate from the training data for the validation      set.\      """  )  FLAGS = parser.parse_args()  tf.app.run()

二、Tensorflow读取HDFS数据的设置

文中前面内容介绍了HDFS的配置以及将数据转换后存储到HDFS,Tensorflow读取HDFS时只需要简单的两步,首先执行项目时需要加入环境前缀:

CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python example.py

其次读取数据时,需要在数据的路径前面加入HDFS前缀,比如:

hdfs://default/user/data/example.txt

三、分布式模型的示例代码

该示例代码是读取HDFS上的MNIST数据,建立相应的server与work集群构建出一个三层的深度网络,包含两层卷积层以及一层SoftMax层。代码如下:

from __future__ import print_functionimport mathimport osimport tensorflow as tfflags = tf.app.flags# Flags for configuring the taskflags.DEFINE_string("job_name", None, "job name: worker or ps")flags.DEFINE_integer("task_index", 0,                     "Worker task index, should be >= 0. task_index=0 is "                     "the chief worker task the performs the variable "                     "initialization")flags.DEFINE_string("ps_hosts", "",                    "Comma-separated list of hostname:port pairs")flags.DEFINE_string("worker_hosts", "",                    "Comma-separated list of hostname:port pairs")# Training related flagsflags.DEFINE_string("data_dir", None,                    "Directory where the mnist data is stored")flags.DEFINE_string("train_dir", None,                    "Directory for storing the checkpoints")flags.DEFINE_integer("hidden1", 128,                     "Number of units in the 1st hidden layer of the NN")flags.DEFINE_integer("hidden2", 128,                     "Number of units in the 2nd hidden layer of the NN")flags.DEFINE_integer("batch_size", 100, "Training batch size")flags.DEFINE_float("learning_rate", 0.01, "Learning rate")FLAGS = flags.FLAGSTRAIN_FILE = "train.tfrecords"NUM_CLASSES = 10IMAGE_SIZE = 28IMAGE_PIXELS = IMAGE_SIZE * IMAGE_SIZEdef inference(images, hidden1_units, hidden2_units):  with tf.name_scope('hidden1'):    weights = tf.Variable(        tf.truncated_normal([IMAGE_PIXELS, hidden1_units],                            stddev=1.0 / math.sqrt(float(IMAGE_PIXELS))),name='weights')    biases = tf.Variable(tf.zeros([hidden1_units]),name='biases')    hidden1 = tf.nn.relu(tf.matmul(images, weights) + biases)  with tf.name_scope('hidden2'):    weights = tf.Variable(        tf.truncated_normal([hidden1_units, hidden2_units],                            stddev=1.0 / math.sqrt(float(hidden1_units))),        name='weights')    biases = tf.Variable(tf.zeros([hidden2_units]),                         name='biases')    hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)  with tf.name_scope('softmax_linear'):    weights = tf.Variable(        tf.truncated_normal([hidden2_units, NUM_CLASSES],                            stddev=1.0 / math.sqrt(float(hidden2_units))),name='weights')    biases = tf.Variable(tf.zeros([NUM_CLASSES]),name='biases')    logits = tf.matmul(hidden2, weights) + biases  return logitsdef lossFunction(logits, labels):  labels = tf.to_int64(labels)  cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(      logits, labels, name='xentropy')  loss = tf.reduce_mean(cross_entropy, name='xentropy_mean')  return lossdef training(loss, learning_rate):  tf.summary.scalar(loss.op.name, loss)  optimizer = tf.train.GradientDescentOptimizer(learning_rate)  global_step = tf.Variable(0, name='global_step', trainable=False)  train_op = optimizer.minimize(loss, global_step=global_step)  return train_opdef read_and_decode(filename_queue):  reader = tf.TFRecordReader()  _, serialized_example = reader.read(filename_queue)  features = tf.parse_single_example(      serialized_example,      # Defaults are not specified since both keys are required.      features={          'image_raw': tf.FixedLenFeature([], tf.string),          'label': tf.FixedLenFeature([], tf.int64),      })  # Convert from a scalar string tensor (whose single string has  # length mnist.IMAGE_PIXELS) to a uint8 tensor with shape  # [mnist.IMAGE_PIXELS].  image = tf.decode_raw(features['image_raw'], tf.uint8)  image.set_shape([IMAGE_PIXELS])  image = tf.cast(image, tf.float32) * (1. / 255) - 0.5  # Convert label from a scalar uint8 tensor to an int32 scalar.  label = tf.cast(features['label'], tf.int32)  return image, labeldef inputs(batch_size):  """Reads input data.  Args:    batch_size: Number of examples per returned batch.  Returns:    A tuple (images, labels), where:    * images is a float tensor with shape [batch_size, mnist.IMAGE_PIXELS]      in the range [-0.5, 0.5].    * labels is an int32 tensor with shape [batch_size] with the true label,      a number in the range [0, mnist.NUM_CLASSES).  """  filename = os.path.join(FLAGS.data_dir, TRAIN_FILE)  with tf.name_scope('input'):    filename_queue = tf.train.string_input_producer([filename])    # Even when reading in multiple threads, share the filename    # queue.    image, label = read_and_decode(filename_queue)    # Shuffle the examples and collect them into batch_size batches.    # (Internally uses a RandomShuffleQueue.)    # We run this in two threads to avoid being a bottleneck.    images, sparse_labels = tf.train.shuffle_batch(        [image, label], batch_size=batch_size, num_threads=2,        capacity=1000 + 3 * batch_size,        # Ensures a minimum amount of shuffling of examples.        min_after_dequeue=1000)    return images, sparse_labelsdef device_and_target():  # If FLAGS.job_name is not set, we're running single-machine TensorFlow.  # Don't set a device.  if FLAGS.job_name is None:    raise ValueError("Must specify an explicit `job_name`")  # Otherwise we're running distributed TensorFlow.  print("Running distributed training")  if FLAGS.task_index is None or FLAGS.task_index == "":    raise ValueError("Must specify an explicit `task_index`")  if FLAGS.ps_hosts is None or FLAGS.ps_hosts == "":    raise ValueError("Must specify an explicit `ps_hosts`")  if FLAGS.worker_hosts is None or FLAGS.worker_hosts == "":    raise ValueError("Must specify an explicit `worker_hosts`")  cluster_spec = tf.train.ClusterSpec({      "ps": FLAGS.ps_hosts.split(","),      "worker": FLAGS.worker_hosts.split(","),  })  server = tf.train.Server(      cluster_spec, job_name=FLAGS.job_name, task_index=FLAGS.task_index)  return (      cluster_spec,      server,  )def main(unused_argv):  if FLAGS.data_dir is None or FLAGS.data_dir == "":    raise ValueError("Must specify an explicit `data_dir`")  if FLAGS.train_dir is None or FLAGS.train_dir == "":    raise ValueError("Must specify an explicit `train_dir`")  cluster_spec, server = device_and_target()  if FLAGS.job_name == "ps":      server.join()  elif FLAGS.job_name == "worker":      with tf.device(tf.train.replica_device_setter(worker_device = "/job:worker/task:{}".format(FLAGS.task_index), cluster=cluster_spec)):        images, labels = inputs(FLAGS.batch_size)        logits = inference(images, FLAGS.hidden1, FLAGS.hidden2)        loss = lossFunction(logits, labels)        train_op = training(loss, FLAGS.learning_rate)      with tf.train.MonitoredTrainingSession(          master=server.target,          is_chief=(FLAGS.task_index == 0),          checkpoint_dir=FLAGS.train_dir) as sess:        while not sess.should_stop():          sess.run(train_op)if __name__ == "__main__":  tf.app.run()

四、分布式模型的启动

首先关闭防火墙

sudo iptable –F

然后在不同的机器上面启动服务

#在246.1机器上面运行参数服务器,命令:CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python /home/bdusr01/tine/Distributed_Tensorflow_MNIST_Model_Used_NN_Read_TFRecords_On_HDFS_Support_Kerberos.py --ps_hosts=10.142.246.1:1120 --worker_hosts=10.142.78.41:1121,10.142.78.45:1122 --data_dir=hdfs://default/user/bdusr01/asy/MNIST_data --train_dir=/home/bdusr01/checkpoint/ --job_name=ps --task_index=0#在78.41机器上面运行worker0,命令:CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python /home/bdusr01/tine/Distributed_Tensorflow_MNIST_Model_Used_NN_Read_TFRecords_On_HDFS_Support_Kerberos.py --ps_hosts=10.142.246.1:1120 --worker_hosts=10.142.78.41:1121,10.142.78.45:1122 --data_dir=hdfs://default/user/bdusr01/asy/MNIST_data --train_dir=/home/bdusr01/checkpoint/ --job_name=worker --task_index=0#在78.45机器上面运行worker1,命令:CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python /home/bdusr01/tine/Distributed_Tensorflow_MNIST_Model_Used_NN_Read_TFRecords_On_HDFS_Support_Kerberos.py--ps_hosts=10.142.246.1:1120 --worker_hosts=10.142.78.41:1121,10.142.78.45:1122 --data_dir=hdfs://default/user/bdusr01/asy/MNIST_data --train_dir=/home/bdusr01/checkpoint/ --job_name=worker --task_index=1#在78.41机器上面运行监控,命令:tensorboard --logdir=/home/bdusr01/checkpoint/

五、模型监控

我们在刚刚的41机器上面启动了TensorBoard,可以通过地址进行模型的监控。模型训练过程中参数可以动态的进行观测,示例如下:

 

图片描述

 

模型的网络结构可以详细的参看每个细节,示例如下:

 

图片描述

 

当我们利用分布式的Tensorflow对大数据进行训练完成后,可以利用Bazel构建一个灵活高可用的服务–TensorFlow Serving,能够很方便的将深度学习生产化,解决了模型无法提供服务的弊端。到此为止,本文就将自己项目中的一个基础模块的示例介绍完了,本项目更有含金量的是模型建立、工程开发、业务逻辑部分,如有机会再进行更详细的交流。

 

转载于:https://my.oschina.net/u/2391658/blog/1233536

你可能感兴趣的文章
如何使版面富有节奏感
查看>>
rabbitmq 管理及常用命令
查看>>
iphone导航控制器的开发与使用
查看>>
debian python library re-install
查看>>
如何用转义来给JS添加的input元素设置单引号
查看>>
J2E——网络编程练习
查看>>
VirtualBox移植
查看>>
HTTP要被抛弃? 亚洲诚信携手宝塔开启HTTPS加密快速通道
查看>>
Chrome: 完全移除对WoSign和StartCom证书的信任
查看>>
RecyclerView侧滑删除功能
查看>>
记一个hystrix异常
查看>>
9.02-Spring IOC 容器中Bean的生命周期
查看>>
6.6 tar打包
查看>>
Spring MVC核心技术
查看>>
TCP协议如何保证传输的可靠性
查看>>
Spring Cloud云架构 - SSO单点登录之OAuth2.0 登出流程(3)
查看>>
编程之美 测试赛 石头剪刀布
查看>>
签名问题
查看>>
软件开发各阶段交付物列表
查看>>
2018-05-24 Linux学习
查看>>