Merge branch 'main' of git.starbeamrainbowlabs.com:sbrl/PhD-Rainfall-Radar

This commit is contained in:
Starbeamrainbowlabs 2022-11-10 20:49:01 +00:00
commit d8be26d476
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
20 changed files with 225 additions and 48 deletions

View file

@ -5,7 +5,7 @@
#SBATCH --gres=gpu:1 #SBATCH --gres=gpu:1
#SBATCH -o %j.%N.%a.out.log #SBATCH -o %j.%N.%a.out.log
#SBATCH -e %j.%N.%a.err.log #SBATCH -e %j.%N.%a.err.log
#SBATCH -p gpu05 #SBATCH -p gpu05,gpu
#SBATCH --time=5-00:00:00 #SBATCH --time=5-00:00:00
#SBATCH --mem=61440 #SBATCH --mem=61440
# 61440 = 60GiB memory required # 61440 = 60GiB memory required

View file

@ -1,5 +1,5 @@
#!/usr/bin/env bash #!/usr/bin/env bash
#SBATCH -J RainAIv3 #SBATCH -J RainAISG
#SBATCH -N 1 #SBATCH -N 1
#SBATCH -n 14 #SBATCH -n 14
#SBATCH --gres=gpu:1 #SBATCH --gres=gpu:1

View file

@ -85,7 +85,7 @@ class RainfallWaterContraster(object):
validation_data=dataset_validate, validation_data=dataset_validate,
epochs=self.epochs, epochs=self.epochs,
callbacks=make_callbacks(self.dir_output, self.model_predict), callbacks=make_callbacks(self.dir_output, self.model_predict),
steps_per_epoch=10 # For testing # steps_per_epoch=10 # For testing
) )
def embed(self, dataset): def embed(self, dataset):

View file

@ -14,6 +14,7 @@ from .model_rainfallwater_segmentation import model_rainfallwater_segmentation
from .helpers import make_callbacks from .helpers import make_callbacks
from .helpers import summarywriter from .helpers import summarywriter
from .components.LayerConvNeXtGamma import LayerConvNeXtGamma from .components.LayerConvNeXtGamma import LayerConvNeXtGamma
from .components.LayerStack2Image import LayerStack2Image
from .helpers.summarywriter import summarywriter from .helpers.summarywriter import summarywriter
class RainfallWaterSegmenter(object): class RainfallWaterSegmenter(object):
@ -70,6 +71,7 @@ class RainfallWaterSegmenter(object):
self.model = tf.keras.models.load_model(filepath_checkpoint, custom_objects={ self.model = tf.keras.models.load_model(filepath_checkpoint, custom_objects={
"LayerConvNeXtGamma": LayerConvNeXtGamma, "LayerConvNeXtGamma": LayerConvNeXtGamma,
"LayerStack2Image": LayerStack2Image
}) })
@ -80,7 +82,7 @@ class RainfallWaterSegmenter(object):
validation_data=dataset_validate, validation_data=dataset_validate,
epochs=self.epochs, epochs=self.epochs,
callbacks=make_callbacks(self.dir_output, self.model), callbacks=make_callbacks(self.dir_output, self.model),
steps_per_epoch=10 # For testing # steps_per_epoch=10 # For testing
) )
def embed(self, rainfall_embed): def embed(self, rainfall_embed):

View file

@ -0,0 +1,30 @@
import tensorflow as tf
from lib.io.handle_open import handle_open
class CallbackNBatchCsv(tf.keras.callbacks.Callback):
def __init__(self, filepath, n_batches=1, separator="\t", **kwargs) -> None:
super().__init__(**kwargs)
self.n_batches = n_batches
self.separator = separator
self.handle = handle_open(filepath, "w")
self.batches_seen = 0
self.keys = None
def write_header(self, logs): # logs = metrics
self.keys = logs.keys()
self.keys = sorted(self.keys)
self.handle.write("\t".join(self.keys)+"\n")
def on_batch_end(self, batch, logs=None): # logs = metrics
if self.batches_seen == 0:
self.write_header(logs)
if self.batches_seen % self.n_batches == 0:
self.handle.write(self.separator.join([str(logs[key]) for key in self.keys]) + "\n")
self.batches_seen += 1

