from collections import defaultdict

# Scan the video-time view in fixed-size batches and collect the datapoints
# that share the same (user, lesson, video, truncated timestamp) key. Within
# each such group the entry with the lowest sceneid is kept and the remaining
# entries are queued for deletion.
#
# `videoTimeView` and `hiveContext` are expected to be provided by the
# surrounding session; they are not defined in this script.

# Number of rows fetched from Hive per query.
BATCH_SIZE = 10000

# The row-number window is (rowStart, rowEnd], so consecutive batches tile the
# table without skipping the row that sits exactly on a batch boundary.
# `id` is part of the window ordering so that rows which tie on
# (userid, lessonid, videoid, createdat) receive the same row number in every
# batch query.
# NOTE(review): `id` and `sceneid` are selected because the grouping code
# reads them from every row; confirm both columns exist in videotimeview4.
QUERY_TEMPLATE = (
    "SELECT id, userid, lessonid, videoid, sceneid, createdat "
    "FROM (SELECT id, userid, lessonid, videoid, sceneid, createdat, "
    "ROW_NUMBER() OVER "
    "(ORDER BY userid, lessonid, videoid, createdat, id) AS rn "
    "FROM videotimeview4) q "
    "WHERE rn > %d AND rn <= %d "
    "ORDER BY rn"
)


def _group_rows(rows):
    """Group rows by userid -> lessonid -> videoid -> truncated timestamp.

    Each leaf is a list of ``(sceneid, id)`` tuples for the rows that share
    that key. ``id`` is stored as a string; ``sceneid`` is stored unchanged.
    """
    grouped = defaultdict(
        lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
    )
    for row in rows:
        row_id = str(row['id'])
        userid = str(row['userid'])
        lessonid = str(row['lessonid'])
        videoid = str(row['videoid'])
        sceneid = row['sceneid']
        # Drop the last five characters of the stringified timestamp so that
        # rows differing only in that suffix fall into the same bucket.
        # NOTE(review): presumably this strips sub-second digits; confirm the
        # string format of `createdat`.
        timestamp = str(row['createdat'])[:-5]
        grouped[userid][lessonid][videoid][timestamp].append((sceneid, row_id))
    return grouped


def _find_duplicates(grouped):
    """Return the ``(sceneid, id)`` entries to delete from *grouped*.

    For every bucket holding more than one entry, the entries are sorted by
    sceneid, the first one is kept and the rest are returned. Each such
    bucket is also printed for inspection.
    """
    to_delete = []
    for userid, lessons in sorted(grouped.items()):
        for lessonid, videos in sorted(lessons.items()):
            for videoid, timestamps in sorted(videos.items()):
                for timestamp, scenes in sorted(timestamps.items()):
                    if len(scenes) <= 1:
                        continue
                    sortedScenes = sorted(scenes, key=lambda tup: tup[0])
                    to_delete.extend(sortedScenes[1:])
                    # Single formatted string: prints the same text under
                    # Python 2 and Python 3.
                    print("%s %s %s %s %s" % (
                        userid, lessonid, videoid, timestamp, sortedScenes))
    return to_delete


# NOTE(review): the row count comes from `videoTimeView` while the query reads
# the table `videotimeview4`; this assumes both refer to the same data.
numRows = videoTimeView.count()
rowStart = 0
rowEnd = BATCH_SIZE
while rowStart < numRows:
    query = QUERY_TEMPLATE % (rowStart, rowEnd)
    ulvdata = hiveContext.sql(query).collect()

    # NOTE(review): a group of duplicates that straddles a batch boundary is
    # split across two batches and will not be fully detected.
    datapoints = _group_rows(ulvdata)
    datapointsToDelete = _find_duplicates(datapoints)

    # pseudocode
    # delete datapoints from other db
    # cleanup memory
    # get next batch of data
    rowStart += BATCH_SIZE
    rowEnd += BATCH_SIZE