pretrain-predict: add .tfrecord output function

This commit is contained in:
Starbeamrainbowlabs 2022-09-27 16:59:31 +01:00
parent 30b8dd063e
commit f95fd8f9e4
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
5 changed files with 217 additions and 15 deletions

View file

@ -93,9 +93,10 @@ class RainfallWaterContraster(object):
for batch in batched_iterator(dataset, tensors_in_item=2, batch_size=self.batch_size):
i_batch += 1
rainfall = self.model_predict(batch[0], training=False) # ((rainfall, water), dummy_label)
for step in tf.unstack(rainfall, axis=0):
yield step
rainfall = tf.unstack(rainfall, axis=0)
water = tf.unstack(batch[1], axis=0)
for step_rainfall, step_water in zip(rainfall, water):
yield step_rainfall, step_water
# def embed_rainfall(self, dataset):

View file

@ -0,0 +1,102 @@
import os
import json
from loguru import logger
import tensorflow as tf
from ..dataset.batched_iterator import batched_iterator
from ..io.find_paramsjson import find_paramsjson
from ..io.readfile import readfile
from ..io.writefile import writefile
from .model_rainfallwater_segmentation import model_rainfallwater_segmentation
from .helpers import make_callbacks
from .helpers import summarywriter
from .components.LayerConvNeXtGamma import LayerConvNeXtGamma
from .helpers.summarywriter import summarywriter
class RainfallWaterSegmenter(object):
def __init__(self, dir_output=None, filepath_checkpoint=None, epochs=50, batch_size=64, **kwargs):
super(RainfallWaterSegmenter, self).__init__()
self.dir_output = dir_output
self.epochs = epochs
self.kwargs = kwargs
self.batch_size = batch_size
if filepath_checkpoint == None:
if self.dir_output == None:
raise Exception("Error: dir_output was not specified, and since no checkpoint was loaded training mode is activated.")
if not os.path.exists(self.dir_output):
os.mkdir(self.dir_output)
self.filepath_summary = os.path.join(self.dir_output, "summary.txt")
writefile(self.filepath_summary, "") # Empty the file ahead of time
self.make_model()
summarywriter(self.model, self.filepath_summary, append=True)
writefile(os.path.join(self.dir_output, "params.json"), json.dumps(self.get_config()))
else:
self.load_model(filepath_checkpoint)
def get_config(self):
return {
"epochs": self.epochs,
"batch_size": self.batch_size,
**self.kwargs
}
@staticmethod
def from_checkpoint(filepath_checkpoint, **hyperparams):
logger.info(f"Loading from checkpoint: {filepath_checkpoint}")
return RainfallWaterSegmenter(filepath_checkpoint=filepath_checkpoint, **hyperparams)
def make_model(self):
self.model = model_rainfallwater_segmentation(
batch_size=self.batch_size,
summary_file=self.filepath_summary,
**self.kwargs
)
def load_model(self, filepath_checkpoint):
"""
Loads a saved model from the given filename.
filepath_checkpoint (string): The filepath to load the saved model from.
"""
self.model = tf.keras.models.load_model(filepath_checkpoint, custom_objects={
"LayerConvNeXtGamma": LayerConvNeXtGamma,
})
def train(self, dataset_train, dataset_validate):
return self.model.fit(
dataset_train,
validation_data=dataset_validate,
epochs=self.epochs,
callbacks=make_callbacks(self.dir_output, self.model_predict),
steps_per_epoch=10 # For testing
)
def embed(self, dataset):
i_batch = -1
for batch in batched_iterator(dataset, tensors_in_item=2, batch_size=self.batch_size):
i_batch += 1
rainfall = self.model(batch[0], training=False) # ((rainfall, water), dummy_label)
for step in tf.unstack(rainfall, axis=0):
yield step
# def embed_rainfall(self, dataset):
# result = []
# for batch in dataset:
# result_batch = self.model_predict(batch)
# result.extend(tf.unstack(result_batch, axis=0))
# return result

View file

