fix price sync
This commit is contained in:
parent
1c3a552c82
commit
93cc02874f
3 changed files with 76 additions and 90 deletions
|
|
@ -3,7 +3,7 @@ import sequelize from "../database/sequelize";
|
|||
|
||||
class ZanoPrice extends Model {
|
||||
declare readonly id: number;
|
||||
declare ts_utc: number;
|
||||
declare ts_utc: string;
|
||||
declare price_close: string;
|
||||
declare src: string;
|
||||
declare raw: object;
|
||||
|
|
@ -20,36 +20,19 @@ export type IZanoPrice = Omit<
|
|||
ZanoPrice.init(
|
||||
{
|
||||
id: { type: DataTypes.INTEGER, primaryKey: true, autoIncrement: true },
|
||||
|
||||
ts_utc: {
|
||||
type: DataTypes.BIGINT,
|
||||
type: DataTypes.STRING,
|
||||
allowNull: false,
|
||||
unique: true,
|
||||
get() {
|
||||
const v = this.getDataValue("ts_utc") as unknown;
|
||||
return v == null ? null : Number(v);
|
||||
},
|
||||
set(v: number | string | Date) {
|
||||
if (v instanceof Date) {
|
||||
this.setDataValue("ts_utc", v.getTime());
|
||||
} else {
|
||||
this.setDataValue("ts_utc", Number(v));
|
||||
}
|
||||
},
|
||||
unique: true
|
||||
},
|
||||
|
||||
price_close: { type: DataTypes.DECIMAL(20, 10), allowNull: false },
|
||||
src: { type: DataTypes.STRING, allowNull: false, defaultValue: "mexc" },
|
||||
raw: { type: DataTypes.JSONB, allowNull: false },
|
||||
},
|
||||
{
|
||||
sequelize,
|
||||
modelName: "zano_price_4h",
|
||||
modelName: "zano_price",
|
||||
timestamps: true,
|
||||
indexes: [
|
||||
{ fields: ["ts_utc"], unique: true },
|
||||
{ fields: ["src"] },
|
||||
],
|
||||
}
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ import BigNumber from "bignumber.js";
|
|||
import next from "next";
|
||||
import { rateLimit } from 'express-rate-limit';
|
||||
import bodyParser from 'body-parser';
|
||||
import { syncLatest } from "./services/zanoPrice.service";
|
||||
import { syncHistoricalPrice, syncLatestPrice } from "./services/zanoPrice.service";
|
||||
import ZanoPrice from "./schemes/ZanoPrice";
|
||||
import cron from "node-cron";
|
||||
|
||||
|
|
@ -68,7 +68,7 @@ async function waitForDb() {
|
|||
(async () => {
|
||||
|
||||
await waitForDb();
|
||||
await syncLatest();
|
||||
await syncLatestPrice().then(() => syncHistoricalPrice());
|
||||
|
||||
io.engine.on('initial_headers', (headers, req) => {
|
||||
headers['Access-Control-Allow-Origin'] = config.frontend_api
|
||||
|
|
@ -2082,7 +2082,7 @@ async function waitForDb() {
|
|||
|
||||
cron.schedule("0 */4 * * *", async () => {
|
||||
console.log("[CRON] Syncing latest Zano price...");
|
||||
await syncLatest();
|
||||
await syncLatestPrice();
|
||||
}, { timezone: "UTC" });
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -2,79 +2,82 @@ import axios from "axios";
|
|||
import ZanoPrice from "../schemes/ZanoPrice";
|
||||
|
||||
const FOUR_HOURS_MS = 4 * 60 * 60 * 1000;
|
||||
const LIMIT = 1000;
|
||||
const BASE_URL = `https://api.mexc.com/api/v3/klines?symbol=ZANOUSDT&interval=4h&limit=${LIMIT}`;
|
||||
const BASE_URL = `https://api.mexc.com/api/v3/klines?symbol=ZANOUSDT&interval=1m&limit=1`;
|
||||
const HISTORY_LIMIT_MS = 365 * 24 * 60 * 60 * 1000; // 1 year
|
||||
|
||||
function alignTo4hUTC(ms: number) {
|
||||
return Math.floor(ms / FOUR_HOURS_MS) * FOUR_HOURS_MS;
|
||||
async function fetchPriceForTimestamp(timestamp: number) {
|
||||
console.log(`Fetching price for timestamp: ${timestamp}`);
|
||||
|
||||
const url = `${BASE_URL}&startTime=${timestamp}`;
|
||||
const res = await axios.get(url);
|
||||
if (!Array.isArray(res.data) || res.data.length === 0) return null;
|
||||
|
||||
return {
|
||||
ts_utc: timestamp,
|
||||
price_close: String(res.data[0][4]),
|
||||
raw: res.data[0],
|
||||
};
|
||||
}
|
||||
|
||||
async function fetch4hRange(startMs: number, endMs: number) {
|
||||
const url = `${BASE_URL}&startTime=${startMs}&endTime=${endMs}`;
|
||||
const res = await axios.get(url);
|
||||
if (!Array.isArray(res.data) || res.data.length === 0) return [];
|
||||
export async function syncHistoricalPrice() {
|
||||
const oldestPrice = await ZanoPrice.findOne({
|
||||
order: [["ts_utc", "ASC"]],
|
||||
raw: true,
|
||||
});
|
||||
console.log(oldestPrice);
|
||||
|
||||
|
||||
if (!oldestPrice) {
|
||||
throw new Error("Sync latest price before historical");
|
||||
}
|
||||
|
||||
if (parseInt(oldestPrice.ts_utc, 10) < (+new Date() - HISTORY_LIMIT_MS)) {
|
||||
return console.log(`[zano price] Historical Price already synced`);
|
||||
}
|
||||
|
||||
const timestampsToSync: number[] = [];
|
||||
|
||||
let oldestTimestamp = parseInt(oldestPrice.ts_utc, 10);
|
||||
|
||||
while (oldestTimestamp > (+new Date() - HISTORY_LIMIT_MS)) {
|
||||
oldestTimestamp -= FOUR_HOURS_MS;
|
||||
timestampsToSync.push(oldestTimestamp);
|
||||
}
|
||||
|
||||
|
||||
for (const element of timestampsToSync) {
|
||||
try {
|
||||
const price = await fetchPriceForTimestamp(element);
|
||||
if (price) {
|
||||
await ZanoPrice.create(price);
|
||||
}
|
||||
} catch (error) {
|
||||
console.log(error);
|
||||
|
||||
}
|
||||
|
||||
await new Promise(resolve => setTimeout(resolve, 2000));
|
||||
}
|
||||
|
||||
return res.data.map((row: any[]) => ({
|
||||
ts_utc: Number(row[0]),
|
||||
price_close: String(row[4]),
|
||||
raw: row,
|
||||
}));
|
||||
}
|
||||
|
||||
export async function backfill(fromMs: number, toMs: number) {
|
||||
let currentStart = alignTo4hUTC(fromMs);
|
||||
let currentEnd = currentStart + FOUR_HOURS_MS * LIMIT;
|
||||
export async function syncLatestPrice() {
|
||||
const lastRow = await ZanoPrice.findOne({
|
||||
attributes: ["ts_utc"],
|
||||
order: [["ts_utc", "DESC"]],
|
||||
raw: true,
|
||||
});
|
||||
|
||||
while (currentStart < toMs) {
|
||||
const candles = await fetch4hRange(currentStart, currentEnd);
|
||||
if (!candles.length) break;
|
||||
if (lastRow?.ts_utc && parseInt(lastRow.ts_utc, 10) > (+new Date() - FOUR_HOURS_MS)) {
|
||||
return console.log(
|
||||
`[zano price] Nothing to sync since ${new Date(parseInt(lastRow.ts_utc, 10)).toUTCString()} (UTC)`
|
||||
);
|
||||
}
|
||||
const latestPrice = await fetchPriceForTimestamp(+new Date());
|
||||
|
||||
await ZanoPrice.bulkCreate(candles, {
|
||||
updateOnDuplicate: ["price_close", "raw"],
|
||||
});
|
||||
if (!latestPrice) {
|
||||
return console.log(`Unable to fetch latest price from mexc`);
|
||||
}
|
||||
|
||||
console.log(
|
||||
`[BACKFILL] ${candles.length} candles saved, start: ${new Date(
|
||||
candles[0].ts_utc
|
||||
).toISOString()}`
|
||||
);
|
||||
|
||||
currentStart = candles[candles.length - 1].ts_utc + FOUR_HOURS_MS;
|
||||
currentEnd = currentStart + FOUR_HOURS_MS * LIMIT;
|
||||
}
|
||||
}
|
||||
|
||||
export async function syncLatest() {
|
||||
const lastRow = await ZanoPrice.findOne({
|
||||
attributes: ["ts_utc"],
|
||||
order: [["ts_utc", "DESC"]],
|
||||
raw: true,
|
||||
});
|
||||
|
||||
const nowAligned = alignTo4hUTC(Date.now());
|
||||
|
||||
let lastTsMs: number | null = null;
|
||||
if (lastRow && (lastRow as any).ts_utc != null) {
|
||||
const tmp = Number((lastRow as any).ts_utc);
|
||||
lastTsMs = Math.min(tmp, nowAligned);
|
||||
}
|
||||
|
||||
const from =
|
||||
lastTsMs !== null ? lastTsMs + FOUR_HOURS_MS : nowAligned - 365 * 24 * 60 * 60 * 1000;
|
||||
|
||||
if (from >= nowAligned) {
|
||||
console.log("[SYNC] Nothing to sync. from >= nowAligned", {
|
||||
fromISO: new Date(from).toISOString(),
|
||||
nowAlignedISO: new Date(nowAligned).toISOString(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
console.log("[SYNC] Running backfill", {
|
||||
lastTsISO: lastTsMs ? new Date(lastTsMs).toISOString() : null,
|
||||
fromISO: new Date(from).toISOString(),
|
||||
toISO: new Date(nowAligned).toISOString(),
|
||||
});
|
||||
|
||||
await backfill(from, nowAligned);
|
||||
await ZanoPrice.create(latestPrice);
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue