Find minting data (Moralis)

[11]:
"""
Update Parameters Here
"""
# Name of the collection; used to build the rarity CSV, raw JSON and minting CSV paths.
COLLECTION_NAME: str = "Quaks"
# Address of the NFT contract whose transfers are requested from Moralis.
CONTRACT: str = "0x07bbdaf30e89ea3ecf6cadc80d6e7c4b0843c729"
# Chain identifier sent to Moralis as the `chain` query parameter.
CHAIN: str = "eth"

"""
Optional parameters
"""

KEEP_ALL_DATA: bool = False  # set to True to keep the raw JSON on disk

MAX_RESULTS: int = 100  # max results per request (sent as the `limit` query parameter)
TIME_DELTA: int = 1  # seconds to wait between successful calls
TIME_DELTA_2: int = 5  # seconds to wait before retrying after a 429/503/520 response
[ ]:
"""
A Moralis API key is required (it is read from config.MORALIS_API_KEY).
The free tier provides a key and is sufficient for fetching minting data.

AVAILABLE CHAINS:
    eth, ropsten, rinkeby, goerli, kovan,
    polygon, mumbai, bsc, bsc testnet,
    avalanche, avalanche testnet, fantom
"""

import os
import requests
import json
import time
import pandas as pd
from pandas import json_normalize

from honestnft_utils import config
from honestnft_utils import constants


def _fetch_all_transfers(CONTRACT: str, CHAIN: str, headers: dict) -> list:
    """Download every transfer record of CONTRACT from the Moralis API, page by page.

    Pages are requested with cursor-based pagination until the API stops
    returning a cursor (or returns an empty page). Responses with status
    429, 503 or 520 are retried after ``TIME_DELTA_2`` seconds using the same
    cursor. Any other non-200 status is reported and ends the download; the
    records collected so far are still returned.
    """
    url = f"https://deep-index.moralis.io/api/v2/nft/{CONTRACT}/transfers"
    params = {"chain": CHAIN, "format": "decimal", "limit": MAX_RESULTS}

    all_data = []  # transfer records accumulated across pages
    cursor = None
    while True:
        if cursor:
            params["cursor"] = cursor

        # A timeout keeps a stalled connection from hanging the run forever.
        response = requests.get(url, headers=headers, params=params, timeout=60)

        if response.status_code == 200:
            # Only parse the body once we know the request succeeded.
            response_data = response.json()
            results = response_data.get("result") or []
            all_data.extend(results)

            # The cursor is the authoritative "more pages" signal; an empty
            # page is treated as the end as well, to guard against looping.
            cursor = response_data.get("cursor")
            if not cursor or not results:
                break
            time.sleep(TIME_DELTA)

        elif response.status_code in (429, 503, 520):
            # Transient/throttling response: keep the current cursor and retry.
            print(
                f"Got a {response.status_code} response from the server. Waiting {TIME_DELTA_2} seconds and retrying"
            )
            time.sleep(TIME_DELTA_2)

        else:
            print(f"status_code = {response.status_code}")
            print("Received a unexpected error from Moralis API. Closing process.")
            try:
                print(response.json())
            except ValueError:
                # Error bodies are not guaranteed to be JSON.
                print(response.text)
            break

    return all_data


def _build_minting_frame(all_data: list, RARITY_DB: pd.DataFrame) -> pd.DataFrame:
    """Turn raw transfer records into the HonestNFT minting table with rarity ranks."""
    df = json_normalize(all_data)

    # remove non minting rows; copy so later column assignments do not act on a view
    df = df.loc[df["from_address"] == constants.MINT_ADDRESS].copy()

    # make sure token_id is an integer so it can be joined against RARITY_DB
    df["token_id"] = df["token_id"].astype(int)

    # add rarity rank to minting data (inner join: tokens without a rank are dropped)
    df = df.merge(RARITY_DB, left_on="token_id", right_on="TOKEN_ID")

    # discard unwanted columns
    df = df[
        [
            "transaction_hash",
            "to_address",
            "token_id",
            "from_address",
            "Rank",
            "block_timestamp",
        ]
    ]

    df = df.drop_duplicates(subset=["token_id"])

    # get matching columns names to HonestNFT csv format
    # NOTE(review): "from_address" is exported as "current_owner" — confirm this
    # matches what consumers of the CSV expect.
    df.columns = ["txid", "to_account", "TOKEN_ID", "current_owner", "rank", "time"]

    # clean 'time' field to make it compatible with the csv produced by
    # 'find_minting_data.ipynb'; regex=False so "." is matched literally
    df["time"] = df["time"].str.replace(".000Z", "", regex=False)

    return df


def get_mintdata(
    COLLECTION_NAME: str,
    CONTRACT: str,
    CHAIN: str,
) -> None:
    """Fetch minting data for a collection from Moralis and save it as a CSV.

    Reads the rarity table from
    ``{config.RARITY_FOLDER}/{COLLECTION_NAME}_raritytools.csv``, downloads all
    transfers of ``CONTRACT`` on ``CHAIN``, keeps the transfers sent from
    ``constants.MINT_ADDRESS``, joins them with the rarity ranks and writes
    ``{config.MINTING_FOLDER}/{COLLECTION_NAME}_minting.csv``.

    When the module-level ``KEEP_ALL_DATA`` flag is true, the raw transfer
    records are also written to
    ``{config.MINTING_FOLDER}/{COLLECTION_NAME}/{COLLECTION_NAME}.json``.

    Args:
        COLLECTION_NAME: Collection name used to build input and output paths.
        CONTRACT: Address of the NFT contract to query.
        CHAIN: Chain identifier passed to the Moralis API.

    Returns:
        None. Results are written to disk and progress is printed.
    """
    RARITY_CSV = f"{config.RARITY_FOLDER}/{COLLECTION_NAME}_raritytools.csv"

    print(f"Rarity data loaded from: {RARITY_CSV}")
    RARITY_DB = pd.read_csv(RARITY_CSV)
    headers = {"Content-type": "application/json", "x-api-key": config.MORALIS_API_KEY}

    print(f"Getting minting data for {COLLECTION_NAME}")
    start_time = time.time()
    all_data = _fetch_all_transfers(CONTRACT, CHAIN, headers)

    if not all_data:
        # Without any record there are no columns to filter on; stop cleanly
        # instead of failing with a KeyError further down.
        print(f"No transfer data received for {COLLECTION_NAME}. Nothing to save.")
        return

    # Save full json data to one master file
    if KEEP_ALL_DATA:
        folder = f"{config.MINTING_FOLDER}/{COLLECTION_NAME}/"
        # makedirs also creates missing parent folders and tolerates reruns
        os.makedirs(folder, exist_ok=True)

        PATH = f"{config.MINTING_FOLDER}/{COLLECTION_NAME}/{COLLECTION_NAME}.json"
        with open(PATH, "w") as destination_file:
            json.dump(all_data, destination_file)

    df = _build_minting_frame(all_data, RARITY_DB)
    df.to_csv(f"{config.MINTING_FOLDER}/{COLLECTION_NAME}_minting.csv")

    print("--- %s seconds ---" % (round(time.time() - start_time, 1)))
    print("finished")


# Run the download for the collection configured in the parameters section.
get_mintdata(
    COLLECTION_NAME=COLLECTION_NAME,
    CONTRACT=CONTRACT,
    CHAIN=CHAIN,
)