如何从 TFRecord 中的数据创建序列(窗口)?

问题描述 投票:0回答:1

我有大约1800个短视频。每个视频时长约 30 秒。我训练了一个 VAE 将每个帧编码为一个潜在向量(大小为 200)。

使用这个 VAE,我创建了一个 TFrecord,其中每个视频都有一个条目。每个条目包含一个大小为 830x200 的数组(830 是帧数,200 是潜在向量的大小),然后还有一个仅包含 4 个元素的数组(整数、一些元数据)。

读取 TFRecord 进行训练后,它会被读入数据集,并且该数据集将被发送到 model.fit。为此,我使用 TFRecordDataset,然后将其映射到我的函数以读取示例,然后执行随机播放、预取和批处理。

我现在正在使用文档中类似于微型 GPT 的模型来预测下一帧。该模型工作正常(忽略结果,结果不好,但至少数据进出,损失减少等等)。但对于每个时期,模型都会采用批量的完整序列,并预测该批次中每个元素的下一帧。因此,在 train_step 中,如果您打印输入,它会显示类似 (32,830,200) 的内容。其中 32 是批量大小,830 是序列长度,200 是特征。

我想要的是,我不想让模型采用 830 的完整序列,而是想将此序列分割成小的重叠序列。

首先,我尝试在 train_step 中执行此操作,用一个 for 循环包围 train_step 中的常用代码(gradienttape 和所有这些),该循环将获取输入序列。但这速度慢得令人痛苦。事实上,它甚至花了 30 分钟才开始训练。

然后我尝试在从 TFRecord 创建数据集的函数中执行此操作。数据集似乎有窗口函数,这正是我所需要的。因此,一旦使用 TFRecordDataset 加载/创建数据集,并且在将函数映射到读取示例后,我就会执行 dataset.window(...),然后像往常一样对它进行批处理,但这不起作用,并且训练也失败了。没有开始。我不知道每个条目包含 2 个数组(800x200 的数组和 4,1 的数组)这一事实是否存在问题。

所以基本上我想做的是将 800x200 序列分割成大小为 100x200 的重叠序列,对它们进行批处理,然后将它们发送到 train_step。因此,理想情况下,train_step 应该接收大小为 32,100,200 的张量。

我现在看到的唯一方法就是以这种方式编写 tfrecords。但这是浪费空间,因为序列确实重叠。

任何有关如何解决此问题的帮助都将非常受欢迎。

您可以在这里看到我用来写入/读取 TFRecords 的代码:

