"""
A Moralis API Key is required.
The free tier includes one and is enough to get minting data.
AVAILABLE CHAINS:
eth, ropsten, rinkeby, goerli, kovan,
polygon, mumbai, bsc, bsc testnet,
avalanche, avalanche testnet, fantom
"""
import os
import requests
import json
import time
import pandas as pd
from pandas import json_normalize
from honestnft_utils import config
from honestnft_utils import constants
def get_mintdata(
COLLECTION_NAME: str,
CONTRACT: str,
CHAIN: str,
) -> None:
RARITY_CSV = f"{config.RARITY_FOLDER}/{COLLECTION_NAME}_raritytools.csv"
print(f"Rarity data loaded from: {RARITY_CSV}")
RARITY_DB = pd.read_csv(RARITY_CSV)
headers = {"Content-type": "application/json", "x-api-key": config.MORALIS_API_KEY}
print(f"Getting minting data for {COLLECTION_NAME}")
more_results = True
page = 1
start_time = time.time()
all_data = list() # empty list to store data as it comes
cursor = ""
while more_results:
if cursor == "":
url = "https://deep-index.moralis.io/api/v2/nft/{}/transfers?chain={}&format=decimal&limit={}".format(
CONTRACT, CHAIN, MAX_RESULTS
)
else:
url = "https://deep-index.moralis.io/api/v2/nft/{}/transfers?chain={}&format=decimal&limit={}&cursor={}".format(
CONTRACT, CHAIN, MAX_RESULTS, cursor
)
response = requests.get(url, headers=headers)
response_data = response.json()
if response.status_code == 200:
cursor = response_data["cursor"]
PATH = (
f"{config.MINTING_FOLDER}/{COLLECTION_NAME}/{page * MAX_RESULTS}.json"
)
# add new data to existing list
all_data.extend(response_data["result"])
page += 1
# if results in this response is less than MAX_RESULTS then it's the last page
if len(response_data["result"]) < MAX_RESULTS:
more_results = False
else:
time.sleep(TIME_DELTA)
elif response.status_code in [429, 503, 520]:
print(
f"Got a {response.status_code} response from the server. Waiting {TIME_DELTA_2} seconds and retrying"
)
time.sleep(TIME_DELTA_2)
else:
print(f"status_code = {response.status_code}")
print("Received a unexpected error from Moralis API. Closing process.")
print(response.json())
more_results = False
cursor = response.json()["cursor"] if "cursor" in response.json() else None
more_results = cursor != ""
# Save full json data to one master file
if KEEP_ALL_DATA:
folder = f"{config.MINTING_FOLDER}/{COLLECTION_NAME}/"
if not os.path.exists(folder):
os.mkdir(folder)
PATH = f"{config.MINTING_FOLDER}/{COLLECTION_NAME}/{COLLECTION_NAME}.json"
with open(PATH, "w") as destination_file:
json.dump(all_data, destination_file)
df = json_normalize(all_data)
# remove non minting rows
df = df.loc[df["from_address"] == constants.MINT_ADDRESS]
# make sure token_id is an integer
df["token_id"] = df["token_id"].astype(int)
# add rarity rank to minting data
df = df.merge(RARITY_DB, left_on="token_id", right_on="TOKEN_ID")
# discard unwanted columns
df = df[
[
"transaction_hash",
"to_address",
"token_id",
"from_address",
"Rank",
"block_timestamp",
]
]
df.drop_duplicates(subset=["token_id"], inplace=True)
# get matching columns names to HonestNFT csv format
df.columns = ["txid", "to_account", "TOKEN_ID", "current_owner", "rank", "time"]
# clean 'time' field to make it compatible with the csv produced by 'find_minting_data.ipynb'
df["time"] = df["time"].str.replace(".000Z", "")
df.to_csv(f"{config.MINTING_FOLDER}/{COLLECTION_NAME}_minting.csv")
print("--- %s seconds ---" % (round(time.time() - start_time, 1)))
print("finished")
get_mintdata(COLLECTION_NAME, CONTRACT, CHAIN)