@ -46,15 +46,17 @@ def parse_item(metadata, shape_water_desired, dummy_label=True):
return tf.function(parse_item_inner)
def make_dataset(filepaths, metadata, shape_watch_desired=[100,100], compression_type="GZIP", parallel_reads_multiplier=1.5, shuffle_buffer_size=128, batch_size=64, dummy_label=True, prefetch=True):
def make_dataset(filepaths, metadata, shape_watch_desired=[100,100], compression_type="GZIP", parallel_reads_multiplier=1.5, shuffle_buffer_size=128, batch_size=64, dummy_label=True, prefetch=True, shuffle=True):
if "NO_PREFETCH" in os.environ:
logger.info("disabling data prefetching.")
dataset = tf.data.TFRecordDataset(filepaths,
compression_type=compression_type,
num_parallel_reads=math.ceil(os.cpu_count() * parallel_reads_multiplier)
).shuffle(shuffle_buffer_size) \
.map(parse_item(metadata, shape_water_desired=shape_watch_desired, dummy_label=dummy_label), num_parallel_calls=tf.data.AUTOTUNE)
)
if shuffle:
dataset = dataset.shuffle(shuffle_buffer_size)
dataset = dataset.map(parse_item(metadata, shape_water_desired=shape_watch_desired, dummy_label=dummy_label), num_parallel_calls=tf.data.AUTOTUNE)
if batch_size != None:
dataset = dataset.batch(batch_size, drop_remainder=True)
@ -86,7 +88,7 @@ def dataset(dirpath_input, batch_size=64, train_percentage=0.8, parallel_reads_m
return dataset_train, dataset_validate #, filepaths
def dataset_predict(dirpath_input, parallel_reads_multiplier=1.5, prefetch=True):
filepaths = get_filepaths(dirpath_input)
filepaths = get_filepaths(dirpath_input) if os.path.isdir(dirpath_input) else [ dirpath_input ]
return make_dataset(
filepaths=filepaths,
@ -94,7 +96,8 @@ def dataset_predict(dirpath_input, parallel_reads_multiplier=1.5, prefetch=True)
parallel_reads_multiplier=parallel_reads_multiplier,
batch_size=None,
dummy_label=False,
prefetch=prefetch
prefetch=prefetch,
shuffle=False #even with shuffle=False we're not gonna get them all in the same order since we're reading in parallel
)
if __name__ == "__main__":

View file

@ -16,16 +16,19 @@ from lib.io.find_paramsjson import find_paramsjson
from lib.io.readfile import readfile
from lib.vis.embeddings import vis_embeddings
MODE_JSONL = 1
MODE_TFRECORD = 2
def parse_args():
parser = argparse.ArgumentParser(description="Output feature maps using a given pretrained contrastive model.")
# parser.add_argument("--config", "-c", help="Filepath to the TOML config file to load.", required=True)
parser.add_argument("--input", "-i", help="Path to input directory containing the images to predict for.", required=True)
parser.add_argument("--output", "-o", help="Path to output file to write output to. Defaults to stdout, but if specified a UMAP graph will NOT be produced.")
parser.add_argument("--input", "-i", help="Path to input directory containing the .tfrecord(.gz) files to predict for. If a single file is passed instead, then only that file will be converted.", required=True)
parser.add_argument("--output", "-o", help="Path to output file to write output to. If the file extension .tfrecord.gz is used instead of .jsonl.gz, then a tfrecord file is written.")
parser.add_argument("--records-per-file", help="Optional. If specified, this limits the number of records written to each file. When using this option, you MUST have the string '$d' (without quotes) somewhere in your output filepath.", type=int)
parser.add_argument("--checkpoint", "-c", help="Checkpoint file to load model weights from.", required=True)
parser.add_argument("--params", "-p", help="Optional. The file containing the model hyperparameters (usually called 'params.json'). If not specified, it's location will be determined automatically.")
parser.add_argument("--reads-multiplier", help="Optional. The multiplier for the number of files we should read from at once. Defaults to 1.5, which means read ceil(NUMBER_OF_CORES * 1.5). Set to a higher number of systems with high read latency to avoid starving the GPU of data.")
parser.add_argument("--no-vis",
help="Don't also plot a visualisation of the resulting embeddings.", action="store_true")
return parser
@ -37,6 +40,8 @@ def run(args):
args.params = find_paramsjson(args.checkpoint)
if (not hasattr(args, "read_multiplier")) or args.read_multiplier == None:
args.read_multiplier = 1.5
if (not hasattr(args, "records_per_file")) or args.records_per_file == None:
args.records_per_file = 0 # 0 = unlimited
if not os.path.exists(args.params):
raise Exception(f"Error: The specified filepath params.json hyperparameters ('{args.params}) does not exist.")
@ -51,6 +56,7 @@ def run(args):
sys.stderr.write(f"\n\n>>> This is TensorFlow {tf.__version__}\n\n\n")
# Note that if using a directory of input files, the output order is NOT GUARANTEED TO BE THE SAME. In fact, it probably won't be.
dataset = dataset_predict(
dirpath_input=args.input,
parallel_reads_multiplier=args.read_multiplier
@ -62,17 +68,45 @@ def run(args):
# print("ITEMS DONE")
# exit(0)
output_mode = MODE_TFRECORD if filepath_output.endswith(".tfrecord") or filepath_output.endswith(".tfrecord.gz") else MODE_JSONL
handle = sys.stdout
if filepath_output != "-":
handle = handle_open(filepath_output, "wt" if filepath_output.endswith(".gz") else "w")
handle = handle_open(
filepath_output if args.records_per_file <= 0 else filepath_output.replace("$d", 0),
"wt" if filepath_output.endswith(".gz") else "w"
)
i = 0
for rainfall in ai.embed(dataset):
handle.write(json.dumps(rainfall.numpy().tolist(), separators=(',', ':'))+"\n") # Ref https://stackoverflow.com/a/64710892/1460422
i_file = i
files_done = 0
for step_rainfall, step_water in ai.embed(dataset):
if args.records_per_file > 0 and i_file > args.records_per_file:
files_done += 1
i_file = 0
handle.close()
logger.write(f"PROGRESS:file {files_done}")
handle = handle_open(filepath_output.replace("$d", str(files_done+1)))
if output_mode == MODE_JSONL:
handle.write(json.dumps(step_rainfall.numpy().tolist(), separators=(',', ':'))+"\n") # Ref https://stackoverflow.com/a/64710892/1460422
elif output_mode == MODE_TFRECORD:
step_rainfall = tf.train.BytesList(value=[tf.io.serialize_tensor(step_rainfall, name="rainfall").numpy()])
step_water = tf.train.BytesList(value=[tf.io.serialize_tensor(step_water, name="water").numpy()])
record = tf.train.Example(features=tf.train.Features(feature={
"rainfallradar": tf.train.Feature(bytes_list=step_rainfall),
"waterdepth": tf.train.Feature(bytes_list=step_water)
}))
handle.write(record.SerializeToString())
else:
raise Exception("Error: Unknown output mode.")
if i == 0 or i % 100 == 0:
sys.stderr.write(f"[pretrain:predict] STEP {i}\r")
i += 1
i_file += 1
handle.close()

View file

@ -0,0 +1,62 @@
import math
import sys
import argparse
from asyncio.log import logger
import tensorflow as tf
from lib.ai.RainfallWaterSegmenter import RainfallWaterSegmenter
from lib.dataset.dataset import dataset
from lib.dataset.read_metadata import read_metadata
def parse_args():
parser = argparse.ArgumentParser(description="Train an image segmentation model on a directory of .tfrecord.gz embedded_rainfall+waterdepth_label files.")
# parser.add_argument("--config", "-c", help="Filepath to the TOML config file to load.", required=True)
parser.add_argument("--input", "-i", help="Path to input directory containing the .tfrecord.gz files to pretrain with", required=True)
parser.add_argument("--output", "-o", help="Path to output directory to write output to (will be automatically created if it doesn't exist)", required=True)
parser.add_argument("--feature-dim", help="The size of the input feature dimension of the model [default: 512].", type=int)
parser.add_argument("--batch-size", help="Sets the batch size [default: 64].", type=int)
parser.add_argument("--reads-multiplier", help="Optional. The multiplier for the number of files we should read from at once. Defaults to 1.5, which means read ceil(NUMBER_OF_CORES * 1.5) files at once. Set to a higher number of systems with high read latency to avoid starving the GPU of data.")
parser.add_argument("--water-size", help="The width and height of the square of pixels that the model will predict. Smaller values crop the input more [default: 100].", type=int)
return parser
def run(args):
if (not hasattr(args, "water_size")) or args.water_size == None:
args.water_size = 100
if (not hasattr(args, "batch_size")) or args.batch_size == None:
args.batch_size = 64
if (not hasattr(args, "feature_dim")) or args.feature_dim == None:
args.feature_dim = 512
if (not hasattr(args, "read_multiplier")) or args.read_multiplier == None:
args.read_multiplier = 1.5
# TODO: Validate args here.
sys.stderr.write(f"\n\n>>> This is TensorFlow {tf.__version__}\n\n\n")
dataset_train, dataset_validate = dataset(
dirpath_input=args.input,
batch_size=args.batch_size,
)
dataset_metadata = read_metadata(args.input)
# for (items, label) in dataset_train:
# print("ITEMS", len(items), [ item.shape for item in items ])
# print("LABEL", label.shape)
# print("ITEMS DONE")
# exit(0)
ai = RainfallWaterSegmenter(
dir_output=args.output,
batch_size=args.batch_size,
feature_dim=args.feature_dim,
metadata = read_metadata(args.input),
shape_water=[ args.water_size, args.water_size ] # The DESIRED output shape. the actual data will be cropped to match this.
)
ai.train(dataset_train, dataset_validate)