TensorFlow slices method using containerized execution
Hi Experts,
I am using TensorFlow's tf.data.Dataset.from_tensor_slices to batch-process my images for a CNN model. A snapshot of the code is below, and it runs very well in a Jupyter notebook in Dataiku using local execution.
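In essence the pipeline looks like this (a simplified sketch of my snapshot; the real paths and labels come from the 'full_path' and 'label_number' columns of my dataset):

import dataiku
import tensorflow as tf

df = dataiku.Dataset("train_10_wells").get_dataframe()

# Build (path, label) pairs; 'full_path' holds absolute paths into the managed folder
dataset = tf.data.Dataset.from_tensor_slices(
    (df['full_path'].values, df['label_number'].values))

def load_and_preprocess_image(full_path, label):
    img = tf.io.read_file(full_path)            # this is the call that fails below
    img = tf.io.decode_png(img, channels=3)
    img = tf.image.resize(img, [224, 224])
    return tf.cast(img, tf.float32) / 255.0, label

dataset = dataset.map(load_and_preprocess_image).batch(32)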
When I run the same code with Dataiku containerized execution it gives the errors below. I have seen the documentation on remote managed folders and get_download_stream(), but I cannot figure out how to apply it in my case.
I appreciate your help.
Kind regards,
Yawar
UnknownError: {{function_node __wrapped__IteratorGetNext_output_types_2_device_/job:localhost/replica:0/task:0/device:CPU:0}} NotFoundError: {{function_node __wrapped__ReadFile_device_/job:localhost/replica:0/task:0/device:CPU:0}} /opt/dataiku/dss/managed_folders/TEST1/qgNfVXiK/BREC/293018_sample_69969_depth_473.6409_BREC.png; No such file or directory [Op:ReadFile]
Traceback (most recent call last):
  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/script_ops.py", line 269, in __call__
    return func(device, token, args)
  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/script_ops.py", line 147, in __call__
    outputs = self._call(device, args)
  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/script_ops.py", line 154, in _call
    ret = self._func(*args)
  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/autograph/impl/api.py", line 642, in wrapper
    return func(*args, **kwargs)
  File "<ipython-input-14-ee5f546eba1e>", line 3, in load_and_preprocess_image
    img = tf.io.read_file(full_path)
  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/io_ops.py", line 133, in read_file
    return gen_io_ops.read_file(filename, name)
  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/gen_io_ops.py", line 578, in read_file
    _ops.raise_from_not_ok_status(e, name)
  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/framework/ops.py", line 7209, in raise_from_not_ok_status
    raise core._status_to_exception(e) from None  # pylint: disable=protected-access
tensorflow.python.framework.errors_impl.NotFoundError: {{function_node __wrapped__ReadFile_device_/job:localhost/replica:0/task:0/device:CPU:0}} /opt/dataiku/dss/managed_folders/TEST1/qgNfVXiK/BREC/293018_sample_69969_depth_473.6409_BREC.png; No such file or directory [Op:ReadFile]
[[{{node EagerPyFunc}}]] [Op:IteratorGetNext]
Operating system used: Windows 10
Best Answer
Alexandru (Dataiker)
Hi @ysaeed4,
Given the formatting of the post, it's hard to say.
Can you please open a support ticket, run this code in a test recipe and share the job diagnostics?
https://doc.dataiku.com/dss/latest/troubleshooting/problems/job-fails.html#getting-a-job-diagnosis
https://doc.dataiku.com/dss/latest/troubleshooting/obtaining-support.html#guidelines-for-submitting-a-support-ticket
Thanks
Answers
Alexandru (Dataiker)
Hi,
Since you are using hard-coded paths to a local managed folder, this can't work from a container: those paths don't exist when using containerized execution.
https://doc.dataiku.com/dss/latest/connecting/managed_folders.html#local-vs-non-local
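As a quick illustration, anything that resolves the folder to a local filesystem path only works when the code runs on the DSS host itself:

import dataiku

folder = dataiku.Folder("qgNfVXiK")
# get_path() only resolves for a local filesystem folder with local execution;
# inside a container this path (e.g. /opt/dataiku/dss/managed_folders/TEST1/qgNfVXiK)
# simply does not exist.
path = folder.get_path()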
It would be best to use the managed folder read/write APIs (get_download_stream) to copy those files to a temp file/temp dir/BytesIO before you feed the paths to your code. You can find some examples here:
https://developer.dataiku.com/latest/concepts-and-examples/managed-folders.html#load-a-model-from-a-remote-managed-folder
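Roughly along these lines (an untested sketch; substitute your own folder id and adjust the target paths):

import os
import shutil
import tempfile
import dataiku

folder = dataiku.Folder("qgNfVXiK")   # your managed folder id
tmp_dir = tempfile.mkdtemp()

# Copy every file in the folder to local disk inside the container,
# then point your tf.data pipeline at the local copies
local_paths = []
for path in folder.list_paths_in_partition():
    local_path = os.path.join(tmp_dir, path.lstrip('/'))
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    with folder.get_download_stream(path) as remote, open(local_path, 'wb') as out:
        shutil.copyfileobj(remote, out)
    local_paths.append(local_path)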
Thanks
ysaeed4
Thanks @AlexT, I tried a few things as per the examples you shared but it didn't work. The code I am using is below and it still ends up in errors. Can you please review it and let me know what needs to be done to run it?

import os
import shutil
import tempfile

import dataiku
import tensorflow as tf
from sklearn.model_selection import train_test_split

# Read recipe inputs
train_10_wells = dataiku.Dataset("train_10_wells")
df = train_10_wells.get_dataframe()

litho_10_well_images = dataiku.Folder("qgNfVXiK")

# Split the dataset into training and test sets
df_train, df_val = train_test_split(df, test_size=0.15, random_state=42)

# Create TensorFlow datasets for training and validation
dataset_train = tf.data.Dataset.from_tensor_slices((df_train['full_path'].values, df_train['label_number'].values))
dataset_val = tf.data.Dataset.from_tensor_slices((df_val['full_path'].values, df_val['label_number'].values))

# Create a temporary directory
temp_dir = tempfile.mkdtemp()

# Copy files from the managed folder to the local temporary directory
def copy_files_to_temp_dir(df, temp_dir):
    for _, row in df.iterrows():
        full_path = row['full_path']
        local_file_path = os.path.join(temp_dir, row['path'].lstrip('/'))  # Removing the leading '/'
        if not os.path.exists(os.path.dirname(local_file_path)):
            os.makedirs(os.path.dirname(local_file_path))
        # Specify the correct path within the managed folder
        with litho_10_well_images.get_download_stream(path=row['path']) as f_remote, open(local_file_path, 'wb') as f_local:
            shutil.copyfileobj(f_remote, f_local)

copy_files_to_temp_dir(df, temp_dir)

def load_and_preprocess_image(full_path, labels):
    # Read and decode the image using TensorFlow functions
    img = tf.io.read_file(full_path)
    img = tf.image.decode_image(img, channels=3)  # Adjust channels as needed
    # Perform any necessary preprocessing (e.g., resizing, normalization)
    img = tf.image.resize(img, [224, 224])  # Adjust size as needed
    img = tf.cast(img, tf.float32) / 255.0  # Normalize to [0, 1]
    # Explicitly cast labels to the correct data type
    labels = tf.cast(labels, tf.int64)  # Adjust the data type as needed
    return img, labels

# Use tf.py_function to wrap the load_and_preprocess_image function
def load_and_preprocess_image_tf(full_path, labels):
    return tf.py_function(load_and_preprocess_image, [full_path, labels], [tf.float32, labels.dtype])

# Use tf.py_function to wrap the lambda function that rewrites paths
def map_function(x, y):
    return tf.py_function(lambda x: (os.path.join(temp_dir, x.decode()), y), [x], [tf.string, y.dtype])

# Map the new function to the datasets using local paths
dataset_train_preprocessed = dataset_train.map(map_function).batch(32)
dataset_val_preprocessed = dataset_val.map(map_function).batch(32)

# Clean up the temporary directory after processing
shutil.rmtree(temp_dir)
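For clarity, what I am trying to achieve with map_function is to rewrite each stored path to its copy under temp_dir before loading and preprocessing, i.e. something like:

def to_local_path(path, label):
    def _join(p):
        # p arrives as an eager string tensor inside py_function
        return os.path.join(temp_dir, p.numpy().decode().lstrip('/'))
    local = tf.py_function(_join, [path], tf.string)
    return local, label

dataset_train_preprocessed = (dataset_train
                              .map(to_local_path)
                              .map(load_and_preprocess_image_tf)
                              .batch(32))

but with the code above the iterator still fails to find the files.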