TensorFlow slices method using containerized execution

ysaeed4 — Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 5
edited July 16 in Using Dataiku

Hi Experts,

I am using tf.data.Dataset.from_tensor_slices to batch-process my images for a CNN model. A snapshot of the code is below, and it runs very well in a Jupyter notebook in Dataiku using local execution.

[Screenshot: local_tensor_slices.JPG]
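
In case the screenshot does not come through, the local version boils down to roughly this (a simplified sketch only; the dataset name and the full_path / label_number columns match the recipe code I post further down, and I use decode_png here since the files are .png):

import dataiku
import tensorflow as tf

# df has a 'full_path' column with absolute paths inside the local managed folder
# and a 'label_number' column with integer class labels
df = dataiku.Dataset("train_10_wells").get_dataframe()

def load_and_preprocess_image(full_path, label):
    img = tf.io.read_file(full_path)           # read the image bytes from disk
    img = tf.io.decode_png(img, channels=3)    # decode to an RGB tensor
    img = tf.image.resize(img, [224, 224])     # resize to the CNN input size
    img = tf.cast(img, tf.float32) / 255.0     # normalize to [0, 1]
    return img, tf.cast(label, tf.int64)

dataset = tf.data.Dataset.from_tensor_slices(
    (df["full_path"].values, df["label_number"].values)
)
dataset = dataset.map(load_and_preprocess_image).batch(32)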

When I run the same code with Dataiku containerized execution it gives the errors below. I have seen the documentation on remote managed folders and get_download_stream(), but I cannot figure out how to apply this in my case.

I appreciate your help.

Kind regards,

Yawar

UnknownError: {{function_node __wrapped__IteratorGetNext_output_types_2_device_/job:localhost/replica:0/task:0/device:CPU:0}} NotFoundError: {{function_node __wrapped__ReadFile_device_/job:localhost/replica:0/task:0/device:CPU:0}} /opt/dataiku/dss/managed_folders/TEST1/qgNfVXiK/BREC/293018_sample_69969_depth_473.6409_BREC.png; No such file or directory [Op:ReadFile]
Traceback (most recent call last):

  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/script_ops.py", line 269, in __call__
    return func(device, token, args)

  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/script_ops.py", line 147, in __call__
    outputs = self._call(device, args)

  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/script_ops.py", line 154, in _call
    ret = self._func(*args)

  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/autograph/impl/api.py", line 642, in wrapper
    return func(*args, **kwargs)

  File "<ipython-input-14-ee5f546eba1e>", line 3, in load_and_preprocess_image
    img = tf.io.read_file(full_path)

  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/io_ops.py", line 133, in read_file
    return gen_io_ops.read_file(filename, name)

  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/ops/gen_io_ops.py", line 578, in read_file
    _ops.raise_from_not_ok_status(e, name)

  File "/opt/dataiku/code-env/lib64/python3.9/site-packages/tensorflow/python/framework/ops.py", line 7209, in raise_from_not_ok_status
    raise core._status_to_exception(e) from None  # pylint: disable=protected-access

tensorflow.python.framework.errors_impl.NotFoundError: {{function_node __wrapped__ReadFile_device_/job:localhost/replica:0/task:0/device:CPU:0}} /opt/dataiku/dss/managed_folders/TEST1/qgNfVXiK/BREC/293018_sample_69969_depth_473.6409_BREC.png; No such file or directory [Op:ReadFile]


     [[{{node EagerPyFunc}}]] [Op:IteratorGetNext]

Operating system used: Windows 10




Answers

  • Alexandru — Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 1,212

    Hi,
    Since you are using hard-coded paths to a local managed folder, this cannot work from a container: those paths do not exist when using containerized execution.

    https://doc.dataiku.com/dss/latest/connecting/managed_folders.html#local-vs-non-local

    It would be best if you used the managed folder read/write APIs (get_download_stream()) to copy those files to a temporary file/directory or a BytesIO object before you feed the paths to your code. You can find some examples here:

    https://developer.dataiku.com/latest/concepts-and-examples/managed-folders.html#load-a-model-from-a-remote-managed-folder
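
    For instance, something along these lines works from a container (a rough sketch only; the folder id is taken from your error message, and list_paths_in_partition() simply enumerates every file in the folder):

    import os, shutil, tempfile
    import dataiku
    import tensorflow as tf

    folder = dataiku.Folder("qgNfVXiK")   # the managed folder holding the images
    tmp_dir = tempfile.mkdtemp()

    # Copy every file out of the (possibly remote) managed folder into the container
    local_paths = []
    for remote_path in folder.list_paths_in_partition():
        local_path = os.path.join(tmp_dir, remote_path.lstrip("/"))
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        with folder.get_download_stream(remote_path) as f_remote, open(local_path, "wb") as f_local:
            shutil.copyfileobj(f_remote, f_local)
        local_paths.append(local_path)

    # Build the tf.data pipeline from the *local* paths, which do exist in the container
    dataset = tf.data.Dataset.from_tensor_slices(local_paths)

    If the images are too large to copy up front, you can instead read each file into a BytesIO object on the fly inside the pipeline.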

    Thanks

  • ysaeed4 — Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 5

    Thanks @AlexT, I tried a few things as per the examples you shared, but it didn't work. The code I am using is as follows and it ends up in errors. Can you please review it and let me know what needs to be done to run it?

    import os
    import shutil
    import tempfile

    import dataiku
    import tensorflow as tf
    from sklearn.model_selection import train_test_split

    # Read recipe inputs
    train_10_wells = dataiku.Dataset("train_10_wells")
    df = train_10_wells.get_dataframe()

    litho_10_well_images = dataiku.Folder("qgNfVXiK")

    # Split the dataset into training and test sets
    df_train, df_val = train_test_split(df, test_size=0.15, random_state=42)

    # Create TensorFlow datasets for training and validation
    dataset_train = tf.data.Dataset.from_tensor_slices((df_train['full_path'].values, df_train['label_number'].values))
    dataset_val = tf.data.Dataset.from_tensor_slices((df_val['full_path'].values, df_val['label_number'].values))

    # Create a temporary directory
    temp_dir = tempfile.mkdtemp()

    # Copy files from the managed folder to the local temporary directory
    def copy_files_to_temp_dir(df, temp_dir):
        for _, row in df.iterrows():
            full_path = row['full_path']
            local_file_path = os.path.join(temp_dir, row['path'].lstrip('/'))  # Remove the leading '/'
            if not os.path.exists(os.path.dirname(local_file_path)):
                os.makedirs(os.path.dirname(local_file_path))

            # Specify the correct path within the managed folder
            with litho_10_well_images.get_download_stream(path=row['path']) as f_remote, open(local_file_path, 'wb') as f_local:
                shutil.copyfileobj(f_remote, f_local)

    # Copy files from the managed folder to the local temporary directory
    copy_files_to_temp_dir(df, temp_dir)


    def load_and_preprocess_image(full_path, labels):
        # Read and decode the image using TensorFlow functions
        img = tf.io.read_file(full_path)
        img = tf.image.decode_image(img, channels=3)  # Adjust channels as needed

        # Perform any necessary preprocessing (e.g., resizing, normalization)
        img = tf.image.resize(img, [224, 224])  # Adjust size as needed
        img = tf.cast(img, tf.float32) / 255.0  # Normalize to [0, 1]

        # Explicitly cast labels to the correct data type
        labels = tf.cast(labels, tf.int64)  # Adjust the data type as needed

        return img, labels

    # Use tf.py_function to wrap the load_and_preprocess_image function
    def load_and_preprocess_image_tf(full_path, labels):
        return tf.py_function(load_and_preprocess_image, [full_path, labels], [tf.float32, labels.dtype])


    # Use tf.py_function to wrap the lambda function
    def map_function(x, y):
        return tf.py_function(lambda x: (os.path.join(temp_dir, x.decode()), y), [x], [tf.string, y.dtype])


    # Map the new function to the dataset using local paths
    dataset_train_preprocessed = dataset_train.map(map_function).batch(32)


    # Map the new function to the dataset using local paths
    dataset_val_preprocessed = dataset_val.map(map_function).batch(32)

    # Clean up the temporary directory after processing
    shutil.rmtree(temp_dir)
