
Read gzip compressed json file in partitioned folder of s3

Solved!
Toshi
Level 2

How can I read a gzip-compressed JSON file from a Dataiku folder (S3) into a dictionary? The S3 path is partitioned in 5-minute intervals, as shown below. In a Python recipe, I want to decompress and read the data for a given 5-minute partition.

YYYY / MM / DD / HH / 00 / xxxxxx00.json.gz
YYYY / MM / DD / HH / 05 / xxxxxx05.json.gz

 

Can you give me some sample Python code?

3 Replies
Toshi
Level 2
Author

The following error occurred when I ran the recipe below. Could you give me some advice?

[2022/04/02-02:24:42.513] [FRT-35-FlowRunnable] [INFO] [dku.flow.activity] - Run thread failed for activity compute_aka4jcxL_2022-03-01-00__00
com.dataiku.common.server.APIError$SerializedErrorException: Error in python process: At line 37: <class 'AttributeError'>: 'bytes' object has no attribute 'read'
	at com.dataiku.dip.dataflow.exec.JobExecutionResultHandler.handleErrorFile(JobExecutionResultHandler.java:65)
	at com.dataiku.dip.dataflow.exec.JobExecutionResultHandler.handleExecutionResultNoProcessDiedException(JobExecutionResultHandler.java:32)

 

# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
# Write recipe outputs
predict_input_folder = dataiku.Folder("predict_input_folder")
predict_input_folder_info = predict_input_folder.get_info()

# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
# Read the input JSON
current_partition = dataiku.dku_flow_variables["DKU_DST_MINUTE"]
dir_path = f'{dataiku.dku_flow_variables["DKU_DST_YYYYMMDDHH"].replace("-", "/")}/{dataiku.dku_flow_variables["DKU_DST_MINUTE"]}'
filename = f'waf_logs_{dataiku.dku_flow_variables["DKU_DST_YYYYMMDDHH"].replace("-", "")}{dataiku.dku_flow_variables["DKU_DST_MINUTE"]}.json.gz'
s3_path = f'{dir_path}/{filename}'

with predict_input_folder.get_download_stream(s3_path) as f:
    data = f.read()
    jsondict_list = json.load(gzip.decompress(data))
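
The path construction above can be checked outside the flow with a small sketch like the one below. It assumes the layout from the question (YYYY/MM/DD/HH/mm/ followed by a file name ending in YYYYMMDDHHmm.json.gz). The helper name `partition_path` and its `prefix` default are hypothetical and only for illustration; inside a recipe the values would come from `dataiku.dku_flow_variables` as in the code above.

```python
from datetime import datetime


def partition_path(ts, prefix="waf_logs_"):
    """Build 'YYYY/MM/DD/HH/mm/<prefix>YYYYMMDDHHmm.json.gz' for a 5-minute slot.

    Hypothetical helper: the prefix and layout mirror the recipe code above.
    """
    minute = ts.minute - ts.minute % 5  # floor to the start of the 5-minute partition
    slot = ts.replace(minute=minute, second=0, microsecond=0)
    dir_path = slot.strftime("%Y/%m/%d/%H/%M")
    filename = f"{prefix}{slot.strftime('%Y%m%d%H%M')}.json.gz"
    return f"{dir_path}/{filename}"


# A timestamp at 00:07 falls into the 00:05 partition.
example = partition_path(datetime(2022, 3, 1, 0, 7))
```

Printing `example` before calling `get_download_stream` makes it easy to compare the generated path with the actual object key in S3.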

Young-Sang_Lee
Dataiker
Hi Toshi,
 
As described in the documentation, the get_download_stream method returns a Python "file-like" object, which can be passed to other Python functions that support non-seekable file-like objects.

Since gzip.open supports this, modify the code as follows:
import json
import gzip
with predict_input_folder.get_download_stream(s3_path) as stream:
    with gzip.open(stream) as f:
        jsondict_list = json.loads(f.read())
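
For reference, the original error comes from passing bytes to json.load, which expects a file-like object with a read method; gzip.decompress returns bytes, so json.loads is the matching call in that case. The following is a minimal sketch of both variants that runs without Dataiku. The in-memory payload is only a stand-in for the stream returned by get_download_stream, the helper names are hypothetical, and it assumes the file holds a single JSON document.

```python
import gzip
import io
import json


def read_gzip_json(stream):
    """Decompress a gzip file-like object and parse its contents as JSON."""
    with gzip.open(stream, mode="rt", encoding="utf-8") as f:
        return json.load(f)  # json.load takes a file-like object


def read_gzip_json_bytes(data):
    """Same result from already-downloaded bytes: decompress, then json.loads."""
    return json.loads(gzip.decompress(data))  # json.loads takes str or bytes


# Stand-in for folder.get_download_stream(path): an in-memory gzip payload.
sample = [{"status": 200}, {"status": 403}]
payload = gzip.compress(json.dumps(sample).encode("utf-8"))

records_from_stream = read_gzip_json(io.BytesIO(payload))
records_from_bytes = read_gzip_json_bytes(payload)
```

If the log file turns out to contain one JSON object per line rather than a single document, each line would need to be parsed separately with json.loads.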
Toshi
Level 2
Author
Young-Sang san

I was able to read the file without any problems. Thank you. This ticket can be closed.

Best Regards,
Toshiaki