diff --git a/aimodel/src/lib/ai/RainfallWaterContraster.py b/aimodel/src/lib/ai/RainfallWaterContraster.py
index 7451dfe..cce6469 100644
--- a/aimodel/src/lib/ai/RainfallWaterContraster.py
+++ b/aimodel/src/lib/ai/RainfallWaterContraster.py
@@ -93,9 +93,10 @@ class RainfallWaterContraster(object):
 		for batch in batched_iterator(dataset, tensors_in_item=2, batch_size=self.batch_size):
 			i_batch += 1
 			rainfall = self.model_predict(batch[0], training=False) # ((rainfall, water), dummy_label)
-			
-			for step in tf.unstack(rainfall, axis=0):
-				yield step
+			rainfall = tf.unstack(rainfall, axis=0)
+			water = tf.unstack(batch[1], axis=0)
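+			# Yield matched (embedding, label) pairs: batch[1] holds the water depth tensors that correspond to the rainfall embeddings above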
+ """ + + self.model = tf.keras.models.load_model(filepath_checkpoint, custom_objects={ + "LayerConvNeXtGamma": LayerConvNeXtGamma, + }) + + + + def train(self, dataset_train, dataset_validate): + return self.model.fit( + dataset_train, + validation_data=dataset_validate, + epochs=self.epochs, + callbacks=make_callbacks(self.dir_output, self.model_predict), + steps_per_epoch=10 # For testing + ) + + def embed(self, dataset): + i_batch = -1 + for batch in batched_iterator(dataset, tensors_in_item=2, batch_size=self.batch_size): + i_batch += 1 + rainfall = self.model(batch[0], training=False) # ((rainfall, water), dummy_label) + + for step in tf.unstack(rainfall, axis=0): + yield step + + + # def embed_rainfall(self, dataset): + # result = [] + # for batch in dataset: + # result_batch = self.model_predict(batch) + # result.extend(tf.unstack(result_batch, axis=0)) + # return result \ No newline at end of file diff --git a/aimodel/src/lib/dataset/dataset.py b/aimodel/src/lib/dataset/dataset.py index 2f049ad..37ad1d3 100644 --- a/aimodel/src/lib/dataset/dataset.py +++ b/aimodel/src/lib/dataset/dataset.py @@ -46,15 +46,17 @@ def parse_item(metadata, shape_water_desired, dummy_label=True): return tf.function(parse_item_inner) -def make_dataset(filepaths, metadata, shape_watch_desired=[100,100], compression_type="GZIP", parallel_reads_multiplier=1.5, shuffle_buffer_size=128, batch_size=64, dummy_label=True, prefetch=True): +def make_dataset(filepaths, metadata, shape_watch_desired=[100,100], compression_type="GZIP", parallel_reads_multiplier=1.5, shuffle_buffer_size=128, batch_size=64, dummy_label=True, prefetch=True, shuffle=True): if "NO_PREFETCH" in os.environ: logger.info("disabling data prefetching.") dataset = tf.data.TFRecordDataset(filepaths, compression_type=compression_type, num_parallel_reads=math.ceil(os.cpu_count() * parallel_reads_multiplier) - ).shuffle(shuffle_buffer_size) \ - .map(parse_item(metadata, shape_water_desired=shape_watch_desired, dummy_label=dummy_label), num_parallel_calls=tf.data.AUTOTUNE) + ) + if shuffle: + dataset = dataset.shuffle(shuffle_buffer_size) + dataset = dataset.map(parse_item(metadata, shape_water_desired=shape_watch_desired, dummy_label=dummy_label), num_parallel_calls=tf.data.AUTOTUNE) if batch_size != None: dataset = dataset.batch(batch_size, drop_remainder=True) @@ -86,7 +88,7 @@ def dataset(dirpath_input, batch_size=64, train_percentage=0.8, parallel_reads_m return dataset_train, dataset_validate #, filepaths def dataset_predict(dirpath_input, parallel_reads_multiplier=1.5, prefetch=True): - filepaths = get_filepaths(dirpath_input) + filepaths = get_filepaths(dirpath_input) if os.path.isdir(dirpath_input) else [ dirpath_input ] return make_dataset( filepaths=filepaths, @@ -94,7 +96,8 @@ def dataset_predict(dirpath_input, parallel_reads_multiplier=1.5, prefetch=True) parallel_reads_multiplier=parallel_reads_multiplier, batch_size=None, dummy_label=False, - prefetch=prefetch + prefetch=prefetch, + shuffle=False #even with shuffle=False we're not gonna get them all in the same order since we're reading in parallel ) if __name__ == "__main__": diff --git a/aimodel/src/subcommands/pretrain_predict.py b/aimodel/src/subcommands/pretrain_predict.py index fadddf8..9d3cad5 100644 --- a/aimodel/src/subcommands/pretrain_predict.py +++ b/aimodel/src/subcommands/pretrain_predict.py @@ -16,16 +16,19 @@ from lib.io.find_paramsjson import find_paramsjson from lib.io.readfile import readfile from lib.vis.embeddings import vis_embeddings + +MODE_JSONL = 1 
 	parser.add_argument("--checkpoint", "-c", help="Checkpoint file to load model weights from.", required=True)
 	parser.add_argument("--params", "-p", help="Optional. The file containing the model hyperparameters (usually called 'params.json'). If not specified, its location will be determined automatically.")
 	parser.add_argument("--reads-multiplier", help="Optional. The multiplier for the number of files we should read from at once. Defaults to 1.5, which means read ceil(NUMBER_OF_CORES * 1.5). Set to a higher number on systems with high read latency to avoid starving the GPU of data.")
-	parser.add_argument("--no-vis",
-		help="Don't also plot a visualisation of the resulting embeddings.", action="store_true")
 	
 	return parser
 
@@ -37,6 +40,8 @@ def run(args):
 		args.params = find_paramsjson(args.checkpoint)
 	if (not hasattr(args, "reads_multiplier")) or args.reads_multiplier == None:
 		args.reads_multiplier = 1.5
+	if (not hasattr(args, "records_per_file")) or args.records_per_file == None:
+		args.records_per_file = 0 # 0 = unlimited
 	
 	if not os.path.exists(args.params):
 		raise Exception(f"Error: The specified filepath params.json hyperparameters ('{args.params}') does not exist.")
@@ -51,6 +56,7 @@ def run(args):
 	sys.stderr.write(f"\n\n>>> This is TensorFlow {tf.__version__}\n\n\n")
 	
+	# Note that if using a directory of input files, the output order is NOT GUARANTEED to match the input order. In fact, it probably won't.
 	dataset = dataset_predict(
 		dirpath_input=args.input,
 		parallel_reads_multiplier=args.reads_multiplier
 	)
@@ -62,17 +68,45 @@ def run(args):
 	# print("ITEMS DONE")
 	# exit(0)
 	
+	output_mode = MODE_TFRECORD if filepath_output.endswith(".tfrecord") or filepath_output.endswith(".tfrecord.gz") else MODE_JSONL
+	
+	def open_output(filepath):
+		# TFRecord files need the per-record length framing + CRCs that TFRecordWriter adds, so we can't just write raw serialised bytes to a plain file handle
+		if output_mode == MODE_TFRECORD:
+			return tf.io.TFRecordWriter(filepath, options="GZIP" if filepath.endswith(".gz") else None)
+		return handle_open(filepath, "wt" if filepath.endswith(".gz") else "w")
+	
 	handle = sys.stdout
 	if filepath_output != "-":
-		handle = handle_open(filepath_output, "wt" if filepath_output.endswith(".gz") else "w")
+		handle = open_output(filepath_output if args.records_per_file <= 0 else filepath_output.replace("$d", "0"))
 	
 	i = 0
-	for rainfall in ai.embed(dataset):
-		handle.write(json.dumps(rainfall.numpy().tolist(), separators=(',', ':'))+"\n") # Ref https://stackoverflow.com/a/64710892/1460422
+	i_file = i
+	files_done = 0
+	for step_rainfall, step_water in ai.embed(dataset):
+		if args.records_per_file > 0 and i_file >= args.records_per_file:
+			files_done += 1
+			i_file = 0
+			handle.close()
+			logger.info(f"PROGRESS:file {files_done}")
+			handle = open_output(filepath_output.replace("$d", str(files_done)))
+		
+		if output_mode == MODE_JSONL:
+			handle.write(json.dumps(step_rainfall.numpy().tolist(), separators=(',', ':'))+"\n") # Ref https://stackoverflow.com/a/64710892/1460422
+		elif output_mode == MODE_TFRECORD:
+			step_rainfall = tf.train.BytesList(value=[tf.io.serialize_tensor(step_rainfall, name="rainfall").numpy()])
+			step_water = tf.train.BytesList(value=[tf.io.serialize_tensor(step_water, name="water").numpy()])
+			
+			record = tf.train.Example(features=tf.train.Features(feature={
+				"rainfallradar": tf.train.Feature(bytes_list=step_rainfall),
+				"waterdepth": tf.train.Feature(bytes_list=step_water)
+			}))
+			handle.write(record.SerializeToString())
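+			# Round-trip sketch for reading a record back elsewhere (assumes the tensors were float32; adjust out_type to match):
+			#   spec = { "rainfallradar": tf.io.FixedLenFeature([], tf.string), "waterdepth": tf.io.FixedLenFeature([], tf.string) }
+			#   parsed = tf.io.parse_single_example(raw_record, spec)
+			#   rainfall = tf.io.parse_tensor(parsed["rainfallradar"], out_type=tf.float32)
+			#   water = tf.io.parse_tensor(parsed["waterdepth"], out_type=tf.float32)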
+		else:
+			raise Exception("Error: Unknown output mode.")
 		
 		if i == 0 or i % 100 == 0:
 			sys.stderr.write(f"[pretrain:predict] STEP {i}\r")
+		i += 1
+		i_file += 1
 	
 	handle.close()
diff --git a/aimodel/src/subcommands/train.py b/aimodel/src/subcommands/train.py
new file mode 100644
index 0000000..7f9c84c
--- /dev/null
+++ b/aimodel/src/subcommands/train.py
@@ -0,0 +1,62 @@
+import math
+import sys
+import argparse
+from loguru import logger
+
+import tensorflow as tf
+
+from lib.ai.RainfallWaterSegmentor import RainfallWaterSegmenter
+from lib.dataset.dataset import dataset
+from lib.dataset.read_metadata import read_metadata
+
+def parse_args():
+	parser = argparse.ArgumentParser(description="Train an image segmentation model on a directory of .tfrecord.gz embedded_rainfall+waterdepth_label files.")
+	# parser.add_argument("--config", "-c", help="Filepath to the TOML config file to load.", required=True)
+	parser.add_argument("--input", "-i", help="Path to input directory containing the .tfrecord.gz files to train with", required=True)
+	parser.add_argument("--output", "-o", help="Path to output directory to write output to (will be automatically created if it doesn't exist)", required=True)
+	parser.add_argument("--feature-dim", help="The size of the input feature dimension of the model [default: 512].", type=int)
+	parser.add_argument("--batch-size", help="Sets the batch size [default: 64].", type=int)
+	parser.add_argument("--reads-multiplier", help="Optional. The multiplier for the number of files we should read from at once. Defaults to 1.5, which means read ceil(NUMBER_OF_CORES * 1.5) files at once. Set to a higher number on systems with high read latency to avoid starving the GPU of data.")
+	parser.add_argument("--water-size", help="The width and height of the square of pixels that the model will predict. Smaller values crop the input more [default: 100].", type=int)
+	
+	return parser
+
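+# Usage sketch, assuming this subcommand is dispatched through the project's CLI entrypoint in the same way as the other subcommands:
+#   train --input <dir of .tfrecord.gz embedding files> --output <dir to write the model + summary to>
+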
+def run(args):
+	if (not hasattr(args, "water_size")) or args.water_size == None:
+		args.water_size = 100
+	if (not hasattr(args, "batch_size")) or args.batch_size == None:
+		args.batch_size = 64
+	if (not hasattr(args, "feature_dim")) or args.feature_dim == None:
+		args.feature_dim = 512
+	if (not hasattr(args, "reads_multiplier")) or args.reads_multiplier == None:
+		args.reads_multiplier = 1.5
+	
+	
+	# TODO: Validate args here.
+	
+	sys.stderr.write(f"\n\n>>> This is TensorFlow {tf.__version__}\n\n\n")
+	
+	dataset_train, dataset_validate = dataset(
+		dirpath_input=args.input,
+		batch_size=args.batch_size,
+	)
+	dataset_metadata = read_metadata(args.input)
+	
+	# for (items, label) in dataset_train:
+	# 	print("ITEMS", len(items), [ item.shape for item in items ])
+	# 	print("LABEL", label.shape)
+	# print("ITEMS DONE")
+	# exit(0)
+	
+	
+	ai = RainfallWaterSegmenter(
+		dir_output=args.output,
+		batch_size=args.batch_size,
+		feature_dim=args.feature_dim,
+		
+		metadata=dataset_metadata,
+		shape_water=[ args.water_size, args.water_size ] # The DESIRED output shape. The actual data will be cropped to match this.
+	)
+	
+	ai.train(dataset_train, dataset_validate)
+	
\ No newline at end of file