# history: 2022-08-11 17:26:28 +00:00
import io
import json
import os
import sys
import argparse
import re
from loguru import logger
import tensorflow as tf
import numpy as np
# history: 2022-09-27 16:49:54 +00:00
from lib . io . writefile import writefile
# history: 2022-09-14 15:02:36 +00:00
from lib . io . handle_open import handle_open
# history: 2022-08-11 17:26:28 +00:00
from lib . ai . RainfallWaterContraster import RainfallWaterContraster
from lib . dataset . dataset import dataset_predict
from lib . io . find_paramsjson import find_paramsjson
from lib . io . readfile import readfile
from lib . vis . embeddings import vis_embeddings
# history: 2022-09-27 15:59:31 +00:00
# Output serialisation formats for the embedded records:
MODE_JSONL = 1  # one JSON array per line, written via json.dumps (optionally gzipped)
MODE_TFRECORD = 2  # binary tf.train.Example records
# history: 2022-08-11 17:26:28 +00:00
def parse_args():
    """Build the CLI argument parser for the contrastive-model embedding script.

    Returns:
        argparse.ArgumentParser: The configured (but not yet invoked) parser.
    """
    parser = argparse.ArgumentParser(description="Output feature maps using a given pretrained contrastive model.")
    # parser.add_argument("--config", "-c", help="Filepath to the TOML config file to load.", required=True)
    parser.add_argument("--input", "-i", help="Path to input directory containing the .tfrecord(.gz) files to predict for. If a single file is passed instead, then only that file will be converted.", required=True)
    parser.add_argument("--output", "-o", help="Path to output file to write output to. If the file extension .tfrecord.gz is used instead of .jsonl.gz, then a tfrecord file is written.")
    parser.add_argument("--records-per-file", help="Optional. If specified, this limits the number of records written to each file. When using this option, you MUST have the string '+d' (without quotes) somewhere in your output filepath.", type=int)
    parser.add_argument("--checkpoint", "-c", help="Checkpoint file to load model weights from.", required=True)
    parser.add_argument("--params", "-p", help="Optional. The file containing the model hyperparameters (usually called 'params.json'). If not specified, it's location will be determined automatically.")
    # BUGFIX: without type=float, argparse hands back a string, which the dataset
    # pipeline would then try to use as a numeric multiplier.
    parser.add_argument("--reads-multiplier", help="Optional. The multiplier for the number of files we should read from at once. Defaults to 1.5, which means read ceil(NUMBER_OF_CORES * 1.5). Set to a higher number of systems with high read latency to avoid starving the GPU of data.", type=float)
    return parser
