diff --git a/aimodel/src/lib/dataset/dataset.py b/aimodel/src/lib/dataset/dataset.py
index daa756a..88593c3 100644
--- a/aimodel/src/lib/dataset/dataset.py
+++ b/aimodel/src/lib/dataset/dataset.py
@@ -9,38 +9,35 @@ import tensorflow as tf
 
 from shuffle import shuffle
 
-def parse_line(line):
-	if tf.strings.length(line) <= 0:
-		return None
-	try:
-		# Yes, this is really what the function is called that converts a string tensor to a regular python string.....
-		obj = json.loads(line.numpy())
-	except:
-		logger.warn("Ignoring invalid line.")
-		return None
-	
-	rainfall = tf.constant(obj.rainfallradar, dtype=tf.float32)
-	waterdepth = tf.constant(obj.waterdepth, dtype=tf.float32)
-	
-	# Inputs, dummy label since we'll be using semi-supervised contrastive learning
-	return rainfall, waterdepth
-def make_dataset(filepaths, batch_size, shuffle_buffer_size=128, parallel_reads_multiplier=2):
-	return tf.data.TextLineDataset(
-		filenames=tf.data.Dataset.from_tensor_slices(filepaths).shuffle(len(filepaths), reshuffle_each_iteration=True),
-		compression_type=tf.constant("GZIP"),
-		num_parallel_reads=math.ceil(os.cpu_count() * parallel_reads_multiplier) # iowait can cause issues - especially on Viper
-	# TODO: Get rid of this tf.py_function call somehow, because it acquires the Python Global Interpreter lock, which prevents more than 1 thread to run at a time, and .map() uses threads....
-	).map(tf.py_function(parse_line), num_parallel_calls=tf.data.AUTOTUNE) \
-		.filter(lambda item : item is not None) \
-		.shuffle(1) \
+# TO PARSE:
+@tf.function
+def parse_item(item):
+	parsed = tf.io.parse_single_example(item, features={
+		"rainfallradar": tf.io.FixedLenFeature([], tf.string),
+		"waterdepth": tf.io.FixedLenFeature([], tf.string)
+	})
+	rainfall = tf.io.parse_tensor(parsed["rainfallradar"], out_type=tf.float32)
+	water = tf.io.parse_tensor(parsed["waterdepth"], out_type=tf.float32)
+	
+	# TODO: The shape of the resulting tensor can't be statically determined, so we need to reshape here
+	
+	# TODO: Any other additional parsing here, since multiple .map() calls are not optimal
+	return rainfall, water
+
+def make_dataset(filenames, compression_type="GZIP", parallel_reads_multiplier=1.5, shuffle_buffer_size=128, batch_size=64):
+	return tf.data.TFRecordDataset(filenames,
+		compression_type=compression_type,
+		num_parallel_reads=math.ceil(os.cpu_count() * parallel_reads_multiplier)
+	).shuffle(shuffle_buffer_size) \
+		.map(parse_item, num_parallel_calls=tf.data.AUTOTUNE) \
 		.batch(batch_size) \
 		.prefetch(tf.data.AUTOTUNE)
-
-def dataset(dirpath_input, batch_size=64, train_percentage=0.8):
+
+def dataset(dirpath_input, batch_size=64, train_percentage=0.8, parallel_reads_multiplier=1.5):
 	filepaths = shuffle(list(filter(
-		lambda filepath: str(filepath).endswith(".jsonl.gz"),
+		lambda filepath: str(filepath).endswith(".tfrecord.gz"),
 		[ file.path for file in os.scandir(dirpath_input) ] # .path on a DirEntry object yields the absolute filepath
 	)))
 	filepaths_count = len(filepaths)
@@ -49,8 +46,9 @@ def dataset(dirpath_input, batch_size=64, train_percentage=0.8):
 	filepaths_train = filepaths[:dataset_splitpoint]
 	filepaths_validate = filepaths[dataset_splitpoint:]
 	
-	dataset_train = make_dataset(filepaths_train, batch_size)
-	dataset_validate = make_dataset(filepaths_validate, batch_size)
+	dataset_train = make_dataset(filepaths_train, batch_size=batch_size, parallel_reads_multiplier=parallel_reads_multiplier)
+	dataset_validate = make_dataset(filepaths_validate, batch_size=batch_size, parallel_reads_multiplier=parallel_reads_multiplier)
+	
 	return dataset_train, dataset_validate
diff --git a/rainfallwrangler/src/lib/python/json2tfrecord.py b/rainfallwrangler/src/lib/python/json2tfrecord.py
index 1c9619c..f2caebc 100755
--- a/rainfallwrangler/src/lib/python/json2tfrecord.py
+++ b/rainfallwrangler/src/lib/python/json2tfrecord.py
@@ -11,27 +11,6 @@ if not os.environ.get("NO_SILENCE"): silence_tensorflow()
 import tensorflow as tf
 
-# TO PARSE:
-@tf.function
-def parse_item(item):
-	parsed = tf.io.parse_single_example(item, features={
-		"rainfallradar": tf.io.FixedLenFeature([], tf.string),
-		"waterdepth": tf.io.FixedLenFeature([], tf.string)
-	})
-	rainfall = tf.io.parse_tensor(parsed["rainfallradar"], out_type=tf.float32)
-	water = tf.io.parse_tensor(parsed["waterdepth"], out_type=tf.float32)
-	
-	# TODO: The shape of the resulting tensor can't be statically determined, so we need to reshape here
-	
-	# TODO: Any other additional parsing here, since multiple .map() calls are not optimal
-	return rainfall, water
-
-def parse_example(filenames, compression_type="GZIP", parallel_reads_multiplier=1.5):
-	return tf.data.TFRecordDataset(filenames,
-		compression_type=compression_type,
-		num_parallel_reads=math.ceil(os.cpu_count() * parallel_reads_multiplier)
-	).map(parse_item, num_parallel_calls=tf.data.AUTOTUNE)
-
 def parse_args():
 	parser = argparse.ArgumentParser(description="Convert a generated .jsonl.gz file to a .tfrecord.gz file")
 	parser.add_argument("--input", "-i", help="Path to the input file to convert.", required=True)
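
The first TODO in parse_item (statically-unknown shapes) can be resolved with an explicit tf.reshape once tf.io.parse_tensor has run. Here's a minimal sketch; the tensor dimensions are hypothetical placeholders, since the real ones depend on what rainfallwrangler actually wrote:

import tensorflow as tf

# Assumed shapes, for illustration only - not the project's real dimensions.
SHAPE_RAINFALL = [24, 128, 128]  # e.g. [timesteps, height, width]
SHAPE_WATERDEPTH = [128, 128]    # e.g. [height, width]

@tf.function
def parse_item(item):
	parsed = tf.io.parse_single_example(item, features={
		"rainfallradar": tf.io.FixedLenFeature([], tf.string),
		"waterdepth": tf.io.FixedLenFeature([], tf.string)
	})
	rainfall = tf.io.parse_tensor(parsed["rainfallradar"], out_type=tf.float32)
	water = tf.io.parse_tensor(parsed["waterdepth"], out_type=tf.float32)
	
	# tf.io.parse_tensor yields a tensor whose static shape is unknown to the
	# graph; reshaping restores fixed dimensions so that .batch() and any
	# downstream model layers can see them.
	rainfall = tf.reshape(rainfall, SHAPE_RAINFALL)
	water = tf.reshape(water, SHAPE_WATERDEPTH)
	return rainfall, water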
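
For reference, parse_item implies a particular layout on the writing side: each tensor serialised with tf.io.serialize_tensor and stored as a bytes feature of a tf.train.Example. The sketch below shows that counterpart under those assumptions; it is not the actual json2tfrecord.py implementation, and write_record plus the output path are hypothetical:

import tensorflow as tf

def _bytes_feature(tensor):
	# Serialise a tensor to a byte string and wrap it as an Example feature.
	return tf.train.Feature(bytes_list=tf.train.BytesList(
		value=[tf.io.serialize_tensor(tensor).numpy()]))

def write_record(filepath, rainfall, waterdepth):
	# GZIP matches the compression_type="GZIP" default that make_dataset
	# passes to tf.data.TFRecordDataset on the reading side.
	options = tf.io.TFRecordOptions(compression_type="GZIP")
	with tf.io.TFRecordWriter(filepath, options=options) as writer:
		example = tf.train.Example(features=tf.train.Features(feature={
			"rainfallradar": _bytes_feature(tf.constant(rainfall, dtype=tf.float32)),
			"waterdepth": _bytes_feature(tf.constant(waterdepth, dtype=tf.float32))
		}))
		writer.write(example.SerializeToString())

write_record("/tmp/demo.tfrecord.gz",
	[[0.0, 1.0], [2.0, 3.0]],
	[[0.1, 0.2], [0.3, 0.4]])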