// portal/db/InfluxDBImpl.js
//
// 345 lines
// 20 KiB
// JavaScript

/* EdgeVPNio
* Copyright 2021, University of Florida
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
const { InfluxDB, escape } = require('influx')
const { DataTransformer } = require('../controllers/DataTransformer')
const { DataBaseInterface } = require('./DatabaseInterface');
const dotenv = require('dotenv')
/**
 * InfluxDB-backed implementation of DataBaseInterface.
 *
 * Stores the aggregated data reported by the EVIO nodes in two measurements
 * ("Overlays" and "Topology") and serves them back, optionally waiting until
 * a newer entry has been written.
 */
class InfluxDBImpl extends DataBaseInterface {
    /**
     * Opens the InfluxDB connection and, in the background, creates the
     * database when it is missing and then the 'EvioPolicy' retention policy.
     *
     * @param {String} dbname Name of the InfluxDB database to use.
     */
    constructor(dbname) {
        super('');
        // Once an instance is created the db connection is kept until the instance is alive.
        this.db = new InfluxDB({ host: process.env.DB_URI, database: dbname, options: { timeout: 3 * 60 * 1000 } });
        this.dbname = dbname;
        // Delay, in milliseconds, between two polls in getOverlays()/getTopology().
        this.pollIntervalMs = 1000;
        this.db.getDatabaseNames()
            .then(names => {
                if (!names.includes(dbname)) {
                    return this.db.createDatabase(dbname);
                }
                return undefined;
            })
            .then(() => this.db.createRetentionPolicy('EvioPolicy', {
                duration: process.env.expireTime,
                replication: 1,
                isDefault: true
            }))
            // A constructor cannot be awaited, so the failure is reported here
            // instead of surfacing as an unhandled promise rejection.
            .catch(error => console.error("Error while initialising InfluxDB:", error));
    }

    /**
     * Getter function for the db attribute
     */
    getDb() {
        return this.db;
    }

    /**
     * Function to insert the PUT data sent by the EVIO nodes into
     * collections: 1) Overlays
     *              2) Topology
     *
     * The returned promise settles only after the write has finished. A failed
     * write is logged and does not reject the promise.
     *
     * @param {Json} data Aggregated data Object
     * @param {Number} timestamp End timestamp at which object was taken.
     */
    async insertInto(data, timestamp) {
        const dataTransformer = new DataTransformer();
        // The data is transformed to the required form and returned as an array of arrays.
        const transformedData = dataTransformer.transformData(data);
        const overlaysData = {
            _id: timestamp,
            Overlays: JSON.stringify(transformedData[0])
        };
        // NOTE(review): `tags` is the bare timestamp here while the Topology points
        // below pass an object - confirm which of the two is intended.
        // NOTE(review): node-influx documents the point time under the key
        // `timestamp`; confirm that `time` is honoured before relying on it.
        const dataTobeWritten = [{
            "measurement": "Overlays",
            "tags": overlaysData._id,
            "time": overlaysData._id,
            "_id": timestamp,
            "fields": overlaysData
        }];
        // One Topology point per entry of the second element of the transformed data.
        for (const topology of Object.values(transformedData[1] || [])) {
            const topologyData = {
                _id: timestamp,
                OverlayId: topology['OverlayId'],
                Topology: JSON.stringify(topology)
            };
            dataTobeWritten.push({
                "measurement": "Topology",
                "tags": { '_id': topologyData._id },
                "time": topologyData._id,
                "_id": timestamp,
                "OverlayId": topology['OverlayId'],
                "fields": topologyData
            });
        }
        console.log("Data to be written:", dataTobeWritten);
        try {
            // Awaited so that callers waiting on insertInto() really wait for the write.
            await this.db.writePoints(dataTobeWritten);
        } catch (e) {
            console.error(e);
            console.log('\\nFinished ERROR');
        }
    }

    /**
     * Database call to get the intervals stored.
     *
     * @param {String} tableName Model Name to use to find the intervals.
     *                           Interpolated verbatim, so it must be a trusted name.
     * @returns {Promise} The query result, or undefined when the query failed
     *                    (the error is logged).
     */
    async getIntervals(tableName) {
        try {
            return await this.db.query(`select _id from ${tableName}`);
        } catch (error) {
            console.log("Error while querying InfluxDB:", error);
            return undefined;
        }
    }

    /**
     * Database call to get the high level Overlay information.
     *
     * With an intervalId the oldest entry newer than that interval is returned,
     * without one the most recent entry.
     *
     * @param {String} tableName Model to use to get the overlay.
     *                           Interpolated verbatim, so it must be a trusted name.
     * @param {Float} intervalId Interval identifier to query.
     * @returns {Promise} [{ _id, Overlays }] when an entry exists, null when there
     *                    is none yet, undefined when the lookup failed (logged).
     */
    async findOverlays(tableName, intervalId) {
        return this._queryFirstRow(
            () => (intervalId
                //Find the next available interval, greater than the previous one from client
                ? `select * from ${tableName} WHERE _id > ${this._intervalLiteral(intervalId)} ORDER BY time ASC LIMIT 1`
                //Most recent entry - intervalId not passed
                : `select * from ${tableName} ORDER BY time DESC LIMIT 1`),
            row => ({ _id: row['_id'], Overlays: JSON.parse(row['Overlays']) })
        );
    }