class TFRecordWriter(object):
    """
    Class handling writing the video data (through a VAE) to a TFRecord.
    Please check TFRecordReader to see how to read it back
    """
 
    def __init__(self, videopaths, modelpath, dillation,
                 step_size=1, batch_size=32, train_split=0.8):
        """Creates an object that will be used to save the data into a TFRecord
 
        Args:
            videopaths: Path to the folder where all the videos are. The videos must be
            binarised, created with the script VideoUtils.py
            modelpath: Path to the folder with the trained VAE (created with train_vae.py)
            dillation: "normal", "reverse" or "no". Check FramesLoader for more info
            step_size: How many frames to take. For example step_3 will take 1 out 3 frames
            batch_size: Batch size of the data when we run it against the VAE encoder
            train_split: % of data in train_set (the rest goes to test_set)
        """
        self.videopaths = videopaths
        self.modelpath = modelpath
        self.dillation = dillation
        self.step_size = step_size
        self.batch_size = batch_size
        self.train_split = train_split
 
        self.vae, self.input_dim = load_vae_model(self.modelpath)
 
        # Because the videos have different sizes, we need to break the longer ones
        # into sequences of max 416, so that all the batches have the same number of elements
        # therefore all the batches will be of size 416, latent_size
        # this number needs to be adjusted depending on self.batch_size
        self.seq_length = 416 * 2
 
 
    def serialise_to_tfrecords(self):
        """ Serialises all the data into one file with TFRecords."""
 
        # get all the videos
        run_name = (self.videopaths+'/*.avi')
        videos = glob.glob(run_name)
        random.shuffle(videos)
        db_split = int(len(videos)*self.train_split)
        train_set = videos[:db_split]
        test_set = videos[db_split:]
 
        for ti, tset in enumerate([train_set, test_set]):
 
            if ti==0:
                out_path = self.videopaths + "/train.tfrecord"
            else: 
                out_path = self.videopaths + "/test.tfrecord"
            writer = tf.io.TFRecordWriter(out_path)
 
            for video in tqdm.tqdm(tset, unit='F'):
                frames_latent_vectors = self.video2latentvectors(video)
                vector_length = frames_latent_vectors.shape[0]
                recipe = self.get_recipe(video)
 
                # break down the latent_vectors into sequences of length seq_lentgh
                for seq in range(0, vector_length, self.seq_length):
                    start = seq
                    end = seq + self.seq_length
 
                    if end > vector_length:
                        break
 
                    chunk = frames_latent_vectors[start:end]
                    # self.visualise_latent_reconstructions(frames_latent_vectors)
                    serialised_video = self.prepare_TFRecord(chunk, recipe)
                    writer.write(serialised_video)
 
                # move file to either train or test folder
                # get the folder where the file is
                current_folder = os.path.dirname(video)
                # get the name of the file
                file_name = os.path.basename(video)
 
                if ti == 0:
                    dest_folder = current_folder + "/train/"
                else:
                    dest_folder = current_folder + "/test/"
 
                destination = dest_folder + file_name
                shutil.move(video, destination)
                
            writer.close()
 
 
    def get_recipe(self, videopath):
        """ from the filename we can get the recipe. For example:
        octanoic_0_pentanol_0_octanol_9_dep_90_raw_1_bin.mp4
        means 0% octanoic, 0% pentaol, 9% octanol, 90% DEP"""
 
        # get the file name only, not the full path with folders
        file_name = os.path.basename(videopath)
        # remove the extension
        file_name = os.path.splitext(file_name)[0]
        # find the first four numbers, which are the recipe
        recipe = re.findall(r'\d+', file_name)[:4]
        # transform to int and to numpy
        recipenp = np.array( [int(x) for x in recipe] )
        # normalise and return
        return recipenp / np.sum(recipenp)
 
 
    def video2latentvectors(self, videopath):
        """
        Given a video, it will use a trained vae to return the latent vectors for each frame
        """
 
        # create dataset and load vae
        ds = self.video2dataset(videopath)
        # where to store the data as it is generated by the vae
        vectors = []
 
        # using vae get latent vectors
        for batch in ds:
            _, _, latent = self.vae.encoder(batch)
            vectors.append(latent)
 
        # this will go from (n_video, batch, latent_v) to (n_video*batch, latent_v)
        return np.concatenate( np.array(vectors) )
 
 
    def video2dataset(self, videopath):
        """
        Given a video, it will return a TF dataset with its frames
        """
 
        AUTOTUNE = tf.data.AUTOTUNE
        
        # Get a numpy array with all the frames
        frames = self.frames_from_video_file(videopath)
        # convert the numpy array into a tf dataset
        dataset = tf.data.Dataset.from_tensor_slices(frames)
 
        # batch it
        dataset = dataset.batch(self.batch_size, drop_remainder=True)
        # preprocess it
        dataset = self.preprocess_dataset(dataset)
        # configure for performance
        dataset = dataset.prefetch(buffer_size=AUTOTUNE)
 
        return dataset
    
 
    def preprocess_dataset(self, dataset):
        # perform some pre-processing as we did to train the vae
 
        normalization_layer = tf.keras.layers.Rescaling(1./255)
        dillation_layer = tf.keras.layers.MaxPool2D(pool_size=5, strides=1, padding='same')
 
        dataset = dataset.map(lambda x: tf.image.resize(
            x, (self.input_dim[0], self.input_dim[1]) ))
        
        if self.dillation == "normal":
            dataset = dataset.map(lambda x: dillation_layer(x))
        elif self.dillation == "reverse":
            dataset = dataset.map(lambda x: 1-dillation_layer(1-x))
        
        normalized_ds = dataset.map(lambda x: normalization_layer(x))
        return normalized_ds
    
 
    def frames_from_video_file(self, videopath):
        """
        Given a video, it will return the frames in a numpy array
        """
 
        frames = []
        video_capture = cv2.VideoCapture(videopath)
 
        while True:
 
            # Take one frame every step_size
            for _ in range(self.step_size):
                ret, frame = video_capture.read()
                if not ret:
                    break
 
            if not ret:
                break
 
            # the following line would convert it from 0..255 to 0..1
            # but we do a normalization layer later on, so I will comment this out
            # frame = tf.image.convert_image_dtype(frame, tf.float32)
            frames.append(frame)
 
        # last bit changes from bgr to rgb
        return np.array(frames)[..., [2, 1, 0]]
 
 
    def prepare_TFRecord(self, frames, recipe):
        # Tensorflow nomenclature to serialised data to create the TFRecords
 
        frames_feature = tf.train.Feature(
            bytes_list=tf.train.BytesList(value=[
                tf.io.serialize_tensor(frames).numpy(),
            ])
        )
 
        recipe_feature = tf.train.Feature(
            float_list=tf.train.FloatList(value=recipe),
        ) 
 
        features = tf.train.Features(feature={
            'frames': frames_feature,
            'recipe': recipe_feature
        })
        
        example = tf.train.Example(features=features)
        return example.SerializeToString()
    
 
    def visualise_latent_reconstructions(self, latent_vectors):
        """
        creates images of the latent vectors generated, to see if the previous encoding
        is correct
        """
 
        batch_size = 32
        ds = tf.data.Dataset.from_tensor_slices(latent_vectors)
        ds = ds.batch(batch_size)
 
        for entry in ds.take(1):
            generated_images = self.vae.decoder(entry)
        
        for i in range(batch_size):
            img = utils.array_to_img(generated_images[i])
            img.save("writer_img_%03d.png" % (i))
        
 
