
Select files depending on their versions

Solved!
Ineedi2
Level 2
Select files depending on their versions

Hi everyone,

I will try to explain my problem as best as possible.
Imagine I have a general folder called General. In this folder I have a subfolder for each year (2021, 2020, 2019, ...), and in each of these subfolders I have further subfolders for each month (01, 02, 03, etc.).

If I only have one file in each of the month subfolders, I just have to import "General" so that DSS automatically stacks all the files into a single dataset in my flow.

Now imagine that in each of the month subfolders I can have several versions of a file, for example AA202001_v1, AA202001_v2, AA202001_v3, and that in this case I have to pick only the latest version, here AA202001_v3 (which would be in General > 2020 > 01).
What do you think is the most suitable solution?

I was thinking of writing a Python script that builds a list of all filenames in their latest version and then stacks them. However, your opinions are of great interest to me.
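A rough sketch of that idea (plain Python only, assuming version suffixes like `_vN`; the file names are illustrative):

```python
import re
from collections import defaultdict

def latest_versions(filenames):
    """Group files like 'AA202001_v3' by base name and keep the
    highest version number in each group (numeric, so v10 > v2)."""
    groups = defaultdict(list)
    for name in filenames:
        m = re.match(r"(.+)_v(\d+)", name)
        if m:
            groups[m.group(1)].append((int(m.group(2)), name))
    # max() compares the (version, name) tuples; [1] recovers the name
    return [max(versions)[1] for versions in groups.values()]

files = ["AA202001_v1", "AA202001_v2", "AA202001_v3", "AA202002_v1"]
print(sorted(latest_versions(files)))  # ['AA202001_v3', 'AA202002_v1']
```

The resulting list could then be used to read and stack only those files.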

thanks 

Ineedi2

0 Kudos
9 Replies
AlexT
Dataiker

Hi @Ineedi2,

Indeed, using Python to read and filter the files you need is probably the most flexible solution.

There is an example you could build on here: https://community.dataiku.com/t5/Using-Dataiku/Listing-and-Reading-all-the-files-in-a-Managed-Folder... 

The other option would be to use "Files in Folder" and filter by glob or regex. However, this only works if the latest version is the same in every subfolder; if some subfolders are at v2 while others are at v3, this approach would not work, unless you can rename the files so that the latest version has a specific identifier, e.g. "_latest", and filter for that.

 

[Screenshot: 2021-10-15 at 12.39.09]

Ineedi2
Level 2
Author

Thank you for your answer.

I think I'm going to copy the latest version of the file in each subfolder with "_latest" at the end.

My files are stored on HDFS. Can I handle a file directly on HDFS without going through a DSS dataset?

 

0 Kudos
Ineedi2
Level 2
Author

However, I just noticed that I can access an HDFS directory directly with a managed folder.
Logically, I should place my managed folder at the General level, then run a Python recipe to build the _latest files, and then filter and stack them?

0 Kudos
AlexT
Dataiker

Yes, that approach sounds reasonable. Let us know if you have any issues or further questions.

 

Ineedi2
Level 2
Author

I have made good progress, but I am encountering a difficulty for which I have no solution.

I have created a managed folder pointing at my files in HDFS. This lets me manipulate them via the API. Then, for each of the subfolders (these are YYYY-MM partitions), I run the following code to create a copy of the latest version of the file (found here by sorting the file names in descending order), with a name ending in "_latest":

import dataiku
import ntpath

dossier = dataiku.Folder("zzZZzzZ")

for partitionn in dossier.list_partitions():
    list_files_in_partitions = dossier.list_paths_in_partition(partition=partitionn)
    list_files_in_partitions.sort(reverse=True)  # latest version sorts first
    with dossier.get_download_stream(list_files_in_partitions[0]) as f:
        dossier.upload_stream(ntpath.split(list_files_in_partitions[0])[0] + "/Chiffres" + str(partitionn) + "_latest.txt", f)
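One caveat (a general Python note, not specific to the DSS folder API): a plain reverse string sort is lexicographic, so "_v9" would sort above "_v10". If version numbers can ever exceed 9, a numeric sort key is safer; a sketch with illustrative names:

```python
import re

def version_key(path):
    """Extract the trailing version number for numeric comparison;
    files without a _vN suffix sort first."""
    m = re.search(r"_v(\d+)", path)
    return int(m.group(1)) if m else -1

files = ["AA202001_v9", "AA202001_v10", "AA202001_v2"]
files.sort(key=version_key, reverse=True)
print(files[0])  # AA202001_v10
```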

In the notebook, the code works very well. I then use a "Files from folder" dataset with the regex ^.*?latest.txt$ to keep only the _latest files created in each subfolder.
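As a quick sanity check of that regex (a minimal sketch with hypothetical paths; note the unescaped dot, so ^.*_latest\.txt$ would be slightly stricter):

```python
import re

pattern = re.compile(r"^.*?latest.txt$")
paths = [
    "2020/01/Chiffres2020-01_latest.txt",  # should match
    "2020/01/AA202001_v3.txt",             # should not match
]
matches = [p for p in paths if pattern.match(p)]
print(matches)  # ['2020/01/Chiffres2020-01_latest.txt']
```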

Then, to launch the code each time a new file arrives in General, I created a "Trigger on dataset change" and selected the managed folder.
Everything works fine.

The problem is that when I put my Python code into a step of my scenario and run it, I get the following message:

java.io.EOFException
EOFException

I am aware that this does not give much information, but do you have any idea anyway?

Thanks

Ineedi2


0 Kudos
AlexT
Dataiker

Hi @Ineedi2 ,

Indeed strange; I tested your code snippet and it worked fine for me within a scenario.

The EOFException error itself is not very revealing without more context and the full stack trace.

It may be worth checking:

1) Who the scenario runs as, and make sure that user has permissions on the managed folder.

Try adding the line:

filesystem_path = dossier.get_path()

after:

dossier = dataiku.Folder("zzZZzzZ")

to see if you still get the same error. If you do, and you can't find more information in the logs that can be shared here, I would suggest you open a support ticket with us and share the scenario diagnostics.

https://doc.dataiku.com/dss/latest/troubleshooting/obtaining-support.html#guidelines-for-submitting-... 

Thanks

 

Ineedi2
Level 2
Author

Thank you for your reply. I recreated the custom Python step and... it worked...

I noticed that the trigger (which fires when my managed folder changes) loops. This seems logical, because the new _latest files change the modification date.

I haven't looked into it yet; is there an easy way to fix this? Otherwise I will write a custom Python trigger instead.

0 Kudos
AlexT
Dataiker

If I understand correctly, you just need to disable the recheck option in your case, since the changes made by your scenario would otherwise trigger it again.

 

[Screenshot: 2021-10-19 at 11.33.46]

0 Kudos
Ineedi2
Level 2
Level 2
Author

Today all is OK. Thanks, AlexT, for your time.

Have a good day

0 Kudos