def run(args):
    """Embed a dataset with a pretrained contrastive model and write the results out.

    Loads the model from ``args.checkpoint`` (hyperparameters from
    ``args.params``, auto-located if unset), streams the input .tfrecord(.gz)
    files through ``ai.embed``, and writes one embedded record per input record
    to ``args.output``. The output format is chosen from the output filename's
    extension (.tfrecord / .tfrecord.gz → TFRecord, otherwise JSONL); "-" or no
    output means JSONL on stdout. When ``args.records_per_file`` is positive,
    output is rotated across numbered files (the "+d" placeholder in the path).

    Raises:
        Exception: If the params file or checkpoint stem does not exist, or if
            file rotation is requested without a "+d" placeholder in the path.
    """
    # NOTE: We deliberately do NOT check that the checkpoint filepath itself exists,
    # because Tensorflow/Keras requires that we pass the stem instead of the actual
    # index file..... :-/
    if getattr(args, "params", None) is None:
        args.params = find_paramsjson(args.checkpoint)

    # BUGFIX: the CLI flag is --reads-multiplier, which argparse stores as
    # args.reads_multiplier. The old code only ever looked at args.read_multiplier,
    # so the user's setting was silently ignored and 1.5 was always used.
    multiplier = getattr(args, "read_multiplier", None)
    if multiplier is None:
        multiplier = getattr(args, "reads_multiplier", None)
    args.read_multiplier = 1.5 if multiplier is None else float(multiplier)

    if getattr(args, "records_per_file", None) is None:
        args.records_per_file = 0  # 0 = unlimited

    if not os.path.exists(args.params):
        raise Exception(f"Error: The specified filepath params.json hyperparameters ('{args.params}') does not exist.")
    if not os.path.exists(args.checkpoint):
        raise Exception(f"Error: The specified filepath to the checkpoint to load ('{args.checkpoint}') does not exist.")

    filepath_output = args.output if getattr(args, "output", None) is not None else "-"

    # BUGFIX: enforce the documented contract — rotation rewrites "+d" with the
    # file number, so without the placeholder every rotation would clobber the
    # same file (or attempt to rotate stdout).
    if args.records_per_file > 0 and "+d" not in filepath_output:
        raise Exception("Error: When using --records-per-file, the output filepath must contain the string '+d'.")

    ai = RainfallWaterContraster.from_checkpoint(args.checkpoint, **json.loads(readfile(args.params)))

    sys.stderr.write(f"\n\n>>> This is TensorFlow {tf.__version__}\n\n\n")

    # Note that if using a directory of input files, the output order is NOT
    # GUARANTEED TO BE THE SAME. In fact, it probably won't be.
    dataset = dataset_predict(
        dirpath_input=args.input,
        parallel_reads_multiplier=args.read_multiplier
    )

    # Pick the output format from the filename extension.
    output_mode = MODE_TFRECORD if filepath_output.endswith((".tfrecord", ".tfrecord.gz")) else MODE_JSONL
    logger.info("Output mode is " + ("TFRECORD" if output_mode == MODE_TFRECORD else "JSONL"))
    logger.info(f"Records per file: {args.records_per_file}")

    # TFRecords are binary; JSONL is text ("wt" lets handle_open gzip-compress).
    write_mode = "wt" if filepath_output.endswith(".gz") else "w"
    if output_mode == MODE_TFRECORD:
        write_mode = "wb"

    handle = sys.stdout
    filepath_metadata = None
    if filepath_output != "-":
        handle = handle_open(
            filepath_output if args.records_per_file <= 0 else filepath_output.replace("+d", str(0)),
            write_mode
        )
        # Tensor shapes are recorded next to the data so readers can deserialise it.
        filepath_metadata = os.path.join(os.path.dirname(filepath_output), "metadata.json")

    logger.info(f"filepath_output: {filepath_output}")
    logger.info(f"filepath_params: {filepath_metadata}")

    i = 0  # total records written
    i_file = 0  # records written to the current output file
    files_done = 0
    for step_rainfall, step_water in ai.embed(dataset):
        # Rotate to the next numbered output file once the current one is full.
        # BUGFIX: the old check used ">", writing records_per_file + 1 records to
        # each file; and it opened str(files_done + 1) AFTER incrementing
        # files_done, so the filename sequence skipped "1" (0, 2, 3, ...).
        if args.records_per_file > 0 and i_file >= args.records_per_file:
            files_done += 1
            i_file = 0
            handle.close()
            logger.info(f"PROGRESS:file {files_done}")
            handle = handle_open(filepath_output.replace("+d", str(files_done)), write_mode)

        if output_mode == MODE_JSONL:
            # Ref https://stackoverflow.com/a/64710892/1460422
            handle.write(json.dumps(step_rainfall.numpy().tolist(), separators=(',', ':')) + "\n")
        elif output_mode == MODE_TFRECORD:
            # Write the tensor shapes out once, alongside the first record.
            if i == 0 and filepath_metadata is not None:
                writefile(filepath_metadata, json.dumps({
                    "rainfallradar": step_rainfall.shape.as_list(),
                    "waterdepth": step_water.shape.as_list()
                }))

            step_rainfall = tf.train.BytesList(value=[tf.io.serialize_tensor(step_rainfall, name="rainfall").numpy()])
            step_water = tf.train.BytesList(value=[tf.io.serialize_tensor(step_water, name="water").numpy()])

            record = tf.train.Example(features=tf.train.Features(feature={
                "rainfallradar": tf.train.Feature(bytes_list=step_rainfall),
                "waterdepth": tf.train.Feature(bytes_list=step_water)
            }))
            handle.write(record.SerializeToString())
        else:
            raise Exception("Error: Unknown output mode.")

        # Progress heartbeat every 100 records (i == 0 is covered by i % 100 == 0).
        if i % 100 == 0:
            sys.stderr.write(f"[pretrain:predict] STEP {i}\r")

        i += 1
        i_file += 1

    handle.close()
    sys.stderr.write(">>> Complete\n")