Airdrop or Mint

[ ]:
"""
Update Parameters Here
"""

COLLECTION = "Quaks"
BLOCKCHAIN = "ethereum"  # Accepted values: "arbitrum", "avalanche", "binance", "ethereum", "fantom", "optimism", "polygon"
[ ]:
import concurrent.futures
import os

import numpy as np
import pandas as pd
from web3 import Web3

from honestnft_utils import config


if BLOCKCHAIN == "arbitrum":
    endpoint = config.ARBITRUM_ENDPOINT
elif BLOCKCHAIN == "avalanche":
    endpoint = config.AVALANCHE_ENDPOINT
elif BLOCKCHAIN == "binance":
    endpoint = config.BINANCE_ENDPOINT
elif BLOCKCHAIN == "ethereum":
    endpoint = config.ENDPOINT
elif BLOCKCHAIN == "fantom":
    endpoint = config.FANTOM_ENDPOINT
elif BLOCKCHAIN == "optimism":
    endpoint = config.OPTIMISM_ENDPOINT
elif BLOCKCHAIN == "polygon":
    endpoint = config.POLYGON_ENDPOINT
else:
    raise ValueError(f"Blockchain {BLOCKCHAIN} not supported")


def analyse_transaction(df_series: pd.Series) -> str:
    txid = df_series["txid"].values[0]
    recipient = df_series["to_account"].values[0]
    web3 = Web3(Web3.HTTPProvider(endpoint))
    transaction = web3.eth.get_transaction(transaction_hash=txid)
    if transaction["from"].lower() == recipient.lower():
        return "Mint"
    else:
        return "Airdrop"


def analyse_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    grouped = df.groupby(["txid", "to_account"])
    df = df.merge(
        grouped.apply(analyse_transaction).rename("airdrop_or_mint"),
        on=["txid", "to_account"],
    )
    return df


MINT_PATH = f"{config.MINTING_FOLDER}/{COLLECTION}_minting.csv"
MINT_DB = pd.read_csv(MINT_PATH)

# Drop existing airdrop_or_mint column
MINT_DB.drop(columns=["airdrop_or_mint"], inplace=True, errors="ignore")

threads = min(32, os.cpu_count() + 4)  # type: ignore
df_results = []
splitted_df = np.array_split(ary=MINT_DB, indices_or_sections=threads)

with concurrent.futures.ProcessPoolExecutor(max_workers=threads) as executor:
    results = [executor.submit(analyse_dataframe, df=df) for df in splitted_df]
    for result in concurrent.futures.as_completed(results):
        try:
            df_results.append(result.result())
        except Exception as exc:
            print(exc)
            raise

combined_df = pd.concat(df_results)

combined_df.sort_values(by=["TOKEN_ID"], ascending=True, inplace=True)

combined_df.to_csv(MINT_PATH, index=False)

# Count number of mints and airdrops
mints = combined_df[combined_df["airdrop_or_mint"] == "Mint"].shape[0]
airdrops = combined_df[combined_df["airdrop_or_mint"] == "Airdrop"].shape[0]
print(f"Mints: {mints}")
print(f"Airdrops: {airdrops}")
print(f"Total: {mints + airdrops}")
Mints: 5950
Airdrops: 0
Total: 5950