View file

@ -16,20 +16,22 @@ class LayerCheeseMultipleOut(tf.keras.layers.Layer):
self.param_batch_size = batch_size self.param_batch_size = batch_size
self.param_feature_dim = feature_dim self.param_feature_dim = feature_dim
self.weight_temperature = tf.Variable(name="loss_temperature", shape=1, initial_value=tf.constant([0.07])) self.weight_temperature = tf.Variable(name="loss_temperature", shape=1, initial_value=tf.constant([
self.weight_nce = tf.Variable( math.log(1 / 0.07)
name="loss_nce", ]))
shape=(batch_size, feature_dim), # self.weight_nce = tf.Variable(
initial_value=tf.random.truncated_normal( # name="loss_nce",
(feature_dim), # shape=(batch_size, feature_dim),
stddev=1.0 / math.sqrt(128) # initial_value=tf.random.truncated_normal(
) # [feature_dim],
) # stddev=1.0 / math.sqrt(128)
self.weight_nce_bias = tf.Variable( # )
name="loss_nce_bias", # )
shape=(feature_dim), # self.weight_nce_bias = tf.Variable(
initial_value=tf.zeros((feature_dim)) # name="loss_nce_bias",
) # shape=(feature_dim),
# initial_value=tf.zeros((feature_dim))
# )
def get_config(self): def get_config(self):
config = super(LayerCheeseMultipleOut, self).get_config() config = super(LayerCheeseMultipleOut, self).get_config()

View file

@ -1,3 +1,5 @@
import math
import tensorflow as tf import tensorflow as tf
class LossContrastive(tf.keras.losses.Loss): class LossContrastive(tf.keras.losses.Loss):
@ -10,24 +12,44 @@ class LossContrastive(tf.keras.losses.Loss):
rainfall, water = tf.unstack(y_pred, axis=-2) rainfall, water = tf.unstack(y_pred, axis=-2)
# print("LOSS:call y_true", y_true.shape) # print("LOSS:call y_true", y_true.shape)
# print("LOSS:call y_pred", y_pred.shape) # print("LOSS:call y_pred", y_pred.shape)
# print("BEFORE_RESHAPE rainfall", rainfall) print("START rainfall", rainfall)
# print("BEFORE_RESHAPE water", water) print("START water", water)
# # Ensure the shapes are defined # # Ensure the shapes are defined
# rainfall = tf.reshape(rainfall, [self.batch_size, rainfall.shape[1]]) # rainfall = tf.reshape(rainfall, [self.batch_size, rainfall.shape[1]])
# water = tf.reshape(water, [self.batch_size, water.shape[1]]) # water = tf.reshape(water, [self.batch_size, water.shape[1]])
# normalise features
# rainfall = rainfall / tf.math.l2_normalize(rainfall, axis=1)
# water = water / tf.math.l2_normalize(water, axis=1)
logits = tf.linalg.matmul(rainfall, tf.transpose(water)) * tf.clip_by_value(tf.math.exp(self.weight_temperature), 0, 100) print("AFTER_L2 rainfall", rainfall)
print("AFTER_L2 water", water)
# print("LOGITS", logits) # logits = tf.linalg.matmul(rainfall, tf.transpose(water)) * tf.clip_by_value(tf.math.exp(self.weight_temperature), 0, 100)
logits = tf.linalg.matmul(rainfall, tf.transpose(water)) * tf.math.exp(self.weight_temperature)
labels = tf.eye(self.batch_size, dtype=tf.int32) print("LOGITS", logits)
loss_rainfall = tf.keras.metrics.binary_crossentropy(labels, logits, from_logits=True, axis=0)
loss_water = tf.keras.metrics.binary_crossentropy(labels, logits, from_logits=True, axis=1) # labels = tf.eye(self.batch_size, dtype=tf.int32) # we *would* do this if we were using mean squared error...
labels = tf.range(self.batch_size, dtype=tf.int32) # each row is a different category we think
loss_rainfall = tf.keras.metrics.sparse_categorical_crossentropy(labels, logits, from_logits=True, axis=0)
loss_water = tf.keras.metrics.sparse_categorical_crossentropy(labels, logits, from_logits=True, axis=1)
# loss_rainfall = tf.keras.metrics.binary_crossentropy(labels, logits, from_logits=True, axis=0)
# loss_water = tf.keras.metrics.binary_crossentropy(labels, logits, from_logits=True, axis=1)
print("LABELS", labels)
print("LOSS_RAINFALL", loss_rainfall)
print("LOSS_WATER", loss_water)
loss = (loss_rainfall + loss_water) / 2 loss = (loss_rainfall + loss_water) / 2
print("LOSS", loss)
loss = tf.math.reduce_mean(loss)
print("LOSS FINAL", loss)
# cosine_similarity results in tensor of range -1 - 1, but tf.sparse.eye has range 0 - 1 # cosine_similarity results in tensor of range -1 - 1, but tf.sparse.eye has range 0 - 1
# print("LABELS", labels) # print("LABELS", labels)
# print("LOSS_rainfall", loss_rainfall) # print("LOSS_rainfall", loss_rainfall)
@ -35,3 +57,12 @@ class LossContrastive(tf.keras.losses.Loss):
# print("LOSS", loss) # print("LOSS", loss)
return loss return loss
if __name__ == "__main__":
weight_temperature = tf.Variable(name="loss_temperature", shape=1, initial_value=tf.constant([
math.log(1 / 0.07)
]))
loss = LossContrastive(weight_temperature=weight_temperature, batch_size=64)
tensor_input = tf.random.uniform([64, 2, 512])
print(loss(tf.constant(1), tensor_input))

