diff --git a/aimodel/slurm-pretrain.job b/aimodel/slurm-pretrain.job index 817c205..7c92810 100755 --- a/aimodel/slurm-pretrain.job +++ b/aimodel/slurm-pretrain.job @@ -5,7 +5,7 @@ #SBATCH --gres=gpu:1 #SBATCH -o %j.%N.%a.out.log #SBATCH -e %j.%N.%a.err.log -#SBATCH -p gpu05 +#SBATCH -p gpu05,gpu #SBATCH --time=5-00:00:00 #SBATCH --mem=61440 # 61440 = 60GiB memory required diff --git a/aimodel/slurm-train.job b/aimodel/slurm-train.job index 86368a7..ac2f092 100755 --- a/aimodel/slurm-train.job +++ b/aimodel/slurm-train.job @@ -1,5 +1,5 @@ #!/usr/bin/env bash -#SBATCH -J RainAIv3 +#SBATCH -J RainAISG #SBATCH -N 1 #SBATCH -n 14 #SBATCH --gres=gpu:1 diff --git a/aimodel/src/lib/ai/RainfallWaterContraster.py b/aimodel/src/lib/ai/RainfallWaterContraster.py index cce6469..154c5bf 100644 --- a/aimodel/src/lib/ai/RainfallWaterContraster.py +++ b/aimodel/src/lib/ai/RainfallWaterContraster.py @@ -85,7 +85,7 @@ class RainfallWaterContraster(object): validation_data=dataset_validate, epochs=self.epochs, callbacks=make_callbacks(self.dir_output, self.model_predict), - steps_per_epoch=10 # For testing + # steps_per_epoch=10 # For testing ) def embed(self, dataset): diff --git a/aimodel/src/lib/ai/RainfallWaterSegmenter.py b/aimodel/src/lib/ai/RainfallWaterSegmenter.py index 65a552b..dee79cd 100644 --- a/aimodel/src/lib/ai/RainfallWaterSegmenter.py +++ b/aimodel/src/lib/ai/RainfallWaterSegmenter.py @@ -14,6 +14,7 @@ from .model_rainfallwater_segmentation import model_rainfallwater_segmentation from .helpers import make_callbacks from .helpers import summarywriter from .components.LayerConvNeXtGamma import LayerConvNeXtGamma +from .components.LayerStack2Image import LayerStack2Image from .helpers.summarywriter import summarywriter class RainfallWaterSegmenter(object): @@ -70,6 +71,7 @@ class RainfallWaterSegmenter(object): self.model = tf.keras.models.load_model(filepath_checkpoint, custom_objects={ "LayerConvNeXtGamma": LayerConvNeXtGamma, + "LayerStack2Image": LayerStack2Image }) @@ -80,7 +82,7 @@ class RainfallWaterSegmenter(object): validation_data=dataset_validate, epochs=self.epochs, callbacks=make_callbacks(self.dir_output, self.model), - steps_per_epoch=10 # For testing + # steps_per_epoch=10 # For testing ) def embed(self, rainfall_embed): diff --git a/aimodel/src/lib/ai/components/CallbackNBatchCsv.py b/aimodel/src/lib/ai/components/CallbackNBatchCsv.py new file mode 100644 index 0000000..c308a03 --- /dev/null +++ b/aimodel/src/lib/ai/components/CallbackNBatchCsv.py @@ -0,0 +1,30 @@ +import tensorflow as tf + +from lib.io.handle_open import handle_open + +class CallbackNBatchCsv(tf.keras.callbacks.Callback): + def __init__(self, filepath, n_batches=1, separator="\t", **kwargs) -> None: + super().__init__(**kwargs) + + self.n_batches = n_batches + self.separator = separator + + self.handle = handle_open(filepath, "w") + + + self.batches_seen = 0 + self.keys = None + + def write_header(self, logs): # logs = metrics + self.keys = logs.keys() + self.keys = sorted(self.keys) + self.handle.write("\t".join(self.keys)+"\n") + + def on_batch_end(self, batch, logs=None): # logs = metrics + if self.batches_seen == 0: + self.write_header(logs) + + if self.batches_seen % self.n_batches == 0: + self.handle.write(self.separator.join([str(logs[key]) for key in self.keys]) + "\n") + + self.batches_seen += 1 diff --git a/aimodel/src/lib/ai/components/LayerCheeseMultipleOut.py b/aimodel/src/lib/ai/components/LayerCheeseMultipleOut.py index 87c2e43..5e6d13f 100644 --- a/aimodel/src/lib/ai/components/LayerCheeseMultipleOut.py +++ b/aimodel/src/lib/ai/components/LayerCheeseMultipleOut.py @@ -16,20 +16,22 @@ class LayerCheeseMultipleOut(tf.keras.layers.Layer): self.param_batch_size = batch_size self.param_feature_dim = feature_dim - self.weight_temperature = tf.Variable(name="loss_temperature", shape=1, initial_value=tf.constant([0.07])) - self.weight_nce = tf.Variable( - name="loss_nce", - shape=(batch_size, feature_dim), - initial_value=tf.random.truncated_normal( - (feature_dim), - stddev=1.0 / math.sqrt(128) - ) - ) - self.weight_nce_bias = tf.Variable( - name="loss_nce_bias", - shape=(feature_dim), - initial_value=tf.zeros((feature_dim)) - ) + self.weight_temperature = tf.Variable(name="loss_temperature", shape=1, initial_value=tf.constant([ + math.log(1 / 0.07) + ])) + # self.weight_nce = tf.Variable( + # name="loss_nce", + # shape=(batch_size, feature_dim), + # initial_value=tf.random.truncated_normal( + # [feature_dim], + # stddev=1.0 / math.sqrt(128) + # ) + # ) + # self.weight_nce_bias = tf.Variable( + # name="loss_nce_bias", + # shape=(feature_dim), + # initial_value=tf.zeros((feature_dim)) + # ) def get_config(self): config = super(LayerCheeseMultipleOut, self).get_config() @@ -42,4 +44,4 @@ class LayerCheeseMultipleOut(tf.keras.layers.Layer): # By this point, the above has already dropped through the encoder, so should be in the form [ batch_size, dim ] - return tf.stack(inputs, axis=-2) \ No newline at end of file + return tf.stack(inputs, axis=-2) diff --git a/aimodel/src/lib/ai/components/LossContrastive.py b/aimodel/src/lib/ai/components/LossContrastive.py index 43e6e74..6f3b3bc 100644 --- a/aimodel/src/lib/ai/components/LossContrastive.py +++ b/aimodel/src/lib/ai/components/LossContrastive.py @@ -1,3 +1,5 @@ +import math + import tensorflow as tf class LossContrastive(tf.keras.losses.Loss): @@ -10,28 +12,57 @@ class LossContrastive(tf.keras.losses.Loss): rainfall, water = tf.unstack(y_pred, axis=-2) # print("LOSS:call y_true", y_true.shape) # print("LOSS:call y_pred", y_pred.shape) - # print("BEFORE_RESHAPE rainfall", rainfall) - # print("BEFORE_RESHAPE water", water) + print("START rainfall", rainfall) + print("START water", water) # # Ensure the shapes are defined # rainfall = tf.reshape(rainfall, [self.batch_size, rainfall.shape[1]]) # water = tf.reshape(water, [self.batch_size, water.shape[1]]) + # normalise features + # rainfall = rainfall / tf.math.l2_normalize(rainfall, axis=1) + # water = water / tf.math.l2_normalize(water, axis=1) - logits = tf.linalg.matmul(rainfall, tf.transpose(water)) * tf.clip_by_value(tf.math.exp(self.weight_temperature), 0, 100) + print("AFTER_L2 rainfall", rainfall) + print("AFTER_L2 water", water) - # print("LOGITS", logits) + # logits = tf.linalg.matmul(rainfall, tf.transpose(water)) * tf.clip_by_value(tf.math.exp(self.weight_temperature), 0, 100) + logits = tf.linalg.matmul(rainfall, tf.transpose(water)) * tf.math.exp(self.weight_temperature) - labels = tf.eye(self.batch_size, dtype=tf.int32) - loss_rainfall = tf.keras.metrics.binary_crossentropy(labels, logits, from_logits=True, axis=0) - loss_water = tf.keras.metrics.binary_crossentropy(labels, logits, from_logits=True, axis=1) + print("LOGITS", logits) + # labels = tf.eye(self.batch_size, dtype=tf.int32) # we *would* do this if we were using mean squared error... + labels = tf.range(self.batch_size, dtype=tf.int32) # each row is a different category we think + loss_rainfall = tf.keras.metrics.sparse_categorical_crossentropy(labels, logits, from_logits=True, axis=0) + loss_water = tf.keras.metrics.sparse_categorical_crossentropy(labels, logits, from_logits=True, axis=1) + + + # loss_rainfall = tf.keras.metrics.binary_crossentropy(labels, logits, from_logits=True, axis=0) + # loss_water = tf.keras.metrics.binary_crossentropy(labels, logits, from_logits=True, axis=1) + print("LABELS", labels) + print("LOSS_RAINFALL", loss_rainfall) + print("LOSS_WATER", loss_water) + loss = (loss_rainfall + loss_water) / 2 + print("LOSS", loss) + + loss = tf.math.reduce_mean(loss) + + print("LOSS FINAL", loss) # cosine_similarity results in tensor of range -1 - 1, but tf.sparse.eye has range 0 - 1 # print("LABELS", labels) # print("LOSS_rainfall", loss_rainfall) # print("LOSS_water", loss_water) # print("LOSS", loss) return loss - \ No newline at end of file + + +if __name__ == "__main__": + weight_temperature = tf.Variable(name="loss_temperature", shape=1, initial_value=tf.constant([ + math.log(1 / 0.07) + ])) + loss = LossContrastive(weight_temperature=weight_temperature, batch_size=64) + + tensor_input = tf.random.uniform([64, 2, 512]) + print(loss(tf.constant(1), tensor_input)) \ No newline at end of file diff --git a/aimodel/src/lib/ai/components/convnext.py b/aimodel/src/lib/ai/components/convnext.py index b49fb38..dc3630a 100644 --- a/aimodel/src/lib/ai/components/convnext.py +++ b/aimodel/src/lib/ai/components/convnext.py @@ -147,7 +147,7 @@ def add_convnext_block(y, dim, drop_prob=0, prefix=""): name=f'{prefix}.pwconv1' )(y) - + y = tf.keras.layers.Activation( 'gelu', name=f'{prefix}.act' diff --git a/aimodel/src/lib/ai/helpers/make_callbacks.py b/aimodel/src/lib/ai/helpers/make_callbacks.py index bfb112a..8d3f5d8 100644 --- a/aimodel/src/lib/ai/helpers/make_callbacks.py +++ b/aimodel/src/lib/ai/helpers/make_callbacks.py @@ -3,10 +3,12 @@ import os import tensorflow as tf from ..components.CallbackCustomModelCheckpoint import CallbackCustomModelCheckpoint +from ..components.CallbackNBatchCsv import CallbackNBatchCsv def make_callbacks(dirpath, model_predict): dirpath_checkpoints = os.path.join(dirpath, "checkpoints") filepath_metrics = os.path.join(dirpath, "metrics.tsv") + filepath_metrics_batch = os.path.join(dirpath, "metrics_batch64.tsv") if not os.path.exists(dirpath_checkpoints): os.mkdir(dirpath_checkpoints) @@ -24,5 +26,9 @@ def make_callbacks(dirpath, model_predict): filename=filepath_metrics, separator="\t" ), - tf.keras.callbacks.ProgbarLogger() + CallbackNBatchCsv( + filepath=filepath_metrics_batch, + n_batches=64 + ), + tf.keras.callbacks.ProgbarLogger(count_mode="steps") # batches ] \ No newline at end of file diff --git a/aimodel/src/lib/dataset/read_metadata.py b/aimodel/src/lib/dataset/read_metadata.py index 2a1dc5f..6ae44b5 100644 --- a/aimodel/src/lib/dataset/read_metadata.py +++ b/aimodel/src/lib/dataset/read_metadata.py @@ -5,6 +5,9 @@ import json from ..io.readfile import readfile def read_metadata(dirpath_dataset): - filepath_metadata = os.path.join(dirpath_dataset, "metadata.json") + if os.path.isfile(dirpath_dataset): + filepath_metadata = os.path.join(os.path.dirname(dirpath_dataset), "metadata.json") + else: + filepath_metadata = os.path.join(dirpath_dataset, "metadata.json") return json.loads(readfile(filepath_metadata)) \ No newline at end of file diff --git a/aimodel/src/lib/io/handle_open.py b/aimodel/src/lib/io/handle_open.py index a167ea0..bd3f046 100644 --- a/aimodel/src/lib/io/handle_open.py +++ b/aimodel/src/lib/io/handle_open.py @@ -2,7 +2,10 @@ import io import gzip -def handle_open(filepath, mode): +def handle_open(filepath, mode, force_textwrite_gzip=True): + if mode == "w" and mode.endswith(".gz") and force_textwrite_gzip: + mode = "wt" + if filepath.endswith(".gz"): return gzip.open(filepath, mode) else: diff --git a/aimodel/src/subcommands/pretrain.py b/aimodel/src/subcommands/pretrain.py index 2506b8f..753f547 100644 --- a/aimodel/src/subcommands/pretrain.py +++ b/aimodel/src/subcommands/pretrain.py @@ -21,6 +21,13 @@ def parse_args(): return parser + +def count_batches(dataset): + count = 0 + for _ in dataset: + count += 1 + return count + def run(args): if (not hasattr(args, "water_size")) or args.water_size == None: args.water_size = 100 @@ -40,7 +47,11 @@ def run(args): dirpath_input=args.input, batch_size=args.batch_size, ) - dataset_metadata = read_metadata(args.input) + + # print("BATCHES_TRAIN", count_batches(dataset_train)) # 18500 for the full 2006-2020 dataset + # print("BATCHES_VALIDATE", count_batches(dataset_validate)) # 4653 for the full 2006-2020 dataset + + # for (items, label) in dataset_train: # print("ITEMS", len(items), [ item.shape for item in items ]) @@ -59,4 +70,3 @@ def run(args): ) ai.train(dataset_train, dataset_validate) - \ No newline at end of file diff --git a/aimodel/src/subcommands/pretrain_predict.py b/aimodel/src/subcommands/pretrain_predict.py index c714754..dd0aa9d 100644 --- a/aimodel/src/subcommands/pretrain_predict.py +++ b/aimodel/src/subcommands/pretrain_predict.py @@ -140,4 +140,4 @@ def run(args): handle.close() - sys.stderr.write(">>> Complete\n") \ No newline at end of file + sys.stderr.write("\n>>> Complete\n") \ No newline at end of file diff --git a/aimodel/src/subcommands/train_predict.py b/aimodel/src/subcommands/train_predict.py index f1ef0da..4eb9727 100644 --- a/aimodel/src/subcommands/train_predict.py +++ b/aimodel/src/subcommands/train_predict.py @@ -40,6 +40,9 @@ def run(args): if (not hasattr(args, "params")) or args.params == None: args.params = find_paramsjson(args.checkpoint) + if args.params == None: + logger.error("Error: Failed to find params.json. Please ensure it's either in the same directory as the checkpoint or 1 level above") + return if (not hasattr(args, "read_multiplier")) or args.read_multiplier == None: args.read_multiplier = 0 if (not hasattr(args, "records_per_file")) or args.records_per_file == None: @@ -87,13 +90,13 @@ def run(args): logger.info(f"Records per file: {args.records_per_file}") if output_mode == MODE_JSONL: - do_jsonl(args, ai, dataset, args.model_code, model_params) + do_jsonl(args, ai, dataset, model_params) else: - do_png(args, ai, dataset, args.model_code, model_params) + do_png(args, ai, dataset, model_params) sys.stderr.write(">>> Complete\n") -def do_png(args, ai, dataset, model_code, model_params): +def do_png(args, ai, dataset, model_params): if not os.path.exists(os.path.dirname(args.output)): os.mkdir(os.path.dirname(args.output)) @@ -114,7 +117,7 @@ def do_png(args, ai, dataset, model_code, model_params): segmentation_plot( water_actual, water_predict, - model_code, + args.model_code, args.output.replace("+d", str(i)) ) @@ -152,7 +155,7 @@ def do_jsonl(args, ai, dataset, model_params): i_batch = 0 for water_predict in water_predict_batch: # [ width, height, softmax_probabilities ] → [ batch, width, height ] - water_predict = tf.math.argmax(water_predict, axis=-1) + # water_predict = tf.math.argmax(water_predict, axis=-1) # [ width, height ] water_actual = tf.squeeze(water_actual_batch[i_batch]) @@ -165,11 +168,11 @@ def do_jsonl(args, ai, dataset, model_params): item_obj = {} if "rainfall_actual" in args.log: - item_obj["rainfall_actual"] = rainfall_actual_batch[i_batch].numpy().list() + item_obj["rainfall_actual"] = rainfall_actual_batch[i_batch].numpy().tolist() if "water_actual" in args.log: - item_obj["water_actual"] = water_actual.numpy().list() + item_obj["water_actual"] = water_actual.numpy().tolist() if "water_predict" in args.log: - item_obj["water_predict"] = water_predict.numpy().list() + item_obj["water_predict"] = water_predict.numpy().tolist() handle.write(json.dumps(item_obj, separators=(',', ':'))+"\n") # Ref https://stackoverflow.com/a/64710892/1460422 diff --git a/rainfallwrangler/slurm-jsonl2tfrecord.job b/rainfallwrangler/slurm-jsonl2tfrecord.job index 253ef55..5be5273 100755 --- a/rainfallwrangler/slurm-jsonl2tfrecord.job +++ b/rainfallwrangler/slurm-jsonl2tfrecord.job @@ -2,8 +2,8 @@ #SBATCH -J Json2TfR #SBATCH -N 1 #SBATCH -n 28 -#SBATCH -o %j.%N.%a.out -#SBATCH -e %j.%N.%a.err +#SBATCH -o %j.%N.%a.out.log +#SBATCH -e %j.%N.%a.err.log #SBATCH -p compute #SBATCH --time=3-00:00:00 diff --git a/rainfallwrangler/slurm-process.job b/rainfallwrangler/slurm-process.job new file mode 100755 index 0000000..5ba862b --- /dev/null +++ b/rainfallwrangler/slurm-process.job @@ -0,0 +1,84 @@ +#!/usr/bin/env bash +#SBATCH -J RWrangle +#SBATCH -N 1 +#SBATCH -n 1 +#SBATCH -o %j.%N.%a.rainwrangle.out.log +#SBATCH -e %j.%N.%a.rainwrangle.err.log +#SBATCH -p compute +#SBATCH --time=3-00:00:00 +#SBATCH --mem=8096 +# * 8GB RAM + +set -e; + +module load utilities/multi +module load readline/7.0 +module load gcc/10.2.0 + +# module load cuda/11.5.0 + +module load python/anaconda/4.6/miniconda/3.7 + +RAINFALL="${RAINFALL:-$HOME/data/nimrod_ceda.jsonl.gz}"; +WATER="${WATER:-$HOME/data/WaterDepths-new.stream.asc.gz}"; +OUTPUT="${OUTPUT}"; +COUNT_FILE="${COUNT_FILE:-4096}"; + +if [[ -z "${WATER}" ]]; then + echo "Error: No input water depth file specified in the WATER environment variable."; + exit 1; +fi +if [[ -z "${RAINFALL}" ]]; then + echo "Error: No input rainfall file specified in the RAINFALL environment variables."; + exit 1; +fi + +if [[ -z "${OUTPUT}" ]]; then + echo "Error: No output directory specified in the OUTPUT environment variable."; + exit 1; +fi + +if [[ ! -r "${RAINFALL}" ]]; then + echo "Error: That input rainfall file either doesn't exist, isn't a directory, or we don't have permission to access it."; + exit 3; +fi +if [[ ! -r "${WATER}" ]]; then + echo "Error: That input water depth file either doesn't exist, isn't a directory, or we don't have permission to access it."; + exit 3; +fi + +if [[ ! -d "${OUTPUT}" ]]; then + mkdir "${OUTPUT}"; +fi + +export PATH=$HOME/software/bin:$PATH; + + +OUTPUT_UNIQ="${OUTPUT%/}_uniq"; # Stript trailing slash, if present +OUTPUT_TFRECORD="${OUTPUT%/}_tfrecord"; # Stript trailing slash, if present + +mkdir -p "${OUTPUT_UNIQ}" "${OUTPUT_TFRECORD}"; + +echo ">>> Settings"; + +echo "RAINFALL $RAINFALL"; +echo "WATER $WATER"; +echo "OUTPUT $OUTPUT"; +echo "COUNT_FILE $COUNT_FILE"; +echo "ARGS $ARGS"; + +echo ">>> Installing requirements"; +cd ../aimodel || { echo "Error: Failed to cd to ai model directory"; exit 1; }; +conda run -n py38 pip install -r requirements.txt; +cd ../rainfallwrangler || { echo "Error: Failed to cd back to rainfallwrangler directory"; exit 1; }; +npm install; +echo ">>> Converting dataset to .jsonl.gz"; +/usr/bin/env time -v src/index.mjs recordify --verbose --rainfall "${RAINFALL}" --water "${WATER}" --output "${OUTPUT}" --count-file "${COUNT_FILE}" ${ARGS}; +echo ">>> Deduplicating dataset"; +# This also automatically recompresses for us - hence the source/target rather than in-place +srun --comment 'RainUniq' --exclusive -p compute --exclusive /usr/bin/env time -v src/index.mjs uniq --source "${OUTPUT}" --target "${OUTPUT_UNIQ}" --count-file "${COUNT_FILE}"; +echo ">>> Removing intermediate output"; +rm -r "${OUTPUT}"; +echo ">>> Queuing .jsonl.gz → tfrecord"; +INPUT="${OUTPUT_UNIQ}" OUTPUT="${OUTPUT_TFRECORD}" sbatch ./slurm-jsonl2tfrecord.job; +echo ">>> exited with code $?"; diff --git a/rainfallwrangler/slurm-wrangle-uniq.job b/rainfallwrangler/slurm-wrangle-uniq.job index c8dc40e..d97d1e3 100755 --- a/rainfallwrangler/slurm-wrangle-uniq.job +++ b/rainfallwrangler/slurm-wrangle-uniq.job @@ -2,8 +2,8 @@ #SBATCH -J RainUniq #SBATCH -N 1 #SBATCH -n 28 -#SBATCH -o %j.%N.%a.out -#SBATCH -e %j.%N.%a.err +#SBATCH -o %j.%N.%a.out.log +#SBATCH -e %j.%N.%a.err.log #SBATCH -p compute #SBATCH --time=3-00:00:00 diff --git a/rainfallwrangler/src/lib/io/GzipChildProcess.mjs b/rainfallwrangler/src/lib/io/GzipChildProcess.mjs index 6263dfc..7aa2998 100644 --- a/rainfallwrangler/src/lib/io/GzipChildProcess.mjs +++ b/rainfallwrangler/src/lib/io/GzipChildProcess.mjs @@ -9,7 +9,6 @@ import { end_safe } from './StreamHelpers.mjs'; /** * Spawns and manages a gzip child process. - * @deprecated Use spawn-stream instead * @extends EventEmitter */ class GzipChildProcess extends EventEmitter { diff --git a/rainfallwrangler/src/subcommands/recordify/meta.mjs b/rainfallwrangler/src/subcommands/recordify/meta.mjs index ea39200..bba4455 100644 --- a/rainfallwrangler/src/subcommands/recordify/meta.mjs +++ b/rainfallwrangler/src/subcommands/recordify/meta.mjs @@ -11,5 +11,5 @@ export default function(cli) { .reverse(); }) .argument("water-offset", "Make the water depth data be this many time steps ahead of the rainfall radar data. (default: 1)", 1, "integer") - .argument("output", "The path to the directory to write the generated TFRecord files to.", null, "string"); + .argument("output", "The path to the directory to write the generated .jsonl.gz files to.", null, "string"); } diff --git a/rainfallwrangler/src/subcommands/recordify/recordify.mjs b/rainfallwrangler/src/subcommands/recordify/recordify.mjs index ca8f37b..5a66229 100644 --- a/rainfallwrangler/src/subcommands/recordify/recordify.mjs +++ b/rainfallwrangler/src/subcommands/recordify/recordify.mjs @@ -18,6 +18,10 @@ export default async function() { if(typeof settings.output !== "string") throw new Error(`Error: No output directory specified.`); + if(typeof settings.count_file !== "number") + throw new Error(`Error: --count-file was not specified.`); + if(isNaN(settings.count_file)) + throw new Error(`Error: --count-file was not a number. process.argv: ${process.argv.join(" ")}`); if(!fs.existsSync(settings.output)) await fs.promises.mkdir(settings.output, { recursive: true });