from collections import defaultdict

# Number of rows fetched from Hive per round trip.
BATCH_SIZE = 10000

# Paged query over videotimeview4.
# - ``id`` and ``sceneid`` are selected because _find_duplicates reads them
#   from every row.
# - ``id`` is appended to the ROW_NUMBER ordering as a tie-breaker. Without it,
#   rows sharing (userid, lessonid, videoid, createdat) -- exactly the
#   duplicate candidates -- can receive different row numbers on each query
#   execution, so consecutive pages could overlap or skip rows.
#   NOTE(review): assumes ``id`` is unique in videotimeview4 -- TODO confirm.
# - The window is (start, end], i.e. ``rn > start AND rn <= end``, so pages of
#   BATCH_SIZE rows tile the numbering 1..N with no gaps.
BATCH_QUERY = (
    "SELECT id, userid, lessonid, videoid, sceneid, createdat "
    "FROM (SELECT id, userid, lessonid, videoid, sceneid, createdat, "
    "ROW_NUMBER() OVER (ORDER BY userid, lessonid, videoid, createdat, id) AS rn "
    "FROM videotimeview4) q "
    "WHERE rn > %d AND rn <= %d "
    "ORDER BY userid, lessonid, videoid, createdat, id"
)


def _find_duplicates(rows):
    """Return the (sceneid, id) pairs of duplicate rows in one batch.

    Rows are grouped by (userid, lessonid, videoid, truncated createdat). In
    every group holding more than one row, the entry with the smallest
    sceneid is kept. All other entries are returned for deletion. Each
    duplicate group is printed as "userid lessonid videoid time [pairs]".

    :param rows: records indexable by column name ('id', 'userid',
        'lessonid', 'videoid', 'sceneid', 'createdat'), e.g. the result of
        ``DataFrame.collect()``.
    :returns: list of (sceneid, str(id)) tuples to delete.
    """
    groups = defaultdict(list)
    for row in rows:
        # Drop the last 5 characters of the timestamp's string form, so that
        # rows differing only in those characters land in the same group.
        # NOTE(review): presumably this strips sub-second digits -- confirm
        # the createdat string format.
        created = str(row['createdat'])[:-5]
        key = (str(row['userid']), str(row['lessonid']),
               str(row['videoid']), created)
        groups[key].append((row['sceneid'], str(row['id'])))

    to_delete = []
    # Sorting the flat tuple keys visits groups in the same order as a nested
    # user -> lesson -> video -> time traversal would.
    for key in sorted(groups):
        scenes = groups[key]
        if len(scenes) > 1:
            sorted_scenes = sorted(scenes, key=lambda pair: pair[0])
            to_delete.extend(sorted_scenes[1:])
            # A single pre-formatted string prints identically on Python 2 and 3.
            print("%s %s %s %s %s" % (key + (sorted_scenes,)))
    return to_delete


# NOTE(review): the row count comes from ``videoTimeView`` while the pages
# are read from ``videotimeview4``. Confirm both refer to the same data,
# otherwise the loop may stop early or issue empty queries.
numRows = videoTimeView.count()
rowStart = 0
rowEnd = BATCH_SIZE
while rowStart < numRows:
    query = BATCH_QUERY % (rowStart, rowEnd)
    ulvdata = hiveContext.sql(query).collect()
    datapointsToDelete = _find_duplicates(ulvdata)

    # NOTE(review): a duplicate group that straddles a page boundary is
    # split across two batches and goes undetected. Carrying the last group
    # of each page over into the next batch would close that gap.
    # TODO: delete datapointsToDelete from the other db
    # TODO: release this batch's memory before fetching the next one

    rowStart += BATCH_SIZE
    rowEnd += BATCH_SIZE