mirror of https://github.com/EdgeVPNio/portal.git
345 lines
20 KiB
JavaScript
345 lines
20 KiB
JavaScript
/* EdgeVPNio
|
|
* Copyright 2021, University of Florida
|
|
*
|
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
* of this software and associated documentation files (the "Software"), to deal
|
|
* in the Software without restriction, including without limitation the rights
|
|
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
* copies of the Software, and to permit persons to whom the Software is
|
|
* furnished to do so, subject to the following conditions:
|
|
*
|
|
* The above copyright notice and this permission notice shall be included in
|
|
* all copies or substantial portions of the Software.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
* THE SOFTWARE.
|
|
*/
|
|
|
|
const { InfluxDB } = require('influx')
|
|
const { DataTransformer } = require('../controllers/DataTransformer')
|
|
const { DataBaseInterface } = require('./DatabaseInterface');
|
|
const dotenv = require('dotenv')
|
|
|
|
class InfluxDBImpl extends DataBaseInterface {
|
|
// Stores the dbname and is available for every instance
|
|
constructor(dbname) {
|
|
super('');
|
|
// Once an instance is created the db connection is kept until the instance is alive.
|
|
this.db = new InfluxDB({ host: process.env.DB_URI, database: dbname, options:{ timeout: 3 * 60 * 1000 } });
|
|
this.dbname = dbname;
|
|
this.db.getDatabaseNames()
|
|
.then(names => {
|
|
//console.log("Names", names);
|
|
if (!names.includes(dbname)) {
|
|
return this.db.createDatabase(dbname);
|
|
}
|
|
})
|
|
.then(noResp => {
|
|
return this.db.createRetentionPolicy('EvioPolicy', {
|
|
duration: process.env.expireTime,
|
|
replication: 1,
|
|
isDefault: true
|
|
})
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Getter function for the db attribute
|
|
*/
|
|
getDb() {
|
|
return this.db;
|
|
}
|
|
|
|
/**
|
|
* Function to insert the PUT data sent by the EVIO nodes into
|
|
* collections: 1) Overlays
|
|
* 2) Topology
|
|
*
|
|
* @param {Json} data Aggregated data Object
|
|
* @param {Number} timestamp End timestamp at which object was taken.
|
|
*/
|
|
async insertInto(data, timestamp) {
|
|
var dataTransformer = new DataTransformer();
|
|
// The data is transformed to the required form and returned as an array of arrays.
|
|
var transformedData = dataTransformer.transformData(data);
|
|
var overlaysData = {
|
|
_id: timestamp,
|
|
Overlays: JSON.stringify(transformedData[0])
|
|
}
|
|
var dataTobeWritten = [];
|
|
var overlaysWritePoint = {
|
|
"measurement": "Overlays",
|
|
"tags": overlaysData._id,
|
|
"time": overlaysData._id,
|
|
"_id": timestamp,
|
|
"fields": overlaysData
|
|
};
|
|
dataTobeWritten.push(overlaysWritePoint);
|
|
for (var num in transformedData[1]) {
|
|
var topologyData = {
|
|
_id: timestamp,
|
|
OverlayId: transformedData[1][num]['OverlayId'],
|
|
Topology: JSON.stringify(transformedData[1][num])
|
|
}
|
|
var topologyWritePoint = {
|
|
"measurement": "Topology",
|
|
"tags": { '_id': topologyData._id },
|
|
"time": topologyData._id,
|
|
"_id": timestamp,
|
|
"OverlayId": transformedData[1][num]['OverlayId'],
|
|
"fields": topologyData
|
|
};
|
|
//console.log("Topology write point:", topologyWritePoint);
|
|
dataTobeWritten.push(topologyWritePoint);
|
|
}
|
|
console.log("Data to be written:", dataTobeWritten);
|
|
this.db.writePoints(dataTobeWritten)
|
|
.then(() => {
|
|
//console.log('FINISHED')
|
|
})
|
|
.catch(e => {
|
|
console.error(e)
|
|
console.log('\\nFinished ERROR')
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Database call to get the intervals stored.
|
|
*
|
|
* @param {String} tableName Model Name to use to find the intervals.
|
|
*/
|
|
async getIntervals(tableName) {
|
|
return this.db.query(`select _id from ${tableName}`)
|
|
.catch(error => console.log("Error while querying InfluxDB:", error));
|
|
}
|
|
|
|
/**
|
|
* Database call to get the high level Overlay information.
|
|
*
|
|
* @param {String} tableName Model to use to get the overlay.
|
|
* @param {Float} intervalId Interval identifier to query.
|
|
*/
|
|
async findOverlays(tableName, intervalId) {
|
|
if (intervalId) {
|
|
//Find the next available interval, greater than the previous one from client
|
|
return this.db.query(`select * from ${tableName} WHERE _id > ${intervalId} ORDER BY time ASC LIMIT 1`)
|
|
.then(jsonStr => {
|
|
//data not available yet, returning null
|
|
if (typeof jsonStr[0] === 'undefined') {
|
|
//console.log("data not found at findOverlays,start polling");
|
|
return null;
|
|
} else {
|
|
//constructing Overlays object to render to client
|
|
var overlaysObj = {
|
|
_id: jsonStr[0]['_id'],
|
|
Overlays: JSON.parse(jsonStr[0]['Overlays'])
|
|
}
|
|
return [overlaysObj];
|
|
}
|
|
})
|
|
.catch(error => console.log("Error while querying InfluxDB:", error));
|
|
}
|
|
//Most recent entry - intervalId not passed
|
|
return this.db.query(`select * from ${tableName} ORDER BY time DESC LIMIT 1`)
|
|
.then(jsonStr => {
|
|
//data not available yet, returning null
|
|
if (typeof jsonStr[0] === 'undefined') {
|
|
//console.log("data not found at findOverlays,start polling");
|
|
return null;
|
|
} else {
|
|
//constructing Overlays object to render to client
|
|
var overlaysObj = {
|
|
_id: jsonStr[0]['_id'],
|
|
Overlays: JSON.parse(jsonStr[0]['Overlays'])
|
|
}
|
|
return [overlaysObj];
|
|
}
|
|
})
|
|
.catch(error => console.log("Error while querying InfluxDB:", error));
|
|
}
|
|
|
|
/**
|
|
* Database call to get the detailed topology information.
|
|
*
|
|
* @param {String} tableName
|
|
* @param {Float} intervalId
|
|
* @param {String} overlayId
|
|
*/
|
|
async findTopology(tableName, intervalId, overlayId) {
|
|
if (intervalId) {
|
|
//Find the next available interval, greater than the previous one from client
|
|
return this.db.query(`select * from ` + tableName + ` WHERE (_id > ` + intervalId + ` AND OverlayId = \'` + overlayId + `\') ORDER BY time ASC LIMIT 1`)
|
|
.then(jsonStr => {
|
|
//data not available yet, returning null
|
|
if (typeof jsonStr[0] === 'undefined') {
|
|
//console.log("data not found at findTopology,start polling");
|
|
return null;
|
|
} else {
|
|
//constructing Topology object to render to client
|
|
var topoObj = {
|
|
_id: jsonStr[0]['_id'],
|
|
Topology: [JSON.parse(jsonStr[0]['Topology'])]
|
|
}
|
|
return [topoObj];
|
|
}
|
|
})
|
|
.catch(error => console.log("Error while querying InfluxDB:", error));
|
|
}
|
|
//Most recent entry - intervalId not passed
|
|
return this.db.query(`select * from ` + tableName + ` WHERE OverlayId = \'` + overlayId + `\' ORDER BY time DESC LIMIT 1`)
|
|
.then(jsonStr => {
|
|
//data not available yet, returning null
|
|
if (typeof jsonStr[0] === 'undefined') {
|
|
//console.log("data not found at findTopology,start polling");
|
|
return null;
|
|
} else {
|
|
//constructing Topology object to render to client
|
|
var topoObj = {
|
|
_id: jsonStr[0]['_id'],
|
|
Topology: [JSON.parse(jsonStr[0]['Topology'])]
|
|
}
|
|
return [topoObj];
|
|
}
|
|
})
|
|
.catch(error => console.log("Error while querying InfluxDB:", error));
|
|
}
|
|
|
|
/**
|
|
* Function to query Overlays collection, watch the collection for insert op every 1 second
|
|
* @param {String} tableName
|
|
* @param {Float} intervalId
|
|
* @param {String} overlayId
|
|
* @returns inserted data to topology collection
|
|
*/
|
|
async getOverlays(tableName, intervalId) {
|
|
var that = this;
|
|
var overlaysData = null;
|
|
async function streamReady() {
|
|
//database call to get Overlays data
|
|
var res = await that.findOverlays(tableName, intervalId)
|
|
.then(data => {
|
|
//data not available yet
|
|
if (data === null || Object.keys(data).length === 0) {
|
|
//console.log("No data found, setting data to null.")
|
|
overlaysData = null;
|
|
} else {
|
|
//Initializing data to exit setInterval timer loop
|
|
//console.log("Got data from DB, not waiting. Data:", data);
|
|
overlaysData = data;
|
|
}
|
|
})
|
|
|
|
return new Promise(ok => {
|
|
//Start timer to poll DB every 1 sec for data
|
|
var overlaysInterval = setInterval(function () {
|
|
if (overlaysData) {
|
|
//data found, interval reset to exit function
|
|
clearInterval(overlaysInterval)
|
|
return ok();
|
|
} else {
|
|
//Query to get the latest available interval from db
|
|
that.db.query('select _id from ' + tableName + ' WHERE _id > ' + intervalId + ' ORDER BY time DESC LIMIT 1')
|
|
.then(intervalIdData => {
|
|
//Check if Overlays data is available
|
|
if (intervalIdData['groupRows'].length > 0) {
|
|
overlaysData = that.db.query('select * from ' + tableName + ' ORDER BY time DESC LIMIT 1')
|
|
.then(jsonStr => {
|
|
//Overlays object construction
|
|
var overlaysObj = {
|
|
_id: jsonStr[0]['_id'],
|
|
Overlays: JSON.parse(jsonStr[0]['Overlays'])
|
|
}
|
|
return [overlaysObj];
|
|
});
|
|
} else {
|
|
//data not available yet, checking after 1 sec for insert OP
|
|
return null;
|
|
}
|
|
});
|
|
}
|
|
}, 1000);
|
|
});
|
|
}
|
|
//Method to call GET Overlays, loop through timer every 1 sec if latest data not inserted to db
|
|
var newData = await streamReady()
|
|
.then(data => {
|
|
return data;
|
|
});
|
|
//Return data on successful poll to client
|
|
return overlaysData;
|
|
}
|
|
|
|
/**
|
|
* Function to query Topology collection, watch the collection for insert op every 1 second
|
|
* @param {String} tableName
|
|
* @param {Float} intervalId
|
|
* @param {String} overlayId
|
|
* @returns inserted data to topology collection
|
|
*/
|
|
async getTopology(tableName, intervalId, overlayId) {
|
|
var that = this;
|
|
var topologyData = null;
|
|
async function streamReady() {
|
|
//database call to get Topology data
|
|
var res = await that.findTopology(tableName, intervalId, overlayId)
|
|
.then(data => {
|
|
if (data === null || Object.keys(data).length === 0) {
|
|
//console.log("No data found, setting data to null.")
|
|
//interval reset to exit function
|
|
topologyData = null;
|
|
|
|
} else {
|
|
//Initializing data to exit setInterval timer loop
|
|
//console.log("Got data from DB, not waiting. Data:", data);
|
|
topologyData = data;
|
|
}
|
|
})
|
|
|
|
//Start timer to poll DB every 1 sec for data
|
|
return new Promise(ok => {
|
|
var topologyInterval = setInterval(function () {
|
|
if (topologyData) {
|
|
//data found, interval reset to exit function
|
|
clearInterval(topologyInterval)
|
|
return ok();
|
|
} else {
|
|
//Query to get the latest available interval from db
|
|
that.db.query('select _id from ' + tableName + ' WHERE (_id > ' + intervalId + ' AND OverlayId = \'' + overlayId + '\') ORDER BY time DESC LIMIT 1')
|
|
.then(intervalIdData => {
|
|
//Check if Topology data is available
|
|
if (intervalIdData['groupRows'].length > 0) {
|
|
topologyData = that.db.query('select * from ' + tableName + ' WHERE OverlayId = \'' + overlayId + '\' ORDER BY time DESC LIMIT 1')
|
|
.then(jsonStr => {
|
|
//Topology object contruction
|
|
var topoObj = {
|
|
_id: jsonStr[0]['_id'],
|
|
Topology: [JSON.parse(jsonStr[0]['Topology'])]
|
|
}
|
|
return [topoObj];
|
|
});
|
|
} else {
|
|
//data not available yet, checking after 1 sec for insert OP
|
|
return null;
|
|
}
|
|
});
|
|
}
|
|
}, 1000);
|
|
});
|
|
}
|
|
|
|
var newData = await streamReady()
|
|
.then(data => {
|
|
return data;
|
|
});
|
|
//Return data on successful poll to client
|
|
return topologyData;
|
|
}
|
|
}
|
|
module.exports = { InfluxDBImpl }
|
|
|