spacepaste

  1.  
  2. @async_generator
  3. async def execute_select_query(engine, query_string, fetch_size, flush_after=None):
  4. async with engine.connect() as conn:
  5. try:
  6. result = await conn.execute(query_string)
  7. empty = False
  8. while not empty:
  9. batch = await result.fetchmany(fetch_size)
  10. if not batch:
  11. empty = True
  12. for row in batch:
  13. await yield_(row)
  14. finally:
  15. if flush_after is not None:
  16. await conn.execute(f"FLUSH TABLE `{flush_after}`")
  17. async def primary_key_finder(engine, table_name):
  18. async with engine.connect() as conn:
  19. get_key_query = await conn.execute(
  20. f"SHOW KEYS FROM `{table_name}` WHERE Key_name = %s", ("PRIMARY",)
  21. )
  22. await conn.execute(f"FLUSH TABLE `{table_name}`")
  23. keys = await get_key_query.fetchall()
  24. return [row["Column_name"] for row in keys]
  25. async def check_for_mojibake(row, found_mojibake, database, table, has_pri_key=False):
  26. for column, data in row.items():
  27. if data is None or not column.startswith("HEX"):
  28. continue
  29. field_bytes = binascii.unhexlify(data)
  30. try:
  31. field_bytes.decode("ascii")
  32. except UnicodeDecodeError:
  33. try:
  34. field_bytes.decode("utf-8")
  35. if not has_pri_key:
  36. log_keyless_table(database, table)
  37. raise ValueError
  38. found_mojibake[database][table][rm_hex_wrap(column)]["rows_found"] += 1
  39. except UnicodeDecodeError:
  40. pass
  41. async def table_mojibake_scan(
  42. engine, database, table, table_columns, found_mojibake, table_limiter
  43. ):
  44. hex_wrapped_cols = ", ".join(
  45. "HEX(`{0}`)".format(column) for column in table_columns["text_columns"]
  46. )
  47. has_pri_key = False
  48. pri_key = await primary_key_finder(engine, table)
  49. if pri_key:
  50. has_pri_key = True
  51. key_cols = ", ".join("`{0}`".format(key_col) for key_col in pri_key)
  52. cols_query = f"SELECT {hex_wrapped_cols}, {key_cols} FROM `{table}`"
  53. else:
  54. cols_query = f"SELECT {hex_wrapped_cols} FROM `{table}`"
  55. with warnings.catch_warnings():
  56. warnings.simplefilter("ignore")
  57. async with aclosing(
  58. execute_select_query(engine, cols_query, fetch_size=500, flush_after=table)
  59. ) as text_columns:
  60. async for row in text_columns:
  61. try:
  62. await check_for_mojibake(
  63. row, found_mojibake, database, table, has_pri_key
  64. )
  65. except ValueError:
  66. break
  67. table_limiter.release()
  68.