"""
@date: 24/02/2022
"""
from honestnft_utils import config
def get_sale_data(COLLECTION_NAME: str, CONTRACT: str, CHAIN: str, FOLDER: str) -> None:
import os
import requests
import json
import time
from pandas import json_normalize
if not os.path.exists(FOLDER):
os.mkdir(FOLDER)
headers = {"Content-type": "application/json", "x-api-key": config.MORALIS_API_KEY}
print(f"Getting sales data for {COLLECTION_NAME}")
more_results = True
page = 1
start_time = time.time()
all_data = list() # empty list to store data as it comes
cursor = ""
while more_results:
if cursor == "":
url = "https://deep-index.moralis.io/api/v2/nft/{}/trades?chain={}&marketplace=opensea&limit={}".format(
CONTRACT, CHAIN, MAX_RESULTS
)
else:
url = "https://deep-index.moralis.io/api/v2/nft/{}/trades?chain={}&marketplace=opensea&limit={}&cursor={}".format(
CONTRACT, CHAIN, MAX_RESULTS, cursor
)
print(f"getting page {page} ...")
response = requests.get(url, headers=headers)
if response.status_code == 200:
cursor = response.json()["cursor"]
print(
"Successfully received page {} of {}".format(
page + 1, int(1 + response.json()["total"] / MAX_RESULTS)
)
)
# add new data to existing list
all_data.extend(response.json()["result"])
page += 1
# if results in this response is less than MAX_RESULTS then it's the last page
if len(response.json()["result"]) < MAX_RESULTS:
more_results = False
else:
time.sleep(TIME_DELTA)
elif response.status_code in [429, 503, 520]:
print(
f"Got a {response.status_code} response from the server. Waiting {TIME_DELTA_2} seconds and retrying"
)
time.sleep(TIME_DELTA_2)
else:
print(f"status_code = {response.status_code}")
print("Received a unexpected error from Moralis API. Closing process.")
more_results = False
# Save full json data to one master file
if KEEP_ALL_DATA:
folder = f"{FOLDER}/raw_data"
if not os.path.exists(folder):
os.mkdir(folder)
PATH = f"{FOLDER}/raw_data/{COLLECTION_NAME}.json"
with open(PATH, "w") as destination_file:
json.dump(all_data, destination_file)
df = json_normalize(all_data)
df["length"] = df["token_ids"].apply(lambda x: len(x) > 1)
# expand transactions with batch sales into multiple columns
df = df.explode("token_ids")
# clean 'time' field to make it compatible with the csv produced by 'find_minting_data.ipynb'
df["block_timestamp"] = df["block_timestamp"].str.replace(".000Z", "", regex=False)
try:
# make sure token_id is an integer
df["token_ids"] = df["token_ids"].astype(int)
except:
print(
"Caught an error in one or more token_ids. Please review the data received"
)
# Catches cases where token id is an unusual high number that can't be converted to an integer
mask = df["token_ids"].str.len() < 20
df = df.loc[mask]
df["token_ids"] = df["token_ids"].astype(int)
# discard unwanted columns
df = df[
[
"transaction_hash",
"seller_address",
"buyer_address",
"token_ids",
"price",
"block_timestamp",
]
]
# get matching columns names to HonestNFT csv format
df.columns = [
"transaction_hash",
"seller_address",
"buyer_address",
"TOKEN_ID",
"price",
"saleDate",
]
df.to_csv(f"{FOLDER}/{COLLECTION_NAME}.csv")
print("--- %s seconds ---" % (round(time.time() - start_time, 1)))
print("finished")
get_sale_data(COLLECTION_NAME, CONTRACT, CHAIN, config.SALES_DATA_FOLDER)