    /**
     * Database call to get the detailed topology information.
     *
     * With an intervalId the oldest entry of the overlay newer than that interval
     * is returned, without one the most recent entry of the overlay.
     *
     * @param {String} tableName Model to use to get the topology.
     *                           Interpolated verbatim, so it must be a trusted name.
     * @param {Float} intervalId Interval identifier to query.
     * @param {String} overlayId Overlay whose topology is requested.
     * @returns {Promise} [{ _id, Topology }] when an entry exists, null when there
     *                    is none yet, undefined when the lookup failed (logged).
     */
    async findTopology(tableName, intervalId, overlayId) {
        return this._queryFirstRow(
            () => {
                // Quoted and escaped by the driver so the id cannot break out of the literal.
                const overlayLiteral = escape.stringLit(String(overlayId));
                return intervalId
                    //Find the next available interval, greater than the previous one from client
                    ? `select * from ${tableName} WHERE (_id > ${this._intervalLiteral(intervalId)} AND OverlayId = ${overlayLiteral}) ORDER BY time ASC LIMIT 1`
                    //Most recent entry - intervalId not passed
                    : `select * from ${tableName} WHERE OverlayId = ${overlayLiteral} ORDER BY time DESC LIMIT 1`;
            },
            row => ({ _id: row['_id'], Topology: [JSON.parse(row['Topology'])] })
        );
    }

    /**
     * Function to query the Overlays collection and, when nothing newer than
     * intervalId is stored yet, to poll it every pollIntervalMs until an entry
     * shows up. There is no upper bound on the waiting time.
     *
     * @param {String} tableName
     * @param {Float} intervalId
     * @returns {Promise} Same shape as findOverlays() on success.
     * @throws {TypeError} When intervalId is given but is not a plain number.
     */
    async getOverlays(tableName, intervalId) {
        if (intervalId) {
            // Fail fast: polling with an id that can never be queried would not end.
            this._intervalLiteral(intervalId);
        }
        return this._pollForData(() => this.findOverlays(tableName, intervalId));
    }

    /**
     * Function to query the Topology collection and, when nothing newer than
     * intervalId is stored yet for the overlay, to poll it every pollIntervalMs
     * until an entry shows up. There is no upper bound on the waiting time.
     *
     * @param {String} tableName
     * @param {Float} intervalId
     * @param {String} overlayId
     * @returns {Promise} Same shape as findTopology() on success.
     * @throws {TypeError} When intervalId is given but is not a plain number.
     */
    async getTopology(tableName, intervalId, overlayId) {
        if (intervalId) {
            // Fail fast: polling with an id that can never be queried would not end.
            this._intervalLiteral(intervalId);
        }
        return this._pollForData(() => this.findTopology(tableName, intervalId, overlayId));
    }

    /**
     * Returns intervalId as text that can be placed in a query unquoted.
     * The original text is kept (not re-formatted through Number) so that no
     * digits are lost.
     *
     * @private
     * @param {Float|String} intervalId
     * @returns {String}
     * @throws {TypeError} When intervalId is not a plain decimal number.
     */
    _intervalLiteral(intervalId) {
        const text = String(intervalId).trim();
        if (!/^-?\d+(\.\d+)?(e[+-]?\d+)?$/i.test(text)) {
            throw new TypeError(`Invalid interval id: ${text}`);
        }
        return text;
    }

    /**
     * Runs the query produced by buildQuery and converts its first row.
     *
     * @private
     * @param {Function} buildQuery Returns the query text; it is called inside the
     *                              error handling so that its failures are logged too.
     * @param {Function} toResult Maps the first row to the object handed to the client.
     * @returns {Promise} [toResult(row)], null when the query returned no row,
     *                    undefined when building or running the query failed.
     */
    async _queryFirstRow(buildQuery, toResult) {
        try {
            const rows = await this.db.query(buildQuery());
            //data not available yet, returning null
            if (rows[0] === undefined) {
                return null;
            }
            return [toResult(rows[0])];
        } catch (error) {
            console.log("Error while querying InfluxDB:", error);
            return undefined;
        }
    }

    /**
     * Calls lookup until it yields data, sleeping pollIntervalMs between attempts.
     * The first attempt is made immediately. A failed lookup (undefined) is
     * treated like "no data yet" and retried.
     *
     * @private
     * @param {Function} lookup Returns a promise for the data, or for null/undefined.
     * @returns {Promise} The first non-empty value produced by lookup.
     */
    async _pollForData(lookup) {
        for (;;) {
            const data = await lookup();
            if (data != null && Object.keys(data).length > 0) {
                return data;
            }
            // Attempts run one after another, so slow queries can never overlap.
            await new Promise(resolve => setTimeout(resolve, this.pollIntervalMs));
        }
    }
}
module.exports = { InfluxDBImpl }