[Python] [TensorFlow 2] Reading features from TFRecord in batch units is recommended.

Introduction

When training on a large amount of data with TensorFlow, it is convenient to use the Dataset API to read features saved in TFRecord files. [TensorFlow 2.x compatible version] How to train on a large amount of data using TFRecord & DataSet in TensorFlow (Keras) - Qiita

You can find plenty of sample code by searching, but it turns out that with a different way of reading, the data can be loaded much faster than with the method you usually see.

Verification environment

The commonly introduced reading method

In addition to the above article, the reading method most often shown in the official documentation and on other sites uses tf.io.parse_single_example(). [Usage of TFRecords and tf.Example | TensorFlow Core](https://www.tensorflow.org/tutorials/load_data/tfrecord)

import tensorflow as tf
import numpy as np

feature_dim = 784
def parse_example(example):
    features = tf.io.parse_single_example(
        example,
        features={
            "x": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32),
            "y": tf.io.FixedLenFeature([], dtype=tf.float32)
        })
    x = features["x"]
    y = features["y"]
    return x, y

ds1 = tf.data.TFRecordDataset(["test.tfrecords"]).map(parse_example).batch(512)
print(ds1)
print(next(iter(ds1)))

Here, map() is used to convert each record in the Dataset into features. This is probably the most common usage.

However, the processing feels slow... Even when training on a GPU, the GPU utilization does not stay anywhere near 100%, and yet the CPU utilization does not go up either. It seems that I/O is the bottleneck.

Can't we read in batch units?

The official documentation gives the following general advice on transforming a Dataset:

Invoking a user-defined function passed into the map transformation has overhead related to scheduling and executing the user-defined function. We recommend vectorizing the user-defined function (that is, have it operate over a batch of inputs at once) and apply the batch transformation before the map transformation.

Better performance with the tf.data API | TensorFlow Core

In short, it recommends applying map() with a user-defined function in batch units. If so, shouldn't performance also improve if the data could be read and decoded in batch units?

I could not find any Japanese material on this at all, but it turns out that features can be decoded in batch units using tf.data.experimental.parse_example_dataset().[^1] The decoding step comes after batching, as shown below.

[^1]: There is also tf.io.parse_example(), and I found [sample code](https://stackoverflow.com/questions/37151895/tensorflow-read-all-examples-from-a-tfrecords-at-once) for it, but I could not get it to work because it appears to be a leftover from the 1.x (or 0.x?) series. (When I tried to use TFRecordReader, I got an error saying it cannot be used with Eager Execution.)

feature_dim = 784
ds2 = tf.data.TFRecordDataset(["test.tfrecords"]) \
          .batch(512) \
          .apply(tf.data.experimental.parse_example_dataset({
              "x": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32),
              "y": tf.io.FixedLenFeature([], dtype=tf.float32)
          }))
print(ds2)
print(next(iter(ds2)))

Each record is returned as a dict, so it has to be converted to a tuple separately when training with keras.Model.fit(). With record-by-record parsing, the tuple conversion can be written directly inside parse_example(), but here it has to be added as a separate map(), as sketched below.
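A minimal sketch of that extra step (the lambda and the model variable are hypothetical, just for illustration):

ds2_tuples = ds2.map(lambda feat: (feat["x"], feat["y"]))
# A compiled Keras model could then consume the dataset directly:
# model.fit(ds2_tuples, epochs=5)  # `model` is a hypothetical compiled model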

Performance comparison

I actually tried it: write the 10,000 MNIST test samples to a TFRecord file and measure the time needed just to read them back. Training itself is not performed this time, but since the data is meant to be used for training afterwards, the batch-unit case also includes the step of converting the records to tuples.

First, write the data to the TFRecord file.

data2tfrecord.py


import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist

def feature_float_list(l):
    return tf.train.Feature(float_list=tf.train.FloatList(value=l))

def record2example(r_x, r_y):
    return tf.train.Example(features=tf.train.Features(feature={
        "x": feature_float_list(r_x),
        "y": feature_float_list(r_y)
    }))

filename_test  = "test.tfrecords"

# Write the 10,000 MNIST test samples
_, (x_test, y_test) = mnist.load_data()
print("x_test    : ", x_test.shape)  # x_test    :  (10000, 28, 28)
print("y_test    : ", y_test.shape)  # y_test    :  (10000,)
x_test  = x_test.reshape((-1, 28*28)).astype("float32") / 255.0
y_test  = y_test.reshape((-1, 1)).astype("float32")
with tf.io.TFRecordWriter(filename_test) as writer:
    for r_x, r_y in zip(x_test, y_test):
        ex = record2example(r_x, r_y)
        writer.write(ex.SerializeToString())
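As an optional sanity check (not part of the original script), the file can be read back and the records counted:

n_records = sum(1 for _ in tf.data.TFRecordDataset(["test.tfrecords"]))
print(n_records)  # should print 10000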

Then load it in two ways.

read_tfrecord.py


import tensorflow as tf
import numpy as np

feature_dim = 784

def parse_example(example):
    features = tf.io.parse_single_example(example, features={
        "x": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32),
        "y": tf.io.FixedLenFeature([], dtype=tf.float32)
    })
    x = features["x"]
    y = features["y"]
    return x, y

ds1 = tf.data.TFRecordDataset(["test.tfrecords"]).map(parse_example).batch(512)
print(ds1) # <BatchDataset shapes: ((None, 784), (None,)), types: (tf.float32, tf.float32)>

def dict2tuple(feat):
    return feat["x"], feat["y"]

ds2 = tf.data.TFRecordDataset(["test.tfrecords"]) \
          .batch(512) \
          .apply(tf.data.experimental.parse_example_dataset({
              "x": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32),
              "y": tf.io.FixedLenFeature([], dtype=tf.float32)
          })) \
          .map(dict2tuple)
print(ds2) # <MapDataset shapes: ((None, 784), (None,)), types: (tf.float32, tf.float32)>

Note that ds1 and ds2 are built differently but yield exactly the same data: the batch sizes and the returned values are identical.
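A quick way to confirm this (a check added here for illustration, not part of the original script) is to compare the first batch from each pipeline:

x1, y1 = next(iter(ds1))
x2, y2 = next(iter(ds2))
# Both pipelines read the same file in the same order, so this should print: True True
print(np.array_equal(x1.numpy(), x2.numpy()), np.array_equal(y1.numpy(), y2.numpy()))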

Start an interactive shell with `ipython -i read_tfrecord.py` and measure the time required to decode all 10,000 records.

ipython


In [1]: %timeit [1 for _ in iter(ds1)]
1 loop, best of 3: 1.4 s per loop

In [2]: %timeit [1 for _ in iter(ds2)]
10 loops, best of 3: 56.3 ms per loop

Reading in batch units wins by an overwhelming margin...!

What if the feature is variable length?

In the example above, x had a fixed length (784 dimensions), but things get a bit more troublesome when the length varies from record to record. The most common approach seems to be to serialize the variable-length data and store it as a tf.string feature.

data2tfrecord_var.py


import numpy as np
import tensorflow as tf

def feature_bytes_list(l):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=l))

def feature_float_list(l):
    return tf.train.Feature(float_list=tf.train.FloatList(value=l))

def record2example(r_x, r_y):
    return tf.train.Example(features=tf.train.Features(feature={
        "x": feature_bytes_list(r_x),
        "y": feature_float_list(r_y)
    }))

filename  = "random.tfrecords"
# Write 1,000 variable-length records
with tf.io.TFRecordWriter(filename) as writer:
    for i in range(1000):
        r_x = np.random.random(i+1).astype("float32")
        r_y = np.random.random(1)
        ex = record2example([r_x.tostring()], r_y)
        writer.write(ex.SerializeToString())

When decoding in record units, read as follows.

read_tfrecord_var.py


import tensorflow as tf
import numpy as np

def parse_example(example):
    features = tf.io.parse_single_example(
        example,
        features={
            "x": tf.io.FixedLenFeature([], dtype=tf.string),
            "y": tf.io.FixedLenFeature([], dtype=tf.float32)
        })
    x = tf.io.decode_raw(features["x"], tf.float32)
    y = [features["y"]]
    return x, y

ds1 = tf.data.TFRecordDataset(["random.tfrecords"]).map(parse_example).padded_batch(512, ([None], [1]))
print(ds1) # <PaddedBatchDataset shapes: ((None, None), (None, 1)), types: (tf.float32, tf.float32)>

Within each batch, the number of columns of x is matched to the longest feature in that batch, and the shorter features are padded with zeros.
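For example, since the records in random.tfrecords have lengths 1 through 1000 and the batch size is 512, the first batch should be padded out to 512 columns (a quick check added for illustration):

x, y = next(iter(ds1))
print(x.shape)  # expected: (512, 512) -- padded to the longest record in the batch
print(y.shape)  # expected: (512, 1)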

ipython


In [1]: %timeit [1 for _ in iter(ds1)]
10 loops, best of 3: 153 ms per loop

What about doing this in batch units? Since the dimension of x differs from record to record, batching the Dataset first and then applying decode_raw in map() fails.

def dict2tuple(feature):
    return tf.io.decode_raw(feature["x"], tf.float32), [feature["y"]]

ds2 = tf.data.TFRecordDataset(["random.tfrecords"]) \
           .batch(512) \
           .apply(tf.data.experimental.parse_example_dataset({
               "x": tf.io.FixedLenFeature([], dtype=tf.string),
               "y": tf.io.FixedLenFeature([], dtype=tf.float32)
           })) \
           .map(dict2tuple)

print(next(iter(ds2)))
# InvalidArgumentError: DecodeRaw requires input strings to all be the same size, but element 1 has size 4 != 8

However, if you unbatch() and then apply decode_raw, you lose the speed advantage.

ds2 = tf.data.TFRecordDataset(["random.tfrecords"]) \
          .batch(512) \
          .apply(tf.data.experimental.parse_example_dataset({
              "x": tf.io.FixedLenFeature([], dtype=tf.string),
              "y": tf.io.FixedLenFeature([], dtype=tf.float32)
          })).unbatch().map(dict2tuple).padded_batch(512, ([None], [1]))

ipython


In [2]: %timeit [1 for _ in iter(ds2)]
10 loops, best of 3: 136 ms per loop

RaggedFeature

This is where the savior comes in. Starting with TensorFlow 2.1, a new feature type called RaggedFeature can be specified when loading data. tf.io.RaggedFeature | TensorFlow Core v2.1.0

With this, the decoded feature becomes a RaggedTensor. An ordinary Tensor must have the same number of columns in every row, but a RaggedTensor can have a different number of columns per row. tf.RaggedTensor | TensorFlow Core v2.1.0
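As a tiny illustration of the idea (not from the original article), a RaggedTensor can be built directly and zero-padded into an ordinary Tensor:

rt = tf.ragged.constant([[1.0, 2.0, 3.0], [4.0], [5.0, 6.0]])
print(rt.shape)        # (3, None): rows may have different lengths
print(rt.to_tensor())  # zero-padded: [[1. 2. 3.] [4. 0. 0.] [5. 6. 0.]]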

First, when writing the data, build the Features from the variable-length values directly, as a list of float32.

def feature_float_list(l):
    return tf.train.Feature(float_list=tf.train.FloatList(value=l))

def record2example(r_x, r_y):
    return tf.train.Example(features=tf.train.Features(feature={
        "x": feature_float_list(r_x),
        "y": feature_float_list(r_y)
    }))

filename = "random2.tfrecords"  # note: a different file name from before
with tf.io.TFRecordWriter(filename) as writer:
    for i in range(1000):
        r_x = np.random.random(i+1).astype("float32")
        r_y = np.random.random(1)
        ex = record2example(r_x, r_y)
        writer.write(ex.SerializeToString())

When loading, specify RaggedFeature as the feature.

ds2 = tf.data.TFRecordDataset(["random2.tfrecords"]) \
          .batch(512) \
          .apply(tf.data.experimental.parse_example_dataset({
              "x": tf.io.RaggedFeature(tf.float32),
              "y": tf.io.FixedLenFeature([], dtype=tf.float32)
          }))

Here, each element of ds2 is a dict just as in the fixed-length case, except that x is now a RaggedTensor. Slicing each row of the RaggedTensor gives Tensors of different sizes, as shown below.

ipython


In [1]: next(iter(ds2))["x"][0]
Out[1]: <tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.8635351], dtype=float32)>

In [2]: next(iter(ds2))["x"][1]
Out[2]: <tf.Tensor: shape=(2,), dtype=float32, numpy=array([0.66411597, 0.8526721 ], dtype=float32)>

In [3]: next(iter(ds2))["x"][2]
Out[3]: <tf.Tensor: shape=(3,), dtype=float32, numpy=array([0.7902446 , 0.13108689, 0.05331135], dtype=float32)>

Within a batch, the shorter features can be zero-padded with to_tensor() to obtain an ordinary Tensor, which yields exactly the same batch as decoding record by record.

def dict2tuple(feature):
    return feature["x"].to_tensor(), [feature["y"]]

ds2 = tf.data.TFRecordDataset(["random2.tfrecords"]) \
          .batch(512) \
          .apply(tf.data.experimental.parse_example_dataset({
              "x": tf.io.RaggedFeature(tf.float32),
              "y": tf.io.FixedLenFeature([], dtype=tf.float32)
          })).map(dict2tuple)

ipython


In [4]: %timeit [1 for _ in iter(ds2)]
100 loops, best of 3: 18.6 ms per loop

The time has dropped to nearly one-tenth of the record-by-record version. Great!

VarLenFeature

In fact, TensorFlow 1.x / 2.0 also provides a way to read variable-length features: specifying VarLenFeature as the feature type returns the feature as a SparseTensor. The TFRecord file is written exactly as in the RaggedFeature case.
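For reference, a minimal illustration (not from the original article) of how a SparseTensor is densified with tf.sparse.to_dense():

sp = tf.sparse.SparseTensor(indices=[[0, 0], [1, 0], [1, 1]],
                            values=[1.0, 2.0, 3.0],
                            dense_shape=[2, 3])
print(tf.sparse.to_dense(sp))  # [[1. 0. 0.] [2. 3. 0.]]

The batch-unit reading pipeline then looks like this: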

def dict2tuple(feature):
    return tf.sparse.to_dense(feature["x"]), [feature["y"]]

ds3 = tf.data.TFRecordDataset(["random2.tfrecords"]) \
          .batch(512) \
          .apply(tf.data.experimental.parse_example_dataset({
              "x": tf.io.VarLenFeature(tf.float32),
              "y": tf.io.FixedLenFeature([], dtype=tf.float32)
          })) \
          .map(dict2tuple)

ipython


In [5]: %timeit [1 for _ in iter(ds3)]
10 loops, best of 3: 39.9 ms per loop

It is certainly much faster than record-by-record decoding, but slower than RaggedFeature. If possible, I would like to use RaggedFeature with TensorFlow 2.1 or later.

Summary

--Read features from TFRecord in batch units.
--After batching, convert with parse_example_dataset(). Pass the return value of this function to apply() on the Dataset.
--For variable-length features, use TensorFlow 2.1 or later and specify RaggedFeature when loading.
