@async_generator
async def execute_select_query(engine, query_string, fetch_size, flush_after=None):
    """Run a SELECT and yield its rows one at a time, fetched in batches.

    Args:
        engine: Object whose ``connect()`` returns an async context manager
            yielding a connection with an awaitable ``execute``.
        query_string: SQL text passed verbatim to ``conn.execute``.
        fetch_size: Number of rows requested per ``fetchmany`` call.
        flush_after: Optional table name. When given, ``FLUSH TABLE`` is
            issued for it once iteration ends, whether it finished normally,
            raised, or the generator was closed early.

    Yields:
        Each row of the result set, in the order returned by the driver.
    """
    async with engine.connect() as conn:
        try:
            result = await conn.execute(query_string)
            while True:
                batch = await result.fetchmany(fetch_size)
                if not batch:
                    # An empty batch means the result set is exhausted.
                    break
                for row in batch:
                    await yield_(row)
        finally:
            if flush_after is not None:
                await conn.execute(f"FLUSH TABLE `{flush_after}`")


async def primary_key_finder(engine, table_name):
    """Return the column names that make up the primary key of a table.

    Args:
        engine: Object whose ``connect()`` returns an async context manager
            yielding a connection with an awaitable ``execute``.
        table_name: Name of the table to inspect.

    Returns:
        A list with the ``Column_name`` value of every row reported by
        ``SHOW KEYS`` for the ``PRIMARY`` key; empty when there is none.
    """
    async with engine.connect() as conn:
        get_key_query = await conn.execute(
            f"SHOW KEYS FROM `{table_name}` WHERE Key_name = %s", ("PRIMARY",)
        )
        # NOTE(review): the flush is issued before the rows are fetched; this
        # presumably relies on the driver buffering the result - confirm.
        await conn.execute(f"FLUSH TABLE `{table_name}`")
        keys = await get_key_query.fetchall()
        return [row["Column_name"] for row in keys]


async def check_for_mojibake(row, found_mojibake, database, table, has_pri_key=False):
    """Count the HEX-wrapped columns of a row whose bytes are non-ASCII UTF-8.

    Only columns whose name starts with ``"HEX"`` and whose value is not
    ``None`` are inspected. A value is counted when its bytes fail to decode
    as ASCII but decode successfully as UTF-8.

    Args:
        row: Mapping of column name to value for one result row.
        found_mojibake: Nested mapping updated in place at
            ``[database][table][column]["rows_found"]``.
        database: Database name used as the first key into ``found_mojibake``.
        table: Table name used as the second key into ``found_mojibake``.
        has_pri_key: Whether the table has a primary key.

    Raises:
        ValueError: On the first hit in a table without a primary key, after
            the table has been passed to ``log_keyless_table``. The counter
            is not incremented in that case.
    """
    for column, data in row.items():
        if data is None or not column.startswith("HEX"):
            continue

        field_bytes = binascii.unhexlify(data)

        try:
            field_bytes.decode("ascii")
        except UnicodeDecodeError:
            pass
        else:
            # Pure ASCII content cannot be a hit.
            continue

        try:
            field_bytes.decode("utf-8")
        except UnicodeDecodeError:
            # Neither ASCII nor valid UTF-8: not counted.
            continue

        if not has_pri_key:
            log_keyless_table(database, table)
            raise ValueError(f"table `{table}` has no primary key")

        found_mojibake[database][table][rm_hex_wrap(column)]["rows_found"] += 1


async def table_mojibake_scan(
    engine, database, table, table_columns, found_mojibake, table_limiter
):
    """Scan every row of a table's text columns and record mojibake hits.

    Args:
        engine: Engine handed to ``primary_key_finder`` and
            ``execute_select_query``.
        database: Database name, forwarded to ``check_for_mojibake``.
        table: Name of the table to scan.
        table_columns: Mapping whose ``"text_columns"`` entry lists the
            columns to select, each wrapped in ``HEX()``.
        found_mojibake: Nested mapping updated in place by
            ``check_for_mojibake``.
        table_limiter: Object whose ``release()`` is called exactly once when
            the scan ends - presumably a semaphore acquired by the caller
            (confirm against the call site).
    """
    try:
        hex_wrapped_cols = ", ".join(
            "HEX(`{0}`)".format(column) for column in table_columns["text_columns"]
        )

        pri_key = await primary_key_finder(engine, table)
        has_pri_key = bool(pri_key)
        if has_pri_key:
            key_cols = ", ".join("`{0}`".format(key_col) for key_col in pri_key)
            cols_query = f"SELECT {hex_wrapped_cols}, {key_cols} FROM `{table}`"
        else:
            cols_query = f"SELECT {hex_wrapped_cols} FROM `{table}`"

        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            # aclosing() closes the generator when the loop is left early, so
            # the generator's ``finally`` block (the table flush) still runs.
            async with aclosing(
                execute_select_query(
                    engine, cols_query, fetch_size=500, flush_after=table
                )
            ) as text_columns:
                async for row in text_columns:
                    try:
                        await check_for_mojibake(
                            row, found_mojibake, database, table, has_pri_key
                        )
                    except ValueError:
                        # Raised for keyless tables on the first hit; the rest
                        # of the table is skipped.
                        break
    finally:
        # Release even when the scan fails, otherwise the limiter slot held
        # for this table would never be returned.
        table_limiter.release()