View file

@ -3,10 +3,12 @@ import os
import tensorflow as tf import tensorflow as tf
from ..components.CallbackCustomModelCheckpoint import CallbackCustomModelCheckpoint from ..components.CallbackCustomModelCheckpoint import CallbackCustomModelCheckpoint
from ..components.CallbackNBatchCsv import CallbackNBatchCsv
def make_callbacks(dirpath, model_predict): def make_callbacks(dirpath, model_predict):
dirpath_checkpoints = os.path.join(dirpath, "checkpoints") dirpath_checkpoints = os.path.join(dirpath, "checkpoints")
filepath_metrics = os.path.join(dirpath, "metrics.tsv") filepath_metrics = os.path.join(dirpath, "metrics.tsv")
filepath_metrics_batch = os.path.join(dirpath, "metrics_batch64.tsv")
if not os.path.exists(dirpath_checkpoints): if not os.path.exists(dirpath_checkpoints):
os.mkdir(dirpath_checkpoints) os.mkdir(dirpath_checkpoints)
@ -24,5 +26,9 @@ def make_callbacks(dirpath, model_predict):
filename=filepath_metrics, filename=filepath_metrics,
separator="\t" separator="\t"
), ),
tf.keras.callbacks.ProgbarLogger() CallbackNBatchCsv(
filepath=filepath_metrics_batch,
n_batches=64
),
tf.keras.callbacks.ProgbarLogger(count_mode="steps") # batches
] ]

View file

@ -5,6 +5,9 @@ import json
from ..io.readfile import readfile from ..io.readfile import readfile
def read_metadata(dirpath_dataset): def read_metadata(dirpath_dataset):
if os.path.isfile(dirpath_dataset):
filepath_metadata = os.path.join(os.path.dirname(dirpath_dataset), "metadata.json")
else:
filepath_metadata = os.path.join(dirpath_dataset, "metadata.json") filepath_metadata = os.path.join(dirpath_dataset, "metadata.json")
return json.loads(readfile(filepath_metadata)) return json.loads(readfile(filepath_metadata))

View file

