Read a gzip-compressed JSON file in a partitioned folder on S3

Toshiaki

Please tell me how to read a gzip-compressed JSON file from a Dataiku managed folder (on S3) into a dictionary. The S3 path consists of 5-minute data partitions as shown below, and in a Python recipe I want to gunzip and read the data for one 5-minute partition.

YYYY / MM / DD / HH / 00 / xxxxxx00.json.gz
YYYY / MM / DD / HH / 05 / xxxxxx05.json.gz

Can you give me some sample Python code?


Best Answer

  • Toshiaki
    Answer ✓
    Young-Sang san

    I was able to read the file normally. Thank you. This ticket will be closed.

    Best Regards,
    Toshiaki

Answers

  • Toshiaki
    edited July 17

    The following error occurred, so please give me some advice.

    [2022/04/02-02:24:42.513] [FRT-35-FlowRunnable] [INFO] [dku.flow.activity] - Run thread failed for activity compute_aka4jcxL_2022-03-01-00__00
    com.dataiku.common.server.APIError$SerializedErrorException: Error in python process: At line 37: <class 'AttributeError'>: 'bytes' object has no attribute 'read'
     at com.dataiku.dip.dataflow.exec.JobExecutionResultHandler.handleErrorFile(JobExecutionResultHandler.java:65)
     at com.dataiku.dip.dataflow.exec.JobExecutionResultHandler.handleExecutionResultNoProcessDiedException(JobExecutionResultHandler.java:32)

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    # Write recipe outputs
    predict_input_folder = dataiku.Folder("predict_input_folder")
    predict_input_folder_info = predict_input_folder.get_info()
    
    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    # Read the input json
    current_partition = dataiku.dku_flow_variables["DKU_DST_MINUTE"]
    dir_path = f'{dataiku.dku_flow_variables["DKU_DST_YYYYMMDDHH"].replace("-", "/")}/{dataiku.dku_flow_variables["DKU_DST_MINUTE"]}'
    filename = f'waf_logs_{dataiku.dku_flow_variables["DKU_DST_YYYYMMDDHH"].replace("-", "")}{dataiku.dku_flow_variables["DKU_DST_MINUTE"]}.json.gz'
    s3_path = f'{dir_path}/{filename}'
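    # e.g. for the partition in the error above (2022-03-01-00 / minute 00), this resolves to
    # "2022/03/01/00/00/waf_logs_202203010000.json.gz"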
    
    with predict_input_folder.get_download_stream(s3_path) as f:
        data = f.read()
        jsondict_list = json.load(gzip.decompress(data))

  • Young-Sang
    Hi Toshi,
    As mentioned in the documentation, the get_download_stream() method returns a Python "file-like" object, which can be passed to other Python functions that accept non-seekable file-like objects.

    Since gzip.open supports this, you will need to modify the code as shown below:
    import json
    import gzip

    with predict_input_folder.get_download_stream(s3_path) as stream:
        with gzip.open(stream) as f:
            jsondict_list = json.loads(f.read())
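
    For reference, putting the pieces of this thread together, a complete version of the recipe might look like the sketch below. The folder name "predict_input_folder", the DKU_DST_* flow variables, and the "waf_logs_" file naming convention are all taken from the code earlier in this thread, so adjust them to your own project:

    import dataiku
    import json
    import gzip

    # Managed folder pointing at the partitioned S3 location
    predict_input_folder = dataiku.Folder("predict_input_folder")

    # Build the path of the .json.gz file for the partition being computed
    yyyymmddhh = dataiku.dku_flow_variables["DKU_DST_YYYYMMDDHH"]
    minute = dataiku.dku_flow_variables["DKU_DST_MINUTE"]
    dir_path = f'{yyyymmddhh.replace("-", "/")}/{minute}'
    filename = f'waf_logs_{yyyymmddhh.replace("-", "")}{minute}.json.gz'
    s3_path = f'{dir_path}/{filename}'

    # get_download_stream() returns a non-seekable file-like object;
    # gzip.open() can wrap it directly, so there is no need to read the
    # raw bytes and call gzip.decompress() yourself.
    with predict_input_folder.get_download_stream(s3_path) as stream:
        with gzip.open(stream) as f:
            jsondict_list = json.loads(f.read())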