class TFRecordReader(object):
    """
    Class handling reading the TFRecord into a dataset to use for training
    Please check TFRecordWriter to see how it was saved to disk.
    The TFRecord to be read must have been created with TFRecordWriter
    """
 
    def __init__(self, tfrecordfile, batch_size = 64):
        self.BATCH_SIZE = batch_size
        self.tfrecordfile = tfrecordfile
        self.AUTOTUNE = tf.data.AUTOTUNE
        self.dataset = self.get_dataset()  # this will set self.dataset
        self.dataset_iter = iter(self.dataset)
 
    
    def decode_frames(self, frames):
        parsed_data = tf.io.parse_tensor(frames, tf.float32)
        parsed_data = tf.reshape(parsed_data, [832, 200]) # explicit size needed for TPU
        return parsed_data
 
 
    def read_tfrecord(self, example):
        TFREC_FORMAT = {
            "frames": tf.io.FixedLenFeature([], tf.string), # tf.string means bytestring
            "recipe": tf.io.FixedLenFeature([4], tf.float32)
        }
        example = tf.io.parse_single_example(example, TFREC_FORMAT)
        video_latent_vectors = self.decode_frames(example['frames'])
        return video_latent_vectors, example['recipe']
 
    
    def load_dataset(self):
        """ Loads a TFRecord and uses map to parse it, and stores it into self.dataset
        Check https://keras.io/examples/keras_recipes/tfrecord/ "define load methods"
        because this is basically a copy paste of that code with small modifications
 
        Args:
            properties (list, optional): Check parse_fn above
 
        Returns:
            dataset: Loadad TFRecord
        """
        ignore_order = tf.data.Options()
        ignore_order.experimental_deterministic = False  # disable order, increase speed
        dataset = tf.data.TFRecordDataset(
            self.tfrecordfile
        )  # automatically interleaves reads from multiple files
        dataset = dataset.with_options(
            ignore_order
        )  # uses data as soon as it streams in, rather than in its original order
        dataset = dataset.map(
            self.read_tfrecord,
            num_parallel_calls=self.AUTOTUNE
        )
        # returns the dataset as loaded
        return dataset
    
 
    def get_dataset(self):
        """Loads the TFRecord from the paths (filenames), and then shuffles the data and
        divides it into batches.
        """
        dataset = self.load_dataset()
        dataset = dataset.shuffle(2048)
        dataset = dataset.prefetch(buffer_size=self.AUTOTUNE)
        dataset = dataset.batch(self.BATCH_SIZE, drop_remainder=True)
        return dataset  # .repeat()
    
 
    def visualise_latent_reconstructions_and_recipes(self, vaepath):
 
        batch_size = 32
        data = next(self.dataset_iter)[0] # returns 576,200 (or whatever latent size)
        ds = tf.data.Dataset.from_tensor_slices(data.numpy()[0])
        ds = ds.batch(batch_size) # returns for example 9,32,200
 
        vae, _ = load_vae_model(vaepath)
 
        for entry in ds.take(1):
            generated_images = vae.decoder(entry)
        
        for i in range(batch_size):
            img = utils.array_to_img(generated_images[i])
            img.save("reader_img_%03d.png" % (i))

tensorflow keras tf.keras tensorflow-datasets tfrecord
1个回答
0
投票

这就是顶尖人物的代码吗?太棒了

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.