@ -2,7 +2,10 @@ import io
import gzip import gzip
def handle_open(filepath, mode): def handle_open(filepath, mode, force_textwrite_gzip=True):
if mode == "w" and mode.endswith(".gz") and force_textwrite_gzip:
mode = "wt"
if filepath.endswith(".gz"): if filepath.endswith(".gz"):
return gzip.open(filepath, mode) return gzip.open(filepath, mode)
else: else:

View file

@ -21,6 +21,13 @@ def parse_args():
return parser return parser
def count_batches(dataset):
count = 0
for _ in dataset:
count += 1
return count
def run(args): def run(args):
if (not hasattr(args, "water_size")) or args.water_size == None: if (not hasattr(args, "water_size")) or args.water_size == None:
args.water_size = 100 args.water_size = 100
@ -40,7 +47,11 @@ def run(args):
dirpath_input=args.input, dirpath_input=args.input,
batch_size=args.batch_size, batch_size=args.batch_size,
) )
dataset_metadata = read_metadata(args.input)
# print("BATCHES_TRAIN", count_batches(dataset_train)) # 18500 for the full 2006-2020 dataset
# print("BATCHES_VALIDATE", count_batches(dataset_validate)) # 4653 for the full 2006-2020 dataset
# for (items, label) in dataset_train: # for (items, label) in dataset_train:
# print("ITEMS", len(items), [ item.shape for item in items ]) # print("ITEMS", len(items), [ item.shape for item in items ])
@ -59,4 +70,3 @@ def run(args):
) )
ai.train(dataset_train, dataset_validate) ai.train(dataset_train, dataset_validate)

View file

@ -140,4 +140,4 @@ def run(args):
handle.close() handle.close()
sys.stderr.write(">>> Complete\n") sys.stderr.write("\n>>> Complete\n")

View file

@ -40,6 +40,9 @@ def run(args):
if (not hasattr(args, "params")) or args.params == None: if (not hasattr(args, "params")) or args.params == None:
args.params = find_paramsjson(args.checkpoint) args.params = find_paramsjson(args.checkpoint)
if args.params == None:
logger.error("Error: Failed to find params.json. Please ensure it's either in the same directory as the checkpoint or 1 level above")
return
if (not hasattr(args, "read_multiplier")) or args.read_multiplier == None: if (not hasattr(args, "read_multiplier")) or args.read_multiplier == None:
args.read_multiplier = 0 args.read_multiplier = 0
if (not hasattr(args, "records_per_file")) or args.records_per_file == None: if (not hasattr(args, "records_per_file")) or args.records_per_file == None:
@ -87,13 +90,13 @@ def run(args):
logger.info(f"Records per file: {args.records_per_file}") logger.info(f"Records per file: {args.records_per_file}")
if output_mode == MODE_JSONL: if output_mode == MODE_JSONL:
do_jsonl(args, ai, dataset, args.model_code, model_params) do_jsonl(args, ai, dataset, model_params)
else: else:
do_png(args, ai, dataset, args.model_code, model_params) do_png(args, ai, dataset, model_params)
sys.stderr.write(">>> Complete\n") sys.stderr.write(">>> Complete\n")
def do_png(args, ai, dataset, model_code, model_params): def do_png(args, ai, dataset, model_params):
if not os.path.exists(os.path.dirname(args.output)): if not os.path.exists(os.path.dirname(args.output)):
os.mkdir(os.path.dirname(args.output)) os.mkdir(os.path.dirname(args.output))
@ -114,7 +117,7 @@ def do_png(args, ai, dataset, model_code, model_params):
segmentation_plot( segmentation_plot(
water_actual, water_predict, water_actual, water_predict,
model_code, args.model_code,
args.output.replace("+d", str(i)) args.output.replace("+d", str(i))
) )
@ -152,7 +155,7 @@ def do_jsonl(args, ai, dataset, model_params):
i_batch = 0 i_batch = 0
for water_predict in water_predict_batch: for water_predict in water_predict_batch:
# [ width, height, softmax_probabilities ] → [ batch, width, height ] # [ width, height, softmax_probabilities ] → [ batch, width, height ]
water_predict = tf.math.argmax(water_predict, axis=-1) # water_predict = tf.math.argmax(water_predict, axis=-1)
# [ width, height ] # [ width, height ]
water_actual = tf.squeeze(water_actual_batch[i_batch]) water_actual = tf.squeeze(water_actual_batch[i_batch])
@ -165,11 +168,11 @@ def do_jsonl(args, ai, dataset, model_params):
item_obj = {} item_obj = {}
if "rainfall_actual" in args.log: if "rainfall_actual" in args.log:
item_obj["rainfall_actual"] = rainfall_actual_batch[i_batch].numpy().list() item_obj["rainfall_actual"] = rainfall_actual_batch[i_batch].numpy().tolist()
if "water_actual" in args.log: if "water_actual" in args.log:
item_obj["water_actual"] = water_actual.numpy().list() item_obj["water_actual"] = water_actual.numpy().tolist()
if "water_predict" in args.log: if "water_predict" in args.log:
item_obj["water_predict"] = water_predict.numpy().list() item_obj["water_predict"] = water_predict.numpy().tolist()
handle.write(json.dumps(item_obj, separators=(',', ':'))+"\n") # Ref https://stackoverflow.com/a/64710892/1460422 handle.write(json.dumps(item_obj, separators=(',', ':'))+"\n") # Ref https://stackoverflow.com/a/64710892/1460422

