Adding ZMQ and SocketIO access.

This commit is contained in:
Alexander Blair 2017-03-05 13:56:06 -08:00
parent 9e08dbbc90
commit 1f9fe310c0
4 changed files with 271 additions and 148 deletions

View file

@ -6,15 +6,18 @@ const async = require("async");
const debug = require("debug")("api");
const btcValidator = require('wallet-address-validator');
const cnUtil = require('cryptonote-util');
let bodyParser = require('body-parser');
let jwt = require('jsonwebtoken'); // used to create, sign, and verify tokens
const bodyParser = require('body-parser');
const jwt = require('jsonwebtoken'); // used to create, sign, and verify tokens
const crypto = require('crypto');
let cors = require('cors');
const cors = require('cors');
const zmq = require('zmq');
const sock = zmq.socket('sub');
let addressBase58Prefix = cnUtil.address_decode(new Buffer(global.config.pool.address));
let threadName = "";
let workerList = [];
let server;
let io;
if (cluster.isMaster) {
threadName = "(Master) ";
@ -22,10 +25,6 @@ if (cluster.isMaster) {
threadName = "(Worker " + cluster.worker.id + " - " + process.pid + ") ";
}
app.use(cors());
app.use(bodyParser.urlencoded({extended: false}));
app.use(bodyParser.json());
let pool_list = [];
if(global.config.pplns.enable === true){
pool_list.push('pplns');
@ -37,6 +36,248 @@ if(global.config.solo.enable === true){
pool_list.push('solo');
}
app.use(cors());
app.use(bodyParser.urlencoded({extended: false}));
app.use(bodyParser.json());
if (cluster.isMaster) {
let numWorkers = require('os').cpus().length;
console.log('Master cluster setting up ' + numWorkers + ' workers...');
for (let i = 0; i < numWorkers; i++) {
let worker = cluster.fork();
workerList.push(worker);
}
cluster.on('online', function (worker) {
console.log('Worker ' + worker.process.pid + ' is online');
});
cluster.on('exit', function (worker, code, signal) {
console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
console.log('Starting a new worker');
worker = cluster.fork();
workerList.push(worker);
});
} else {
server = app.listen(8001, function () {
console.log('Process ' + process.pid + ' is listening to all incoming requests');
});
io = require('socket.io')(server);
// Init the ZMQ listeners here.
sock.connect('tcp://127.0.0.1:3000');
sock.connect('tcp://127.0.0.1:3001');
// miner_hash_graph - Hash graphs updated
// miner_hash_stats - Miner hashes updated
// network_block_info - New block information
// pool_stats - Pool statistics update
sock.subscribe('');
}
// SocketIO Routes
// ===============
io.on('connection', (socket) => {
socket.on('room', function(room) {
socket.join(room);
});
});
// As the majority of updates come in upstream from the main SocketIO server, we use ZMQ to manage this.
// The following is the ZMQ logic. Pray to whatever deity you like.
sock.on('message', function(topic, message) {
/*
Registered ZMQ Messages:
miner_hash_graph - Hash graphs updated
miner_hash_stats - Miner hashes updated
network_block_info - New block information
pool_stats - Pool statistics update
payments - Payments complete - Trip the frontend to hit the frontend for new payment information
Registered Rooms:
payments
hash_chart_<address> - equiv to https://api.xmrpool.net/miner/<address>/chart/hashrate/allWorkers
worker_stats_<address> - equiv to https://api.xmrpool.net/miner/<address>/stats/allWorkers
worker_ids_<address> - equiv to https://api.xmrpool.net/miner/<address>/identifiers
address_stats_<address> - equiv to https://api.xmrpool.net/miner/<address>/stats
network_block_info - equiv to https://api.xmrpool.net/network/stats
pool_stats_<type> - equiv to https://api.xmrpool.net/pool/stats/<type>
pool_stats - equiv to https://api.xmrpool.net/pool/stats/
*/
switch(topic){
case 'payments':
io.sockets.in('payments').emit('message', 'newPaymentsAvailable');
break;
case 'miner_hash_graph':
message = JSON.parse(message);
message.forEach(function(address){
io.sockets.in('hash_chart_'+address).emit('message', JSON.stringify(getAllWorkerHashCharts(address)));
});
break;
case 'miner_hash_stats':
message = JSON.parse(message);
message.forEach(function(address){
io.sockets.in('address_stats_'+address).emit('message', JSON.stringify(getAddressStats(address)));
io.sockets.in('worker_stats_'+address).emit('message', JSON.stringify(getAllWorkerStats(address)));
io.sockets.in('worker_ids_'+address).emit('message', JSON.stringify(global.database.getCache(address + '_identifiers')));
});
break;
case 'network_block_info':
io.sockets.in('block_update').emit('message', JSON.stringify(global.database.getCache('networkBlockInfo')));
break;
case 'pool_stats':
if (message === 'global'){
let localCache = global.database.getCache('pool_stats_global');
delete(localCache.minerHistory);
delete(localCache.hashHistory);
let lastPayment = global.database.getCache('lastPaymentCycle');
io.sockets.in('pool_stats').emit('message', JSON.stringify({pool_list: pool_list, pool_statistics: localCache, last_payment: !lastPayment ? 0 : lastPayment}));
} else {
let pool_type = message;
let localCache;
switch (pool_type) {
case 'pplns':
localCache = global.database.getCache('pool_stats_pplns');
localCache.fee = global.config.payout.pplnsFee;
break;
case 'pps':
localCache = global.database.getCache('pool_stats_pps');
localCache.fee = global.config.payout.ppsFee;
break;
case 'solo':
localCache = global.database.getCache('pool_stats_solo');
localCache.fee = global.config.payout.soloFee;
break;
case 'default':
io.sockets.in('pool_stats_' + message).emit('message', JSON.stringify({'error': 'Invalid pool type'}));
}
delete(localCache.minerHistory);
delete(localCache.hashHistory);
io.sockets.in('pool_stats_' + message).emit('message', JSON.stringify({pool_statistics: localCache}));
}
}
});
// Support Functions that are reused now
function getAllWorkerHashCharts(address){
let identifiers = global.database.getCache(address + '_identifiers');
let returnData = {global: global.database.getCache(address)['hashHistory']};
if (identifiers !== false){
identifiers.sort();
} else {
return returnData;
}
let intCounter = 0;
identifiers.forEach(function(identifier){
returnData[identifier] = global.database.getCache(address+"_"+identifier)['hashHistory'];
intCounter += 1;
if (intCounter === identifiers.length){
return returnData;
}
});
}
function getAllWorkerStats(address){
let identifiers = global.database.getCache(address + '_identifiers');
let globalCache = global.database.getCache(address);
let returnData = {global: {
lts: Math.floor(globalCache.lastHash / 1000),
identifer: 'global',
hash: globalCache.hash,
totalHash: globalCache.totalHashes
}};
let intCounter = 0;
if (identifiers === false){
return returnData;
}
identifiers.sort().forEach(function(identifier){
let cachedData = global.database.getCache(address+"_"+identifier);
returnData[identifier] = {
lts: Math.floor(cachedData.lastHash / 1000),
identifer: identifier,
hash: cachedData.hash,
totalHash: cachedData.totalHashes
};
intCounter += 1;
if (intCounter === identifiers.length){
return returnData;
}
});
}
function getAddressStats(address){
let address_parts = address.split('.');
let address_pt = address_parts[0];
let payment_id = address_parts[1];
let cachedData = global.database.getCache(address);
let paidQuery = "SELECT SUM(amount) as amt FROM payments WHERE payment_address = ? AND payment_id = ?";
let txnQuery = "SELECT count(id) as amt FROM payments WHERE payment_address = ? AND payment_id = ?";
let unpaidQuery = "SELECT SUM(amount) as amt FROM balance WHERE payment_address = ? AND payment_id = ?";
if (typeof(payment_id) === 'undefined') {
paidQuery = "SELECT SUM(amount) as amt FROM payments WHERE payment_address = ? AND payment_id IS ?";
txnQuery = "SELECT count(id) as amt FROM payments WHERE payment_address = ? AND payment_id IS ?";
unpaidQuery = "SELECT SUM(amount) as amt FROM balance WHERE payment_address = ? AND payment_id IS ?";
}
async.waterfall([
function (callback) {
debug(threadName + "Checking Influx for last 10min avg for /miner/address/stats");
return callback(null, {hash: cachedData.hash, identifier: 'global', lastHash: Math.floor(cachedData.lastHash / 1000),
totalHashes: cachedData.totalHashes, validShares: Number(cachedData.goodShares), invalidShares: Number(cachedData.badShares)});
},
function (returnData, callback) {
debug(threadName + "Checking MySQL total amount paid for /miner/address/stats");
global.mysql.query(paidQuery, [address_pt, payment_id]).then(function (rows) {
if (typeof(rows[0]) === 'undefined') {
returnData.amtPaid = 0;
} else {
returnData.amtPaid = rows[0].amt;
if (returnData.amtPaid === null) {
returnData.amtPaid = 0;
}
}
return callback(null, returnData);
});
},
function (returnData, callback) {
debug(threadName + "Checking MySQL total amount unpaid for /miner/address/stats");
global.mysql.query(unpaidQuery, [address_pt, payment_id]).then(function (rows) {
if (typeof(rows[0]) === 'undefined') {
returnData.amtDue = 0;
} else {
returnData.amtDue = rows[0].amt;
if (returnData.amtDue === null) {
returnData.amtDue = 0;
}
}
return callback(null, returnData);
});
},
function (returnData, callback) {
debug(threadName + "Checking MySQL total amount unpaid for /miner/address/stats");
global.mysql.query(txnQuery, [address_pt, payment_id]).then(function (rows) {
if (typeof(rows[0]) === 'undefined') {
returnData.txnCount = 0;
} else {
returnData.txnCount = rows[0].amt;
if (returnData.txnCount === null) {
returnData.txnCount = 0;
}
}
return callback(true, returnData);
});
}
], function (err, result) {
debug(threadName + "Result information for " + address + ": " + JSON.stringify(result));
if (err === true) {
return result;
}
if (err) {
console.error(threadName + "Error within the miner stats identifier func");
return {'error': err.toString()};
}
});
}
// ROUTES FOR OUR API
// =============================================================================
@ -300,32 +541,7 @@ app.get('/miner/:address/payments', function (req, res) {
});
app.get('/miner/:address/stats/allWorkers', function (req, res) {
let address = req.params.address;
let identifiers = global.database.getCache(address + '_identifiers');
let globalCache = global.database.getCache(address);
let returnData = {global: {
lts: Math.floor(globalCache.lastHash / 1000),
identifer: 'global',
hash: globalCache.hash,
totalHash: globalCache.totalHashes
}};
let intCounter = 0;
if (identifiers === false){
return res.json(returnData);
}
identifiers.sort().forEach(function(identifier){
let cachedData = global.database.getCache(req.params.address+"_"+identifier);
returnData[identifier] = {
lts: Math.floor(cachedData.lastHash / 1000),
identifer: identifier,
hash: cachedData.hash,
totalHash: cachedData.totalHashes
};
intCounter += 1;
if (intCounter === identifiers.length){
return res.json(returnData);
}
});
return res.json(getAllWorkerStats(req.params.address));
});
app.get('/miner/:address/stats/:identifier', function (req, res) {
@ -353,22 +569,7 @@ app.get('/miner/:address/chart/hashrate', function (req, res) {
});
app.get('/miner/:address/chart/hashrate/allWorkers', function (req, res) {
let address = req.params.address;
let identifiers = global.database.getCache(address + '_identifiers');
let returnData = {global: global.database.getCache(req.params.address)['hashHistory']};
if (identifiers !== false){
identifiers.sort();
} else {
return res.json(returnData);
}
let intCounter = 0;
identifiers.forEach(function(identifier){
returnData[identifier] = global.database.getCache(req.params.address+"_"+identifier)['hashHistory'];
intCounter += 1;
if (intCounter === identifiers.length){
return res.json(returnData);
}
});
return res.json(getAllWorkerHashCharts(req.params.address));
});
app.get('/miner/:address/chart/hashrate/:identifier', function (req, res) {
@ -376,77 +577,7 @@ app.get('/miner/:address/chart/hashrate/:identifier', function (req, res) {
});
app.get('/miner/:address/stats', function (req, res) {
let address = req.params.address;
let address_parts = req.params.address.split('.');
let address_pt = address_parts[0];
let payment_id = address_parts[1];
let cachedData = global.database.getCache(address);
let paidQuery = "SELECT SUM(amount) as amt FROM payments WHERE payment_address = ? AND payment_id = ?";
let txnQuery = "SELECT count(id) as amt FROM payments WHERE payment_address = ? AND payment_id = ?";
let unpaidQuery = "SELECT SUM(amount) as amt FROM balance WHERE payment_address = ? AND payment_id = ?";
if (typeof(payment_id) === 'undefined') {
paidQuery = "SELECT SUM(amount) as amt FROM payments WHERE payment_address = ? AND payment_id IS ?";
txnQuery = "SELECT count(id) as amt FROM payments WHERE payment_address = ? AND payment_id IS ?";
unpaidQuery = "SELECT SUM(amount) as amt FROM balance WHERE payment_address = ? AND payment_id IS ?";
}
async.waterfall([
function (callback) {
debug(threadName + "Checking Influx for last 10min avg for /miner/address/stats");
return callback(null, {hash: cachedData.hash, identifier: 'global', lastHash: Math.floor(cachedData.lastHash / 1000),
totalHashes: cachedData.totalHashes, validShares: Number(cachedData.goodShares), invalidShares: Number(cachedData.badShares)});
},
function (returnData, callback) {
debug(threadName + "Checking MySQL total amount paid for /miner/address/stats");
global.mysql.query(paidQuery, [address_pt, payment_id]).then(function (rows) {
if (typeof(rows[0]) === 'undefined') {
returnData.amtPaid = 0;
} else {
returnData.amtPaid = rows[0].amt;
if (returnData.amtPaid === null) {
returnData.amtPaid = 0;
}
}
return callback(null, returnData);
});
},
function (returnData, callback) {
debug(threadName + "Checking MySQL total amount unpaid for /miner/address/stats");
global.mysql.query(unpaidQuery, [address_pt, payment_id]).then(function (rows) {
if (typeof(rows[0]) === 'undefined') {
returnData.amtDue = 0;
} else {
returnData.amtDue = rows[0].amt;
if (returnData.amtDue === null) {
returnData.amtDue = 0;
}
}
return callback(null, returnData);
});
},
function (returnData, callback) {
debug(threadName + "Checking MySQL total amount unpaid for /miner/address/stats");
global.mysql.query(txnQuery, [address_pt, payment_id]).then(function (rows) {
if (typeof(rows[0]) === 'undefined') {
returnData.txnCount = 0;
} else {
returnData.txnCount = rows[0].amt;
if (returnData.txnCount === null) {
returnData.txnCount = 0;
}
}
return callback(true, returnData);
});
}
], function (err, result) {
debug(threadName + "Result information for " + address + ": " + JSON.stringify(result));
if (err === true) {
return res.json(result);
}
if (err) {
console.error(threadName + "Error within the miner stats identifier func");
return res.json({'error': err.toString()});
}
});
return res.json(getAddressStats(req.params.address));
});
// Authentication
@ -805,29 +936,4 @@ adminRoutes.get('/userList', function (req, res) {
app.use('/authed', secureRoutes);
app.use('/admin', adminRoutes);
// Authenticated routes
if (cluster.isMaster) {
let numWorkers = require('os').cpus().length;
console.log('Master cluster setting up ' + numWorkers + ' workers...');
for (let i = 0; i < numWorkers; i++) {
let worker = cluster.fork();
workerList.push(worker);
}
cluster.on('online', function (worker) {
console.log('Worker ' + worker.process.pid + ' is online');
});
cluster.on('exit', function (worker, code, signal) {
console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
console.log('Starting a new worker');
worker = cluster.fork();
workerList.push(worker);
});
} else {
app.listen(8001, function () {
console.log('Process ' + process.pid + ' is listening to all incoming requests');
});
}
// Authenticated routes

View file

@ -4,12 +4,15 @@ const async = require("async");
const debug = require("debug")("payments");
const request = require('request-json');
const range = require('range');
const zmq = require('zmq');
const sock = zmq.socket('pub');
let hexChars = new RegExp("[0-9a-f]+");
let bestExchange = global.config.payout.bestExchange;
let xmrAPIClient = request.createClient('https://xmr.to/api/v1/xmr2btc/');
let extraPaymentRound = false;
let paymentTimer = null;
sock.bindSync('tcp://127.0.0.1:3001');
let shapeshiftQueue = async.queue(function (task, callback) {
// Amount needs to be shifted in as a non-completed value, as the wallet will only take non-complete values..
@ -344,6 +347,7 @@ paymentQueue.drain = function(){
extraPaymentRound = false;
paymentTimer = setInterval(makePayments, global.config.payout.timer * 60 * 1000);
global.database.setCache('lastPaymentCycle', Math.floor(Date.now()/1000));
sock.send(['payments', 'complete']);
};
function updateShapeshiftCompletion() {

View file

@ -2,9 +2,12 @@
const debug = require("debug")("worker");
const async = require("async");
const sprintf = require("sprintf-js").sprintf;
const zmq = require('zmq');
const sock = zmq.socket('pub');
let threadName = "Worker Server ";
let cycleCount = 0;
sock.bindSync('tcp://127.0.0.1:3000');
function updateShareStats() {
// This is an omni-worker to deal with all things share-stats related
@ -12,6 +15,7 @@ function updateShareStats() {
// Buffer lengths? You guessed it, configured in SQL.
// Stats timeouts are 30 seconds, so everything for buffers should be there.
let currentTime = Date.now();
let activeAddresses = [];
async.waterfall([
function (callback) {
global.coinFuncs.getLastBlockHeader(function (body) {
@ -173,6 +177,9 @@ function updateShareStats() {
if (globalMinerList.indexOf(miner) === -1) {
globalMinerList.push(miner);
}
if (miner.indexOf('-') === -1){
activeAddresses.push(miner);
}
let cachedData = global.database.getCache(miner);
if (cachedData !== false) {
cachedData.hash = Math.floor(localStats.miners[miner] / 600);
@ -241,7 +248,9 @@ function updateShareStats() {
cycleCount += 1;
if (cycleCount === 6){
cycleCount = 0;
sock.send(['miner_hash_graph', JSON.stringify(activeAddresses)]);
}
sock.send(['miner_hash_stats', JSON.stringify(activeAddresses)]);
});
setTimeout(updateShareStats, 10000);
}
@ -333,6 +342,7 @@ function updatePoolStats(poolType) {
totalPayments: result[7] || 0,
roundHashes: result[8] || 0
});
sock.send(['pool_stats', poolType]);
});
}
@ -430,6 +440,7 @@ function updateBlockHeader() {
value: body.result.block_header.reward,
ts: body.result.block_header.timestamp
});
sock.send(['network_block_info', 'complete']);
} else {
console.error("GetLastBlockHeader Error during block header update");
}

View file

@ -36,8 +36,10 @@
"request": "^2.79.0",
"request-json": "0.6.1",
"shapeshift.io": "1.3.0",
"socketio": "^1.0.0",
"sprintf-js": "^1.0.3",
"uuid": "3.0.1",
"wallet-address-validator": "0.1.0"
"wallet-address-validator": "0.1.0",
"zmq": "^2.15.3"
}
}