-
@async_generator
async def execute_select_query(engine, query_string, fetch_size, flush_after=None):
    """Stream the rows of a query one at a time, fetching them in batches.

    Opens a connection from ``engine``, executes ``query_string`` and yields
    every row of the result, pulling at most ``fetch_size`` rows per
    ``fetchmany`` call.

    If ``flush_after`` is not ``None``, ``FLUSH TABLE`` is issued for that
    table on the same connection once the generator is done, whether it ran
    to completion, raised, or was closed early.
    """
    async with engine.connect() as connection:
        try:
            cursor = await connection.execute(query_string)
            while True:
                rows = await cursor.fetchmany(fetch_size)
                for record in rows:
                    await yield_(record)
                if not rows:
                    # An empty batch means the result set is exhausted.
                    break
        finally:
            if flush_after is not None:
                await connection.execute(f"FLUSH TABLE `{flush_after}`")
-
async def primary_key_finder(engine, table_name):
    """Return the column names that make up ``table_name``'s primary key.

    Runs ``SHOW KEYS`` restricted to the ``PRIMARY`` key, issues
    ``FLUSH TABLE`` for the table, and only then reads the key rows.  The
    returned list is empty when the query yields no rows.
    """
    show_keys_sql = f"SHOW KEYS FROM `{table_name}` WHERE Key_name = %s"
    flush_sql = f"FLUSH TABLE `{table_name}`"
    async with engine.connect() as connection:
        key_result = await connection.execute(show_keys_sql, ("PRIMARY",))
        # The flush is sent before the key rows are fetched; keep this order.
        await connection.execute(flush_sql)
        key_rows = await key_result.fetchall()
        return [key_row["Column_name"] for key_row in key_rows]
-
-
async def check_for_mojibake(row, found_mojibake, database, table, has_pri_key=False):
    """Inspect one row and count fields that are non-ASCII but valid UTF-8.

    Only columns whose name starts with ``"HEX"`` and whose value is not
    ``None`` are examined; the value is un-hexed back into raw bytes first.
    A field that is pure ASCII, or that is not valid UTF-8, is ignored.

    For a field that is non-ASCII yet decodes as UTF-8:

    * when ``has_pri_key`` is true, the ``"rows_found"`` counter under
      ``found_mojibake[database][table][<unwrapped column>]`` is incremented;
    * otherwise the table is reported through ``log_keyless_table`` and
      ``ValueError`` is raised.
    """
    for column_name, hex_value in row.items():
        if hex_value is None:
            continue
        if not column_name.startswith("HEX"):
            continue

        raw = binascii.unhexlify(hex_value)

        try:
            raw.decode("ascii")
        except UnicodeDecodeError:
            pass
        else:
            # Plain ASCII cannot be mis-encoded text; nothing to record.
            continue

        try:
            raw.decode("utf-8")
            if not has_pri_key:
                log_keyless_table(database, table)
                raise ValueError
            found_mojibake[database][table][rm_hex_wrap(column_name)]["rows_found"] += 1
        except UnicodeDecodeError:
            # Neither ASCII nor UTF-8: not the pattern being looked for.
            pass
-
-
async def table_mojibake_scan(
    engine, database, table, table_columns, found_mojibake, table_limiter
):
    """Scan the text columns of ``table`` and record suspect fields.

    Builds a ``SELECT`` that wraps every column listed in
    ``table_columns["text_columns"]`` in ``HEX()`` and, when the table has a
    primary key, also selects the key columns.  The rows are streamed in
    batches of 500 and each one is handed to ``check_for_mojibake`` together
    with ``found_mojibake``.  The scan stops at the first row for which that
    call raises ``ValueError``.  The table is flushed once the row stream is
    closed (``flush_after=table``).

    ``table_limiter.release()`` is called on every exit path, including
    errors and cancellation, so a failed scan does not keep its slot.
    """
    # NOTE(review): table_limiter is presumably a semaphore acquired by the
    # caller before this coroutine starts; confirm against the call site.
    try:
        hex_wrapped_cols = ", ".join(
            "HEX(`{0}`)".format(column) for column in table_columns["text_columns"]
        )
        pri_key = await primary_key_finder(engine, table)
        has_pri_key = bool(pri_key)
        if has_pri_key:
            key_cols = ", ".join("`{0}`".format(key_col) for key_col in pri_key)
            cols_query = f"SELECT {hex_wrapped_cols}, {key_cols} FROM `{table}`"
        else:
            cols_query = f"SELECT {hex_wrapped_cols} FROM `{table}`"

        # All warnings raised while the query runs are suppressed.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            # aclosing() guarantees the generator is closed when the scan
            # breaks out early.
            async with aclosing(
                execute_select_query(
                    engine, cols_query, fetch_size=500, flush_after=table
                )
            ) as text_columns:
                async for row in text_columns:
                    try:
                        await check_for_mojibake(
                            row, found_mojibake, database, table, has_pri_key
                        )
                    except ValueError:
                        # Stop scanning this table at the first ValueError.
                        break
    finally:
        # Previously skipped when anything above raised, leaking the slot.
        table_limiter.release()
-