Deduplicate using values from one column

chemical
Level 1

Hello, I'm trying to deduplicate my output with Python, but I can't figure out how to edit the code. I need to deduplicate the table TDD_FT_CUSTOMER_JOURNEY_joined_filtered_joined because, as you can see from the screenshot below, I have duplicate rows for the same ID (first column). The CD_BOOKING column also repeats the same values.

dataiku_ded.PNG

This is the Python code, which I haven't touched yet.

 

# -*- coding: utf-8 -*-
import dataiku
import pandas as pd, numpy as np
from dataiku import pandasutils as pdu

# Read recipe inputs
TDD_FT_CUSTOMER_JOURNEY_joined_filtered_joined = dataiku.Dataset("TDD_FT_CUSTOMER_JOURNEY_joined_filtered_joined")
TDD_FT_CUSTOMER_JOURNEY_joined_filtered_joined_df = TDD_FT_CUSTOMER_JOURNEY_joined_filtered_joined.get_dataframe()


# Compute recipe outputs from inputs
# TODO: Replace this part by your actual code that computes the output, as a Pandas dataframe
# NB: DSS also supports other kinds of APIs for reading and writing data. Please see doc.

phone_deduplicated_df = TDD_FT_CUSTOMER_JOURNEY_joined_filtered_joined_df # For this sample code, simply copy input to output


# Write recipe outputs
phone_deduplicated = dataiku.Dataset("phone_deduplicated")
phone_deduplicated.write_with_schema(phone_deduplicated_df)

 

5 Replies
VitaliyD
Dataiker

Hi, 

This is a pandas library usage question. You would need to use the drop_duplicates method.

In your case, it would be something like this:

# -*- coding: utf-8 -*-
import dataiku
import pandas as pd, numpy as np
from dataiku import pandasutils as pdu

# Read recipe inputs
TDD_FT_CUSTOMER_JOURNEY_joined_filtered_joined = dataiku.Dataset("TDD_FT_CUSTOMER_JOURNEY_joined_filtered_joined")
TDD_FT_CUSTOMER_JOURNEY_joined_filtered_joined_df = TDD_FT_CUSTOMER_JOURNEY_joined_filtered_joined.get_dataframe()


# Compute recipe outputs from inputs
# TODO: Replace this part by your actual code that computes the output, as a Pandas dataframe
# NB: DSS also supports other kinds of APIs for reading and writing data. Please see doc.

phone_deduplicated_df = TDD_FT_CUSTOMER_JOURNEY_joined_filtered_joined_df.drop_duplicates() # Drop rows that are exact duplicates across all columns


# Write recipe outputs
phone_deduplicated = dataiku.Dataset("phone_deduplicated")
phone_deduplicated.write_with_schema(phone_deduplicated_df)
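
Since the goal in the title is to deduplicate based on the values of one (or a few) columns, note that drop_duplicates also accepts a subset argument. A minimal sketch, assuming the key columns are named ID and CD_BOOKING as in the screenshot (adjust to your actual schema):

# Keep the first row for each ID / CD_BOOKING combination (assumed column names)
phone_deduplicated_df = TDD_FT_CUSTOMER_JOURNEY_joined_filtered_joined_df.drop_duplicates(
    subset=["ID", "CD_BOOKING"],  # columns that define what counts as a duplicate
    keep="first"                  # keep the first occurrence, drop the rest
)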

 

Best, 

Vitaliy

chemical
Level 1
Author

Hi VitaliyD,

many thanks for your help. The job took 3 minutes to complete, which seemed strange considering the original table contained 11,872,276 rows. The output had 740,234 rows but still contained duplicates, so I exported the file to Excel and removed the duplicates there.

Do you know why Dataiku didn't remove all the duplicates?

tgb417

@chemical 

Regarding the duplicates that were not removed:

If I remember correctly, MS Excel removes duplicates in a case-insensitive way and may also do some whitespace trimming.

Are the duplicates that remain after the Python/pandas method ones that differ only in case? Again, if I remember correctly, pandas looks for exact duplicates in a case-sensitive way (a != A). If you want to match duplicates case-insensitively, you will need to handle that yourself.

There are many ways you could do this, for example trimming whitespace and setting everything to lower case, then doing the deduplication, as sketched below.
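
A minimal sketch of that idea, using a hypothetical helper and assuming the duplicates should be judged on the ID and CD_BOOKING columns from the original question (adjust the column list to your data):

def dedup_case_insensitive(df, key_columns):
    # Normalize the key columns: cast to string, trim whitespace, lower-case
    normalized = df[key_columns].apply(lambda col: col.astype(str).str.strip().str.lower())
    # Keep the first row for each normalized key combination
    return df[~normalized.duplicated(keep="first")]

phone_deduplicated_df = dedup_case_insensitive(
    TDD_FT_CUSTOMER_JOURNEY_joined_filtered_joined_df,
    ["ID", "CD_BOOKING"],  # assumed column names
)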

Here are some more threads on deduplication that may have some further useful insights for you.

https://community.dataiku.com/t5/Using-Dataiku/Distinct-recipe/m-p/31172 

https://community.dataiku.com/t5/Using-Dataiku/How-to-identify-duplicates-in-a-data-set/m-p/25831 

 

--Tom
chemical
Level 1
Author

Is there any section of the community where I can ask for help interpreting the log when a recipe fails? This is the log:

[10:37:25] [INFO] [dku.utils]  - *************** Recipe code failed **************
[10:37:25] [INFO] [dku.utils]  - Begin Python stack
[10:37:25] [INFO] [dku.utils]  - Traceback (most recent call last):
[10:37:25] [INFO] [dku.utils]  -   File "/data/dataiku/dss_data/jobs/SMS_DATABASE_FRA/Build_phone_deduplicated__NP__2023-07-12T05-35-36.403/compute_phone_deduplicated_NP/python-recipe/pyoutF83fSoruiiwl/python-exec-wrapper.py", line 204, in <module>
[10:37:25] [INFO] [dku.utils]  -     exec(f.read())
[10:37:25] [INFO] [dku.utils]  -   File "<string>", line 8, in <module>
[10:37:25] [INFO] [dku.utils]  -   File "/opt/dataiku-dss-11.4.0/python/dataiku/core/dataset.py", line 639, in get_dataframe
[10:37:25] [INFO] [dku.utils]  -     keep_default_na=keep_default_na)
[10:37:25] [INFO] [dku.utils]  -   File "/opt/dataiku-dss-11.4.0/python36.packages/pandas/io/parsers.py", line 678, in parser_f
[10:37:25] [INFO] [dku.utils]  -     return _read(filepath_or_buffer, kwds)
[10:37:25] [INFO] [dku.utils]  -   File "/opt/dataiku-dss-11.4.0/python36.packages/pandas/io/parsers.py", line 446, in _read
[10:37:25] [INFO] [dku.utils]  -     data = parser.read(nrows)
[10:37:25] [INFO] [dku.utils]  -   File "/opt/dataiku-dss-11.4.0/python36.packages/pandas/io/parsers.py", line 1036, in read
[10:37:25] [INFO] [dku.utils]  -     ret = self._engine.read(nrows)
[10:37:25] [INFO] [dku.utils]  -   File "/opt/dataiku-dss-11.4.0/python36.packages/pandas/io/parsers.py", line 1848, in read
[10:37:25] [INFO] [dku.utils]  -     data = self._reader.read(nrows)
[10:37:25] [INFO] [dku.utils]  -   File "pandas/_libs/parsers.pyx", line 876, in pandas._libs.parsers.TextReader.read
[10:37:25] [INFO] [dku.utils]  -   File "pandas/_libs/parsers.pyx", line 919, in pandas._libs.parsers.TextReader._read_low_memory
[10:37:25] [INFO] [dku.utils]  -   File "pandas/_libs/parsers.pyx", line 2141, in pandas._libs.parsers._concatenate_chunks
[10:37:25] [INFO] [dku.utils]  - MemoryError
[10:37:25] [INFO] [dku.utils]  - End Python stack
[10:37:25] [INFO] [dku.utils]  - 2023-07-12 10:37:25,302 INFO Check if spark is available
[10:37:25] [INFO] [dku.utils]  - 2023-07-12 10:37:25,310 INFO Not stopping a spark context: No module named 'pyspark'
[10:37:28] [INFO] [dku.utils]  - 2023-07-12 10:37:28,110 14165 INFO [Child] Process 14167 exited with exit=1 signal=0
[10:37:28] [INFO] [dku.utils]  - 2023-07-12 10:37:28,110 14165 INFO Full child code: 1
[10:37:28] [WARN] [dku.resource] - stat file for pid 14167 does not exist. Process died?
[10:37:28] [DEBUG] [dku.resourceusage] - Reporting completion of CRU:{"context":{"type":"JOB_ACTIVITY","authIdentifier":"corporate.training","projectKey":"SMS_DATABASE_FRA","jobId":"Build_phone_deduplicated__NP__2023-07-12T05-35-36.403","activityId":"compute_phone_deduplicated_NP","activityType":"recipe","recipeType":"python","recipeName":"compute_phone_deduplicated"},"type":"LOCAL_PROCESS","id":"HSLDGLyNMVPXThsK","startTime":1689156934447,"localProcess":{"pid":14167,"commandName":"/data/dataiku/dss_data/bin/python","cpuUserTimeMS":283920,"cpuSystemTimeMS":22490,"cpuChildrenUserTimeMS":0,"cpuChildrenSystemTimeMS":0,"cpuTotalMS":306410,"cpuCurrent":0.9698403311649911,"vmSizeMB":19909,"vmRSSMB":18279,"vmHWMMB":26947,"vmRSSAnonMB":18276,"vmDataMB":19330,"vmSizePeakMB":28641,"vmRSSPeakMB":26888,"vmRSSTotalMBS":12440915,"majorFaults":2,"childrenMajorFaults":0}}
[10:37:28] [INFO] [dip.exec.resultHandler] - Error file found, trying to throw it: /data/dataiku/dss_data/jobs/SMS_DATABASE_FRA/Build_phone_deduplicated__NP__2023-07-12T05-35-36.403/compute_phone_deduplicated_NP/python-recipe/pyoutF83fSoruiiwl/error.json
[10:37:28] [INFO] [dip.exec.resultHandler] - Raw error is{"errorType":"\u003cclass \u0027MemoryError\u0027\u003e","message":"","detailedMessage":"At line 8: \u003cclass \u0027MemoryError\u0027\u003e: ","stackTrace":[]}
[10:37:28] [INFO] [dip.exec.resultHandler] - After enrichment of error file, error is: {"errorType":"\u003cclass \u0027MemoryError\u0027\u003e","message":"Error in Python process: ","detailedMessage":"Error in Python process: At line 8: \u003cclass \u0027MemoryError\u0027\u003e: ","stackTrace":[]}
[10:37:28] [INFO] [dku.flow.activity] - Run thread failed for activity compute_phone_deduplicated_NP
com.dataiku.common.server.APIError$SerializedErrorException: Error in Python process: At line 8: <class 'MemoryError'>: 
	at com.dataiku.dip.dataflow.exec.JobExecutionResultHandler.throwFromErrorFileIfPossible(JobExecutionResultHandler.java:106)
	at com.dataiku.dip.dataflow.exec.JobExecutionResultHandler.throwFromErrorFileOrLogs(JobExecutionResultHandler.java:39)
	at com.dataiku.dip.dataflow.exec.JobExecutionResultHandler.throwFromErrorFileOrLogs(JobExecutionResultHandler.java:34)
	at com.dataiku.dip.dataflow.exec.JobExecutionResultHandler.handleExecutionResult(JobExecutionResultHandler.java:26)
	at com.dataiku.dip.dataflow.exec.AbstractCodeBasedActivityRunner.execute(AbstractCodeBasedActivityRunner.java:77)
	at com.dataiku.dip.dataflow.exec.AbstractPythonRecipeRunner.executeScript(AbstractPythonRecipeRunner.java:57)
	at com.dataiku.dip.recipes.code.python.PythonRecipeRunner.run(PythonRecipeRunner.java:76)
	at com.dataiku.dip.dataflow.jobrunner.ActivityRunner$FlowRunnableThread.run(ActivityRunner.java:378)
[10:37:28] [INFO] [dku.flow.activity] running compute_phone_deduplicated_NP - activity is finished
[10:37:28] [ERROR] [dku.flow.activity] running compute_phone_deduplicated_NP - Activity failed
com.dataiku.common.server.APIError$SerializedErrorException: Error in Python process: At line 8: <class 'MemoryError'>: 
	at com.dataiku.dip.dataflow.exec.JobExecutionResultHandler.throwFromErrorFileIfPossible(JobExecutionResultHandler.java:106)
	at com.dataiku.dip.dataflow.exec.JobExecutionResultHandler.throwFromErrorFileOrLogs(JobExecutionResultHandler.java:39)
	at com.dataiku.dip.dataflow.exec.JobExecutionResultHandler.throwFromErrorFileOrLogs(JobExecutionResultHandler.java:34)
	at com.dataiku.dip.dataflow.exec.JobExecutionResultHandler.handleExecutionResult(JobExecutionResultHandler.java:26)
	at com.dataiku.dip.dataflow.exec.AbstractCodeBasedActivityRunner.execute(AbstractCodeBasedActivityRunner.java:77)
	at com.dataiku.dip.dataflow.exec.AbstractPythonRecipeRunner.executeScript(AbstractPythonRecipeRunner.java:57)
	at com.dataiku.dip.recipes.code.python.PythonRecipeRunner.run(PythonRecipeRunner.java:76)
	at com.dataiku.dip.dataflow.jobrunner.ActivityRunner$FlowRunnableThread.run(ActivityRunner.java:378)
[10:37:28] [INFO] [dku.flow.activity] running compute_phone_deduplicated_NP - Executing default post-activity lifecycle hook
[10:37:28] [INFO] [dku.flow.activity] running compute_phone_deduplicated_NP - Done post-activity tasks

 

VitaliyD
Dataiker

Hi,

There is no specific section for errors. From the error stack you provided, the Python process ran out of memory, so you need to ensure it has enough memory to load and process the whole dataset.
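
If increasing the memory available to the recipe isn't an option, one common workaround (just a sketch, under the assumption that rows can be deduplicated on a small set of key columns such as ID and CD_BOOKING) is to read and write the dataset in chunks with the Dataiku API, keeping only the set of already-seen keys in memory:

# -*- coding: utf-8 -*-
import dataiku

input_ds = dataiku.Dataset("TDD_FT_CUSTOMER_JOURNEY_joined_filtered_joined")
output_ds = dataiku.Dataset("phone_deduplicated")

key_columns = ["ID", "CD_BOOKING"]   # assumed deduplication keys
seen_keys = set()                    # key combinations already written to the output
writer = None

for chunk in input_ds.iter_dataframes(chunksize=200000):
    # Keep only rows whose key combination has not been seen yet
    keep = []
    for key in map(tuple, chunk[key_columns].astype(str).values):
        if key in seen_keys:
            keep.append(False)
        else:
            seen_keys.add(key)
            keep.append(True)
    deduped = chunk[keep]

    if writer is None:
        # Set the output schema from the first chunk, then open the writer
        output_ds.write_schema_from_dataframe(deduped)
        writer = output_ds.get_writer()
    writer.write_dataframe(deduped)

if writer is not None:
    writer.close()

This keeps only the key values in memory instead of the full dataframe, so it scales much better, although the set of keys itself still has to fit in memory.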

If you need more details, we will need to inspect the job diagnosis, so it will be better to handle it through a support ticket https://doc.dataiku.com/dss/latest/troubleshooting/obtaining-support.html.

Regarding using pandas to deduplicate: this is not really about DSS, since you are using Python with pandas. DSS does nothing extra in Python code recipes, so the outcome of drop_duplicates will be the same as running the code outside of DSS. This means the remaining duplicates are most likely related to the source data and how you use the method, not to DSS.

Alternatively, you can use the Distinct visual recipe instead. 

Best,

Vitaliy