View file

@ -2,8 +2,8 @@
#SBATCH -J Json2TfR #SBATCH -J Json2TfR
#SBATCH -N 1 #SBATCH -N 1
#SBATCH -n 28 #SBATCH -n 28
#SBATCH -o %j.%N.%a.out #SBATCH -o %j.%N.%a.out.log
#SBATCH -e %j.%N.%a.err #SBATCH -e %j.%N.%a.err.log
#SBATCH -p compute #SBATCH -p compute
#SBATCH --time=3-00:00:00 #SBATCH --time=3-00:00:00

View file

@ -0,0 +1,84 @@
#!/usr/bin/env bash
#SBATCH -J RWrangle
#SBATCH -N 1
#SBATCH -n 1
#SBATCH -o %j.%N.%a.rainwrangle.out.log
#SBATCH -e %j.%N.%a.rainwrangle.err.log
#SBATCH -p compute
#SBATCH --time=3-00:00:00
#SBATCH --mem=8096
# * 8GB RAM
set -e;
module load utilities/multi
module load readline/7.0
module load gcc/10.2.0
# module load cuda/11.5.0
module load python/anaconda/4.6/miniconda/3.7
RAINFALL="${RAINFALL:-$HOME/data/nimrod_ceda.jsonl.gz}";
WATER="${WATER:-$HOME/data/WaterDepths-new.stream.asc.gz}";
OUTPUT="${OUTPUT}";
COUNT_FILE="${COUNT_FILE:-4096}";
if [[ -z "${WATER}" ]]; then
echo "Error: No input water depth file specified in the WATER environment variable.";
exit 1;
fi
if [[ -z "${RAINFALL}" ]]; then
echo "Error: No input rainfall file specified in the RAINFALL environment variables.";
exit 1;
fi
if [[ -z "${OUTPUT}" ]]; then
echo "Error: No output directory specified in the OUTPUT environment variable.";
exit 1;
fi
if [[ ! -r "${RAINFALL}" ]]; then
echo "Error: That input rainfall file either doesn't exist, isn't a directory, or we don't have permission to access it.";
exit 3;
fi
if [[ ! -r "${WATER}" ]]; then
echo "Error: That input water depth file either doesn't exist, isn't a directory, or we don't have permission to access it.";
exit 3;
fi
if [[ ! -d "${OUTPUT}" ]]; then
mkdir "${OUTPUT}";
fi
export PATH=$HOME/software/bin:$PATH;
OUTPUT_UNIQ="${OUTPUT%/}_uniq"; # Stript trailing slash, if present
OUTPUT_TFRECORD="${OUTPUT%/}_tfrecord"; # Stript trailing slash, if present
mkdir -p "${OUTPUT_UNIQ}" "${OUTPUT_TFRECORD}";
echo ">>> Settings";
echo "RAINFALL $RAINFALL";
echo "WATER $WATER";
echo "OUTPUT $OUTPUT";
echo "COUNT_FILE $COUNT_FILE";
echo "ARGS $ARGS";
echo ">>> Installing requirements";
cd ../aimodel || { echo "Error: Failed to cd to ai model directory"; exit 1; };
conda run -n py38 pip install -r requirements.txt;
cd ../rainfallwrangler || { echo "Error: Failed to cd back to rainfallwrangler directory"; exit 1; };
npm install;
echo ">>> Converting dataset to .jsonl.gz";
/usr/bin/env time -v src/index.mjs recordify --verbose --rainfall "${RAINFALL}" --water "${WATER}" --output "${OUTPUT}" --count-file "${COUNT_FILE}" ${ARGS};
echo ">>> Deduplicating dataset";
# This also automatically recompresses for us - hence the source/target rather than in-place
srun --comment 'RainUniq' --exclusive -p compute --exclusive /usr/bin/env time -v src/index.mjs uniq --source "${OUTPUT}" --target "${OUTPUT_UNIQ}" --count-file "${COUNT_FILE}";
echo ">>> Removing intermediate output";
rm -r "${OUTPUT}";
echo ">>> Queuing .jsonl.gz → tfrecord";
INPUT="${OUTPUT_UNIQ}" OUTPUT="${OUTPUT_TFRECORD}" sbatch ./slurm-jsonl2tfrecord.job;
echo ">>> exited with code $?";

