TensorFlow slices method using containerized execution

Solved!
ysaeed4
Level 2

Hi Experts, 

I am using tf.data.Dataset.from_tensor_slices to batch process my images for a CNN model. A snapshot of the code is below, and it runs very well in a Jupyter notebook in Dataiku using local execution.

[Screenshot: local_tensor_slices.JPG — the local tf.data pipeline code]
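
Roughly, the local code builds the dataset from the absolute paths stored in the dataframe and reads each image inside the pipeline (a sketch only; the exact details are in the screenshot):

import dataiku
import tensorflow as tf

folder = dataiku.Folder("qgNfVXiK")

# df has a 'full_path' column with absolute paths on the DSS host, e.g.
# /opt/dataiku/dss/managed_folders/TEST1/qgNfVXiK/BREC/..._BREC.png
dataset = tf.data.Dataset.from_tensor_slices(
    (df['full_path'].values, df['label_number'].values))

def load_and_preprocess_image(full_path, labels):
    img = tf.io.read_file(full_path)              # works locally, fails in the container
    img = tf.image.decode_image(img, channels=3)
    img = tf.image.resize(img, [224, 224])
    img = tf.cast(img, tf.float32) / 255.0
    return img, labels

dataset = dataset.map(
    lambda p, l: tf.py_function(load_and_preprocess_image, [p, l], [tf.float32, tf.int64])
).batch(32)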

When I run the same code with Dataiku containerized execution, it gives the errors below. I have seen the documentation on remote managed folders and on using get_download_stream(), but I cannot figure out how to apply it in my case.

 I appreciate your help. 

Kind regards,

Yawar

UnknownError: {{function_node __wrapped__IteratorGetNext_output_types_2_device_/job:localhost/replica:0/task:0/device:CPU:0}} NotFoundError: {{function_node __wrapped__ReadFile_device_/job:localhost/replica:0/task:0/device:CPU:0}} /opt/dataiku/dss/managed_folders/TEST1/qgNfVXiK/BREC/293018_sample_69969_depth_473.6409_BREC.png; No such file or directory [Op:ReadFile]
Traceback (most recent call last):

  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/script_ops.py", line 269, in __call__
    return func(device, token, args)

  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/script_ops.py", line 147, in __call__
    outputs = self._call(device, args)

  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/script_ops.py", line 154, in _call
    ret = self._func(*args)

  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/autograph/impl/api.py", line 642, in wrapper
    return func(*args, **kwargs)

  File "<ipython-input-14-ee5f546eba1e>", line 3, in load_and_preprocess_image
    img = tf.io.read_file(full_path)

  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/io_ops.py", line 133, in read_file
    return gen_io_ops.read_file(filename, name)

  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/gen_io_ops.py", line 578, in read_file
    _ops.raise_from_not_ok_status(e, name)

  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/framework/ops.py", line 7209, in raise_from_not_ok_status
    raise core._status_to_exception(e) from None  # pylint: disable=protected-access

tensorflow.python.framework.errors_impl.NotFoundError: {{function_node __wrapped__ReadFile_device_/job:localhost/replica:0/task:0/device:CPU:0}} /opt/dataiku/dss/managed_folders/TEST1/qgNfVXiK/BREC/293018_sample_69969_depth_473.6409_BREC.png; No such file or directory [Op:ReadFile]


	 [[{{node EagerPyFunc}}]] [Op:IteratorGetNext]

Operating system used: Windows 10


3 Replies
AlexT
Dataiker

Hi,
Since you are using hard-coded paths to a local managed folder, this can't work from a container: those paths don't exist when using containerized execution.

https://doc.dataiku.com/dss/latest/connecting/managed_folders.html#local-vs-non-local

It would be best if you used the managed folder read/write APIs (get_download_stream) to copy those files to a tempfile/tempdir/BytesIO before you feed the paths to your code. You can find some examples here:

https://developer.dataiku.com/latest/concepts-and-examples/managed-folders.html#load-a-model-from-a-...
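
As a rough sketch (not tested, and assuming your images live under relative paths inside the managed folder), the copy step could look like this:

import os
import shutil
import tempfile

import dataiku
import tensorflow as tf

folder = dataiku.Folder("qgNfVXiK")  # your managed folder id
temp_dir = tempfile.mkdtemp()

# Copy every file in the managed folder to a local temp dir,
# keeping the relative layout (e.g. /BREC/xxx.png -> <temp_dir>/BREC/xxx.png)
local_paths = []
for path in folder.list_paths_in_partition():
    local_file = os.path.join(temp_dir, path.lstrip('/'))
    os.makedirs(os.path.dirname(local_file), exist_ok=True)
    with folder.get_download_stream(path) as f_remote, open(local_file, 'wb') as f_local:
        shutil.copyfileobj(f_remote, f_local)
    local_paths.append(local_file)

# Build the tf.data pipeline from the *local* paths only
dataset = tf.data.Dataset.from_tensor_slices(local_paths)

The key point is that only the local temp paths are ever passed to tf.io.read_file, never the /opt/dataiku/dss/managed_folders/... paths from the DSS host.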

Thanks

ysaeed4
Level 2
Author

Thanks @AlexT, I tried a few things as per the examples you shared, but it didn't work. The code I am using is as follows, and it still ends up in errors. Can you please review it and let me know what needs to be done to run it?

import os
import shutil
import tempfile

import dataiku
import tensorflow as tf
from sklearn.model_selection import train_test_split

# Read recipe inputs
train_10_wells = dataiku.Dataset("train_10_wells")
df = train_10_wells.get_dataframe()

litho_10_well_images = dataiku.Folder("qgNfVXiK")

# Split the dataset into training and test sets
df_train, df_val = train_test_split(df, test_size=0.15, random_state=42)

# Create TensorFlow datasets for training and validation
dataset_train = tf.data.Dataset.from_tensor_slices((df_train['full_path'].values, df_train['label_number'].values))
dataset_val = tf.data.Dataset.from_tensor_slices((df_val['full_path'].values, df_val['label_number'].values))

# Create a temporary directory
temp_dir = tempfile.mkdtemp()

# Copy files from the managed folder to the local temporary directory
def copy_files_to_temp_dir(df, temp_dir):
    for _, row in df.iterrows():
        full_path = row['full_path']
        local_file_path = os.path.join(temp_dir, row['path'].lstrip('/'))  # Removing the leading '/'
        if not os.path.exists(os.path.dirname(local_file_path)):
            os.makedirs(os.path.dirname(local_file_path))

        # Specify the correct path within the managed folder
        with litho_10_well_images.get_download_stream(path=row['path']) as f_remote, open(local_file_path, 'wb') as f_local:
            shutil.copyfileobj(f_remote, f_local)

# Copy files from the managed folder to the local temporary directory
copy_files_to_temp_dir(df, temp_dir)


def load_and_preprocess_image(full_path, labels):
    # Read and decode the image using TensorFlow functions
    img = tf.io.read_file(full_path)
    img = tf.image.decode_image(img, channels=3)  # Adjust channels as needed

    # Perform any necessary preprocessing (e.g., resizing, normalization)
    img = tf.image.resize(img, [224, 224])  # Adjust size as needed
    img = tf.cast(img, tf.float32) / 255.0  # Normalize to [0, 1]

    # Explicitly cast labels to the correct data type
    labels = tf.cast(labels, tf.int64)  # Adjust the data type as needed

    return img, labels

# Use tf.py_function to wrap the load_and_preprocess_image function
def load_and_preprocess_image_tf(full_path, labels):
    return tf.py_function(load_and_preprocess_image, [full_path, labels], [tf.float32, labels.dtype])


# Use tf.py_function to wrap the lambda function
def map_function(x, y):
    return tf.py_function(lambda x: (os.path.join(temp_dir, x.decode()), y), [x], [tf.string, y.dtype])


# Map the new function to the training dataset using local paths
dataset_train_preprocessed = dataset_train.map(map_function).batch(32)

# Map the new function to the validation dataset using local paths
dataset_val_preprocessed = dataset_val.map(map_function).batch(32)

# Clean up the temporary directory after processing
shutil.rmtree(temp_dir)

AlexT
Dataiker

Hi @ysaeed4 ,

Given the formatting of the post, it's hard to say.

Can you please open a support ticket, run this code in a test recipe and share the job diagnostics?
https://doc.dataiku.com/dss/latest/troubleshooting/problems/job-fails.html#getting-a-job-diagnosis

https://doc.dataiku.com/dss/latest/troubleshooting/obtaining-support.html#guidelines-for-submitting-...

Thanks

