import concurrent.futures
import os
import numpy as np
import pandas as pd
from web3 import Web3
from honestnft_utils import config
# Resolve the RPC endpoint for the selected blockchain.
# The table maps each supported blockchain name to the name of the ``config``
# attribute that holds its endpoint; the attribute is only read for the
# blockchain actually selected.
_ENDPOINT_ATTRIBUTES = {
    "arbitrum": "ARBITRUM_ENDPOINT",
    "avalanche": "AVALANCHE_ENDPOINT",
    "binance": "BINANCE_ENDPOINT",
    "ethereum": "ENDPOINT",
    "fantom": "FANTOM_ENDPOINT",
    "optimism": "OPTIMISM_ENDPOINT",
    "polygon": "POLYGON_ENDPOINT",
}

if BLOCKCHAIN not in _ENDPOINT_ATTRIBUTES:
    raise ValueError(f"Blockchain {BLOCKCHAIN} not supported")

endpoint = getattr(config, _ENDPOINT_ATTRIBUTES[BLOCKCHAIN])
def analyse_transaction(df_series: pd.Series) -> str:
    """Classify one transaction as a mint or an airdrop.

    The transaction is fetched from the node at the module-level ``endpoint``
    and its sender is compared, case-insensitively, with the recipient
    recorded in the data.

    Args:
        df_series: Object indexable by ``"txid"`` and ``"to_account"`` whose
            values expose ``.values``; only the first entry of each is read.

    Returns:
        ``"Mint"`` when the transaction sender is the recipient, otherwise
        ``"Airdrop"``.
    """
    tx_hash = df_series["txid"].values[0]
    receiver = df_series["to_account"].values[0]

    # A fresh client is created for every call.
    client = Web3(Web3.HTTPProvider(endpoint))
    sender = client.eth.get_transaction(transaction_hash=tx_hash)["from"]

    return "Mint" if sender.lower() == receiver.lower() else "Airdrop"
def analyse_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Label every row of *df* as ``"Mint"`` or ``"Airdrop"``.

    Each distinct ``(txid, to_account)`` pair is classified once with
    ``analyse_transaction`` and the label is joined back onto every row that
    shares the pair, as a new ``"airdrop_or_mint"`` column.

    Args:
        df: Frame containing at least the ``"txid"`` and ``"to_account"``
            columns. May be empty.

    Returns:
        A new frame with the columns of *df* plus ``"airdrop_or_mint"``.
        Rows whose ``"txid"`` or ``"to_account"`` is missing are not
        classified and are absent from the result (inner merge).
    """
    key_columns = ["txid", "to_account"]

    # One row per distinct pair, taken straight from the key columns instead
    # of going through groupby.apply: this works for an empty frame and does
    # not rely on apply handing the grouping columns to the callback.
    # Pairs with a missing key are skipped, so those rows fall out of the
    # inner merge below.
    pairs = df[key_columns].dropna().drop_duplicates()

    # analyse_transaction reads ``[...].values[0]``, so hand it one-row frames.
    labels = [analyse_transaction(pairs.iloc[[pos]]) for pos in range(len(pairs))]
    pairs = pairs.assign(airdrop_or_mint=labels)

    return df.merge(pairs, on=key_columns)
# Load the minting records of the collection under analysis.
MINT_PATH = f"{config.MINTING_FOLDER}/{COLLECTION}_minting.csv"
MINT_DB = pd.read_csv(MINT_PATH)

# Drop existing airdrop_or_mint column (the result is written back to the
# same file below, so a previous run may have left one behind).
MINT_DB.drop(columns=["airdrop_or_mint"], inplace=True, errors="ignore")

# os.cpu_count() may return None, so fall back to 1. Never use more workers
# than there are rows (but at least one), which also avoids empty chunks.
threads = min(32, (os.cpu_count() or 1) + 4, max(1, len(MINT_DB)))

df_results = []

# Split by row position and slice with iloc; calling np.array_split directly
# on a DataFrame goes through the deprecated DataFrame.swapaxes.
splitted_df = [
    MINT_DB.iloc[positions]
    for positions in np.array_split(np.arange(len(MINT_DB)), threads)
]

# The per-chunk work is dominated by blocking RPC requests, so threads are
# sufficient. Unlike a process pool they need neither picklable arguments nor
# an importable __main__ module, so this also runs in interactive sessions.
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
    results = [executor.submit(analyse_dataframe, df=df) for df in splitted_df]
    for result in concurrent.futures.as_completed(results):
        try:
            df_results.append(result.result())
        except Exception as exc:
            print(exc)
            # Do not start chunks that are still queued; the error is fatal.
            for pending in results:
                pending.cancel()
            raise

combined_df = pd.concat(df_results)
combined_df.sort_values(by=["TOKEN_ID"], ascending=True, inplace=True)
combined_df.to_csv(MINT_PATH, index=False)

# Count number of mints and airdrops
label_counts = combined_df["airdrop_or_mint"].value_counts()
mints = int(label_counts.get("Mint", 0))
airdrops = int(label_counts.get("Airdrop", 0))
print(f"Mints: {mints}")
print(f"Airdrops: {airdrops}")
print(f"Total: {mints + airdrops}")