Processing only new files in the flow

ptktmsz
Level 2

Hi all!

I have a fairly straightforward problem (or at least I think it is :)). I have files arriving in an Azure Blob Storage container. I created a flow to process them without a problem, but now I want to automate the flow so that it runs on a schedule (for example once an hour) and processes ONLY the new files. Is there a way to do that fairly easily? My current solution relies on partitioning the data by the timestamp in the filename, so that each hour I can run the process only for the current hour, for example. This poses some problems though:

- how can I specify to run the latest hour? Sometimes the latest data has a timestamp that is earlier than the CURRENT_HOUR variable I can use;

- sometimes I want to process files more often than once per hour, so I would require even more granularity;

- sometimes my files don't have any information on the timestamp in the filename.

What would be the best way to approach this? I know that there is an "if dataset changed" trigger in the scenarios. I imagine that it can listen for changes each minute or so, and then if new files drop, the scenario will trigger. But then I don't know how to specify that only the new files should be processed.

Any kind of help would be very welcome :).

Tomasz


Operating system used: Win10

7 Replies
Turribeach

As you said, the dataset change trigger is the best way to detect that new files have arrived, as it can run over managed folders. There is no built-in way to load only the new files, so you have several options. Ideally you would have subfolders within your folder and move files as you process them. For instance, files arrive in ./landing, then get moved to ./processing for loading, and after a successful load you move them to ./loaded. That way you always know which files you have processed, and if loading a file fails you have it right there in the ./processing folder, so you can fix the code or the file and try the same file again. Another way would be to build a dataset with every file you load; then, when the trigger fires for new files, you check that dataset and only load files you haven't loaded before.
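The second option (keeping a record of every file already loaded) could look roughly like the sketch below. It is only an illustration under assumptions: the record is a plain JSON file rather than a Dataiku dataset, the files live on a local path, and the names find_unloaded and mark_loaded are made up for this example.

```python
import json
from pathlib import Path


def find_unloaded(landing_dir, ledger_path):
    """Return the files in landing_dir whose names are not yet in the ledger."""
    ledger_path = Path(ledger_path)
    # The ledger is a JSON list of file names that were loaded successfully.
    loaded = set(json.loads(ledger_path.read_text())) if ledger_path.exists() else set()
    return sorted(
        p for p in Path(landing_dir).iterdir()
        if p.is_file() and p.name not in loaded
    )


def mark_loaded(ledger_path, files):
    """Add the given files to the ledger after a successful load."""
    ledger_path = Path(ledger_path)
    loaded = set(json.loads(ledger_path.read_text())) if ledger_path.exists() else set()
    loaded.update(Path(f).name for f in files)
    ledger_path.write_text(json.dumps(sorted(loaded)))
```

Each scheduled run would call find_unloaded, load what it returns, and call mark_loaded only for the files that loaded without error, so a failed file is picked up again on the next run.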

ptktmsz
Level 2
Author

Hi Turribeach! Thanks for some insights. I'm not sure if I understand you correctly. How can I identify the new files? For example - I have one new file arriving in my Blob Storage container every 10 minutes. I need my job to run every 10 minutes and process this file (writing in the end to Synapse). How can I automate my flow to process only this new file? Is there a way to move that file to the `processing` folder when it arrives? Thanks for your help :).


The dataset change trigger is the best option for detecting that new files have arrived in a managed folder. However, the trigger doesn't tell you what's new, just that something changed, so working out which files are new is something you have to do yourself. For instance, if files arrive in ./landing and then get moved to ./processing, then you know for sure that any files present in ./landing need to be processed. To move the files around the subfolders you will need a Python recipe with custom code.
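A rough sketch of that custom code is below. It assumes the object passed in behaves like a dataiku.Folder handle and uses only its list_paths_in_partition, get_download_stream, upload_stream and delete_path methods; the function names, the /landing/ and /processing/ prefixes and the folder name in the trailing comment are placeholders, not anything from this thread. A move is done here as copy then delete.

```python
def move_within_folder(folder, src_path, dst_path):
    """Copy src_path to dst_path inside the same managed folder, then delete the source."""
    with folder.get_download_stream(src_path) as stream:
        folder.upload_stream(dst_path, stream)
    # Only remove the source once the upload has completed without raising.
    folder.delete_path(src_path)


def claim_landing_files(folder, landing="/landing/", processing="/processing/"):
    """Move every file under the landing prefix to the processing prefix."""
    claimed = []
    for path in folder.list_paths_in_partition():
        if path.startswith(landing):
            target = processing + path[len(landing):]
            move_within_folder(folder, path, target)
            claimed.append(target)
    return claimed


# Inside a Python recipe this might be called as follows
# ("incoming_files" is a hypothetical managed folder name):
#   import dataiku
#   folder = dataiku.Folder("incoming_files")
#   to_process = claim_landing_files(folder)
```

The returned list is the set of paths the rest of the recipe should load; anything that arrives after the listing was taken simply waits in ./landing for the next run.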

ptktmsz
Level 2
Author

OK, so basically I have to keep a list of processed files somewhere, right? And then check with my custom code for the ones not present to be moved to processing. Thanks for help :).


Keeping a state is an option, but I prefer to use the subdirectories themselves as the indicator of what state each file is in. So if a file is in ./landing, it needs to be processed. When it moves to ./processing, it is either being processed or has failed processing. Finally, when the file is loaded you can move it to ./processed so you don't load it again. This is a much more efficient way than having to keep a state for each file.
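To make the folder-as-state idea concrete, here is a minimal sketch of one scheduled run. It uses a local directory tree as a stand-in for the Blob container, and the function name run_once and the load_file callback are hypothetical; in a real flow the load step would be whatever writes the file to the target.

```python
import shutil
from pathlib import Path


def run_once(root, load_file):
    """One scheduled run: claim files from landing, load them, archive the successes."""
    root = Path(root)
    landing, processing, processed = (
        root / name for name in ("landing", "processing", "processed")
    )
    for directory in (landing, processing, processed):
        directory.mkdir(parents=True, exist_ok=True)

    # Anything sitting in landing is new by definition, so claim it.
    for path in sorted(landing.iterdir()):
        if path.is_file():
            shutil.move(str(path), str(processing / path.name))

    done, failed = [], []
    # processing also holds files left over from earlier failed runs,
    # so they are retried here without any extra bookkeeping.
    for path in sorted(processing.iterdir()):
        if not path.is_file():
            continue
        try:
            load_file(path)
        except Exception:
            failed.append(path.name)  # stays in processing for inspection and retry
            continue
        shutil.move(str(path), str(processed / path.name))
        done.append(path.name)
    return done, failed
```

No per-file record is needed: the location of a file is its state, and a file that fails stays in ./processing until a later run loads it successfully.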

ptktmsz
Level 2
Author
Hmm, yeah, but how can you actually determine which files should be moved to the "processing" folder? Don't you need to keep a record somewhere to check which files are the "new" ones that need to be processed? Or am I missing something obvious?

New files should be written into ./landing by the source writing process. So anything in ./landing needs to be processed.
