diff --git a/lib/api.js b/lib/api.js index 7c4aec3..8f815e7 100644 --- a/lib/api.js +++ b/lib/api.js @@ -6,15 +6,18 @@ const async = require("async"); const debug = require("debug")("api"); const btcValidator = require('wallet-address-validator'); const cnUtil = require('cryptonote-util'); -let bodyParser = require('body-parser'); -let jwt = require('jsonwebtoken'); // used to create, sign, and verify tokens +const bodyParser = require('body-parser'); +const jwt = require('jsonwebtoken'); // used to create, sign, and verify tokens const crypto = require('crypto'); -let cors = require('cors'); - +const cors = require('cors'); +const zmq = require('zmq'); +const sock = zmq.socket('sub'); let addressBase58Prefix = cnUtil.address_decode(new Buffer(global.config.pool.address)); let threadName = ""; let workerList = []; +let server; +let io; if (cluster.isMaster) { threadName = "(Master) "; @@ -22,10 +25,6 @@ if (cluster.isMaster) { threadName = "(Worker " + cluster.worker.id + " - " + process.pid + ") "; } -app.use(cors()); -app.use(bodyParser.urlencoded({extended: false})); -app.use(bodyParser.json()); - let pool_list = []; if(global.config.pplns.enable === true){ pool_list.push('pplns'); @@ -37,6 +36,248 @@ if(global.config.solo.enable === true){ pool_list.push('solo'); } +app.use(cors()); +app.use(bodyParser.urlencoded({extended: false})); +app.use(bodyParser.json()); + +if (cluster.isMaster) { + let numWorkers = require('os').cpus().length; + console.log('Master cluster setting up ' + numWorkers + ' workers...'); + + for (let i = 0; i < numWorkers; i++) { + let worker = cluster.fork(); + workerList.push(worker); + } + + cluster.on('online', function (worker) { + console.log('Worker ' + worker.process.pid + ' is online'); + }); + + cluster.on('exit', function (worker, code, signal) { + console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal); + console.log('Starting a new worker'); + worker = cluster.fork(); + workerList.push(worker); + }); +} else { + server = app.listen(8001, function () { + console.log('Process ' + process.pid + ' is listening to all incoming requests'); + }); + io = require('socket.io')(server); + // Init the ZMQ listeners here. + sock.connect('tcp://127.0.0.1:3000'); + sock.connect('tcp://127.0.0.1:3001'); + // miner_hash_graph - Hash graphs updated + // miner_hash_stats - Miner hashes updated + // network_block_info - New block information + // pool_stats - Pool statistics update + sock.subscribe(''); +} + +// SocketIO Routes +// =============== +io.on('connection', (socket) => { + socket.on('room', function(room) { + socket.join(room); + }); +}); + +// As the majority of updates come in upstream from the main SocketIO server, we use ZMQ to manage this. +// The following is the ZMQ logic. Pray to whatever deity you like. + +sock.on('message', function(topic, message) { + /* + Registered ZMQ Messages: + miner_hash_graph - Hash graphs updated + miner_hash_stats - Miner hashes updated + network_block_info - New block information + pool_stats - Pool statistics update + payments - Payments complete - Trip the frontend to hit the frontend for new payment information + + Registered Rooms: + payments + hash_chart_
- equiv to https://api.xmrpool.net/miner/
/chart/hashrate/allWorkers + worker_stats_
- equiv to https://api.xmrpool.net/miner/
/stats/allWorkers + worker_ids_
- equiv to https://api.xmrpool.net/miner/
/identifiers + address_stats_
- equiv to https://api.xmrpool.net/miner/
/stats + network_block_info - equiv to https://api.xmrpool.net/network/stats + pool_stats_ - equiv to https://api.xmrpool.net/pool/stats/ + pool_stats - equiv to https://api.xmrpool.net/pool/stats/ + */ + switch(topic){ + case 'payments': + io.sockets.in('payments').emit('message', 'newPaymentsAvailable'); + break; + case 'miner_hash_graph': + message = JSON.parse(message); + message.forEach(function(address){ + io.sockets.in('hash_chart_'+address).emit('message', JSON.stringify(getAllWorkerHashCharts(address))); + }); + break; + case 'miner_hash_stats': + message = JSON.parse(message); + message.forEach(function(address){ + io.sockets.in('address_stats_'+address).emit('message', JSON.stringify(getAddressStats(address))); + io.sockets.in('worker_stats_'+address).emit('message', JSON.stringify(getAllWorkerStats(address))); + io.sockets.in('worker_ids_'+address).emit('message', JSON.stringify(global.database.getCache(address + '_identifiers'))); + }); + break; + case 'network_block_info': + io.sockets.in('block_update').emit('message', JSON.stringify(global.database.getCache('networkBlockInfo'))); + break; + case 'pool_stats': + if (message === 'global'){ + let localCache = global.database.getCache('pool_stats_global'); + delete(localCache.minerHistory); + delete(localCache.hashHistory); + let lastPayment = global.database.getCache('lastPaymentCycle'); + io.sockets.in('pool_stats').emit('message', JSON.stringify({pool_list: pool_list, pool_statistics: localCache, last_payment: !lastPayment ? 0 : lastPayment})); + } else { + let pool_type = message; + let localCache; + switch (pool_type) { + case 'pplns': + localCache = global.database.getCache('pool_stats_pplns'); + localCache.fee = global.config.payout.pplnsFee; + break; + case 'pps': + localCache = global.database.getCache('pool_stats_pps'); + localCache.fee = global.config.payout.ppsFee; + break; + case 'solo': + localCache = global.database.getCache('pool_stats_solo'); + localCache.fee = global.config.payout.soloFee; + break; + case 'default': + io.sockets.in('pool_stats_' + message).emit('message', JSON.stringify({'error': 'Invalid pool type'})); + } + delete(localCache.minerHistory); + delete(localCache.hashHistory); + io.sockets.in('pool_stats_' + message).emit('message', JSON.stringify({pool_statistics: localCache})); + } + } +}); + +// Support Functions that are reused now +function getAllWorkerHashCharts(address){ + let identifiers = global.database.getCache(address + '_identifiers'); + let returnData = {global: global.database.getCache(address)['hashHistory']}; + if (identifiers !== false){ + identifiers.sort(); + } else { + return returnData; + } + let intCounter = 0; + identifiers.forEach(function(identifier){ + returnData[identifier] = global.database.getCache(address+"_"+identifier)['hashHistory']; + intCounter += 1; + if (intCounter === identifiers.length){ + return returnData; + } + }); +} + +function getAllWorkerStats(address){ + let identifiers = global.database.getCache(address + '_identifiers'); + let globalCache = global.database.getCache(address); + let returnData = {global: { + lts: Math.floor(globalCache.lastHash / 1000), + identifer: 'global', + hash: globalCache.hash, + totalHash: globalCache.totalHashes + }}; + let intCounter = 0; + if (identifiers === false){ + return returnData; + } + identifiers.sort().forEach(function(identifier){ + let cachedData = global.database.getCache(address+"_"+identifier); + returnData[identifier] = { + lts: Math.floor(cachedData.lastHash / 1000), + identifer: identifier, + hash: cachedData.hash, + totalHash: cachedData.totalHashes + }; + intCounter += 1; + if (intCounter === identifiers.length){ + return returnData; + } + }); +} + +function getAddressStats(address){ + let address_parts = address.split('.'); + let address_pt = address_parts[0]; + let payment_id = address_parts[1]; + let cachedData = global.database.getCache(address); + let paidQuery = "SELECT SUM(amount) as amt FROM payments WHERE payment_address = ? AND payment_id = ?"; + let txnQuery = "SELECT count(id) as amt FROM payments WHERE payment_address = ? AND payment_id = ?"; + let unpaidQuery = "SELECT SUM(amount) as amt FROM balance WHERE payment_address = ? AND payment_id = ?"; + if (typeof(payment_id) === 'undefined') { + paidQuery = "SELECT SUM(amount) as amt FROM payments WHERE payment_address = ? AND payment_id IS ?"; + txnQuery = "SELECT count(id) as amt FROM payments WHERE payment_address = ? AND payment_id IS ?"; + unpaidQuery = "SELECT SUM(amount) as amt FROM balance WHERE payment_address = ? AND payment_id IS ?"; + } + async.waterfall([ + function (callback) { + debug(threadName + "Checking Influx for last 10min avg for /miner/address/stats"); + return callback(null, {hash: cachedData.hash, identifier: 'global', lastHash: Math.floor(cachedData.lastHash / 1000), + totalHashes: cachedData.totalHashes, validShares: Number(cachedData.goodShares), invalidShares: Number(cachedData.badShares)}); + }, + function (returnData, callback) { + debug(threadName + "Checking MySQL total amount paid for /miner/address/stats"); + global.mysql.query(paidQuery, [address_pt, payment_id]).then(function (rows) { + if (typeof(rows[0]) === 'undefined') { + returnData.amtPaid = 0; + } else { + returnData.amtPaid = rows[0].amt; + if (returnData.amtPaid === null) { + returnData.amtPaid = 0; + } + } + return callback(null, returnData); + }); + }, + function (returnData, callback) { + debug(threadName + "Checking MySQL total amount unpaid for /miner/address/stats"); + global.mysql.query(unpaidQuery, [address_pt, payment_id]).then(function (rows) { + if (typeof(rows[0]) === 'undefined') { + returnData.amtDue = 0; + } else { + returnData.amtDue = rows[0].amt; + if (returnData.amtDue === null) { + returnData.amtDue = 0; + } + } + return callback(null, returnData); + }); + }, + function (returnData, callback) { + debug(threadName + "Checking MySQL total amount unpaid for /miner/address/stats"); + global.mysql.query(txnQuery, [address_pt, payment_id]).then(function (rows) { + if (typeof(rows[0]) === 'undefined') { + returnData.txnCount = 0; + } else { + returnData.txnCount = rows[0].amt; + if (returnData.txnCount === null) { + returnData.txnCount = 0; + } + } + return callback(true, returnData); + }); + } + ], function (err, result) { + debug(threadName + "Result information for " + address + ": " + JSON.stringify(result)); + if (err === true) { + return result; + } + if (err) { + console.error(threadName + "Error within the miner stats identifier func"); + return {'error': err.toString()}; + } + }); +} + // ROUTES FOR OUR API // ============================================================================= @@ -300,32 +541,7 @@ app.get('/miner/:address/payments', function (req, res) { }); app.get('/miner/:address/stats/allWorkers', function (req, res) { - let address = req.params.address; - let identifiers = global.database.getCache(address + '_identifiers'); - let globalCache = global.database.getCache(address); - let returnData = {global: { - lts: Math.floor(globalCache.lastHash / 1000), - identifer: 'global', - hash: globalCache.hash, - totalHash: globalCache.totalHashes - }}; - let intCounter = 0; - if (identifiers === false){ - return res.json(returnData); - } - identifiers.sort().forEach(function(identifier){ - let cachedData = global.database.getCache(req.params.address+"_"+identifier); - returnData[identifier] = { - lts: Math.floor(cachedData.lastHash / 1000), - identifer: identifier, - hash: cachedData.hash, - totalHash: cachedData.totalHashes - }; - intCounter += 1; - if (intCounter === identifiers.length){ - return res.json(returnData); - } - }); + return res.json(getAllWorkerStats(req.params.address)); }); app.get('/miner/:address/stats/:identifier', function (req, res) { @@ -353,22 +569,7 @@ app.get('/miner/:address/chart/hashrate', function (req, res) { }); app.get('/miner/:address/chart/hashrate/allWorkers', function (req, res) { - let address = req.params.address; - let identifiers = global.database.getCache(address + '_identifiers'); - let returnData = {global: global.database.getCache(req.params.address)['hashHistory']}; - if (identifiers !== false){ - identifiers.sort(); - } else { - return res.json(returnData); - } - let intCounter = 0; - identifiers.forEach(function(identifier){ - returnData[identifier] = global.database.getCache(req.params.address+"_"+identifier)['hashHistory']; - intCounter += 1; - if (intCounter === identifiers.length){ - return res.json(returnData); - } - }); + return res.json(getAllWorkerHashCharts(req.params.address)); }); app.get('/miner/:address/chart/hashrate/:identifier', function (req, res) { @@ -376,77 +577,7 @@ app.get('/miner/:address/chart/hashrate/:identifier', function (req, res) { }); app.get('/miner/:address/stats', function (req, res) { - let address = req.params.address; - let address_parts = req.params.address.split('.'); - let address_pt = address_parts[0]; - let payment_id = address_parts[1]; - let cachedData = global.database.getCache(address); - let paidQuery = "SELECT SUM(amount) as amt FROM payments WHERE payment_address = ? AND payment_id = ?"; - let txnQuery = "SELECT count(id) as amt FROM payments WHERE payment_address = ? AND payment_id = ?"; - let unpaidQuery = "SELECT SUM(amount) as amt FROM balance WHERE payment_address = ? AND payment_id = ?"; - if (typeof(payment_id) === 'undefined') { - paidQuery = "SELECT SUM(amount) as amt FROM payments WHERE payment_address = ? AND payment_id IS ?"; - txnQuery = "SELECT count(id) as amt FROM payments WHERE payment_address = ? AND payment_id IS ?"; - unpaidQuery = "SELECT SUM(amount) as amt FROM balance WHERE payment_address = ? AND payment_id IS ?"; - } - async.waterfall([ - function (callback) { - debug(threadName + "Checking Influx for last 10min avg for /miner/address/stats"); - return callback(null, {hash: cachedData.hash, identifier: 'global', lastHash: Math.floor(cachedData.lastHash / 1000), - totalHashes: cachedData.totalHashes, validShares: Number(cachedData.goodShares), invalidShares: Number(cachedData.badShares)}); - }, - function (returnData, callback) { - debug(threadName + "Checking MySQL total amount paid for /miner/address/stats"); - global.mysql.query(paidQuery, [address_pt, payment_id]).then(function (rows) { - if (typeof(rows[0]) === 'undefined') { - returnData.amtPaid = 0; - } else { - returnData.amtPaid = rows[0].amt; - if (returnData.amtPaid === null) { - returnData.amtPaid = 0; - } - } - return callback(null, returnData); - }); - }, - function (returnData, callback) { - debug(threadName + "Checking MySQL total amount unpaid for /miner/address/stats"); - global.mysql.query(unpaidQuery, [address_pt, payment_id]).then(function (rows) { - if (typeof(rows[0]) === 'undefined') { - returnData.amtDue = 0; - } else { - returnData.amtDue = rows[0].amt; - if (returnData.amtDue === null) { - returnData.amtDue = 0; - } - } - return callback(null, returnData); - }); - }, - function (returnData, callback) { - debug(threadName + "Checking MySQL total amount unpaid for /miner/address/stats"); - global.mysql.query(txnQuery, [address_pt, payment_id]).then(function (rows) { - if (typeof(rows[0]) === 'undefined') { - returnData.txnCount = 0; - } else { - returnData.txnCount = rows[0].amt; - if (returnData.txnCount === null) { - returnData.txnCount = 0; - } - } - return callback(true, returnData); - }); - } - ], function (err, result) { - debug(threadName + "Result information for " + address + ": " + JSON.stringify(result)); - if (err === true) { - return res.json(result); - } - if (err) { - console.error(threadName + "Error within the miner stats identifier func"); - return res.json({'error': err.toString()}); - } - }); + return res.json(getAddressStats(req.params.address)); }); // Authentication @@ -805,29 +936,4 @@ adminRoutes.get('/userList', function (req, res) { app.use('/authed', secureRoutes); app.use('/admin', adminRoutes); -// Authenticated routes - -if (cluster.isMaster) { - let numWorkers = require('os').cpus().length; - console.log('Master cluster setting up ' + numWorkers + ' workers...'); - - for (let i = 0; i < numWorkers; i++) { - let worker = cluster.fork(); - workerList.push(worker); - } - - cluster.on('online', function (worker) { - console.log('Worker ' + worker.process.pid + ' is online'); - }); - - cluster.on('exit', function (worker, code, signal) { - console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal); - console.log('Starting a new worker'); - worker = cluster.fork(); - workerList.push(worker); - }); -} else { - app.listen(8001, function () { - console.log('Process ' + process.pid + ' is listening to all incoming requests'); - }); -} +// Authenticated routes \ No newline at end of file diff --git a/lib/payments.js b/lib/payments.js index ec3e7ef..b840a1c 100644 --- a/lib/payments.js +++ b/lib/payments.js @@ -4,12 +4,15 @@ const async = require("async"); const debug = require("debug")("payments"); const request = require('request-json'); const range = require('range'); +const zmq = require('zmq'); +const sock = zmq.socket('pub'); let hexChars = new RegExp("[0-9a-f]+"); let bestExchange = global.config.payout.bestExchange; let xmrAPIClient = request.createClient('https://xmr.to/api/v1/xmr2btc/'); let extraPaymentRound = false; let paymentTimer = null; +sock.bindSync('tcp://127.0.0.1:3001'); let shapeshiftQueue = async.queue(function (task, callback) { // Amount needs to be shifted in as a non-completed value, as the wallet will only take non-complete values.. @@ -344,6 +347,7 @@ paymentQueue.drain = function(){ extraPaymentRound = false; paymentTimer = setInterval(makePayments, global.config.payout.timer * 60 * 1000); global.database.setCache('lastPaymentCycle', Math.floor(Date.now()/1000)); + sock.send(['payments', 'complete']); }; function updateShapeshiftCompletion() { diff --git a/lib/worker.js b/lib/worker.js index 4a914ba..b329937 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -2,9 +2,12 @@ const debug = require("debug")("worker"); const async = require("async"); const sprintf = require("sprintf-js").sprintf; +const zmq = require('zmq'); +const sock = zmq.socket('pub'); let threadName = "Worker Server "; let cycleCount = 0; +sock.bindSync('tcp://127.0.0.1:3000'); function updateShareStats() { // This is an omni-worker to deal with all things share-stats related @@ -12,6 +15,7 @@ function updateShareStats() { // Buffer lengths? You guessed it, configured in SQL. // Stats timeouts are 30 seconds, so everything for buffers should be there. let currentTime = Date.now(); + let activeAddresses = []; async.waterfall([ function (callback) { global.coinFuncs.getLastBlockHeader(function (body) { @@ -173,6 +177,9 @@ function updateShareStats() { if (globalMinerList.indexOf(miner) === -1) { globalMinerList.push(miner); } + if (miner.indexOf('-') === -1){ + activeAddresses.push(miner); + } let cachedData = global.database.getCache(miner); if (cachedData !== false) { cachedData.hash = Math.floor(localStats.miners[miner] / 600); @@ -241,7 +248,9 @@ function updateShareStats() { cycleCount += 1; if (cycleCount === 6){ cycleCount = 0; + sock.send(['miner_hash_graph', JSON.stringify(activeAddresses)]); } + sock.send(['miner_hash_stats', JSON.stringify(activeAddresses)]); }); setTimeout(updateShareStats, 10000); } @@ -333,6 +342,7 @@ function updatePoolStats(poolType) { totalPayments: result[7] || 0, roundHashes: result[8] || 0 }); + sock.send(['pool_stats', poolType]); }); } @@ -430,6 +440,7 @@ function updateBlockHeader() { value: body.result.block_header.reward, ts: body.result.block_header.timestamp }); + sock.send(['network_block_info', 'complete']); } else { console.error("GetLastBlockHeader Error during block header update"); } diff --git a/package.json b/package.json index 9fc45eb..58731dd 100644 --- a/package.json +++ b/package.json @@ -36,8 +36,10 @@ "request": "^2.79.0", "request-json": "0.6.1", "shapeshift.io": "1.3.0", + "socketio": "^1.0.0", "sprintf-js": "^1.0.3", "uuid": "3.0.1", - "wallet-address-validator": "0.1.0" + "wallet-address-validator": "0.1.0", + "zmq": "^2.15.3" } }