View file

@ -2,8 +2,8 @@
#SBATCH -J RainUniq #SBATCH -J RainUniq
#SBATCH -N 1 #SBATCH -N 1
#SBATCH -n 28 #SBATCH -n 28
#SBATCH -o %j.%N.%a.out #SBATCH -o %j.%N.%a.out.log
#SBATCH -e %j.%N.%a.err #SBATCH -e %j.%N.%a.err.log
#SBATCH -p compute #SBATCH -p compute
#SBATCH --time=3-00:00:00 #SBATCH --time=3-00:00:00

View file

@ -9,7 +9,6 @@ import { end_safe } from './StreamHelpers.mjs';
/** /**
* Spawns and manages a gzip child process. * Spawns and manages a gzip child process.
* @deprecated Use spawn-stream instead
* @extends EventEmitter * @extends EventEmitter
*/ */
class GzipChildProcess extends EventEmitter { class GzipChildProcess extends EventEmitter {

View file

@ -11,5 +11,5 @@ export default function(cli) {
.reverse(); .reverse();
}) })
.argument("water-offset", "Make the water depth data be this many time steps ahead of the rainfall radar data. (default: 1)", 1, "integer") .argument("water-offset", "Make the water depth data be this many time steps ahead of the rainfall radar data. (default: 1)", 1, "integer")
.argument("output", "The path to the directory to write the generated TFRecord files to.", null, "string"); .argument("output", "The path to the directory to write the generated .jsonl.gz files to.", null, "string");
} }

View file

@ -18,6 +18,10 @@ export default async function() {
if(typeof settings.output !== "string") if(typeof settings.output !== "string")
throw new Error(`Error: No output directory specified.`); throw new Error(`Error: No output directory specified.`);
if(typeof settings.count_file !== "number")
throw new Error(`Error: --count-file was not specified.`);
if(isNaN(settings.count_file))
throw new Error(`Error: --count-file was not a number. process.argv: ${process.argv.join(" ")}`);
if(!fs.existsSync(settings.output)) if(!fs.existsSync(settings.output))
await fs.promises.mkdir(settings.output, { recursive: true }); await fs.promises.mkdir(settings.output, { recursive: true });