Merge pull request #5 from Freeboard/cluster_support

Added cluster support
master
Jim Heising 2016-12-26 15:01:43 -08:00 committed by GitHub
commit cef0ce54a6
2 changed files with 133 additions and 146 deletions

View File

@ -5,4 +5,5 @@ exports.proxy_request_timeout_ms = 10000; // The lenght of time we'll wait for a
exports.max_request_length = 100000; // The maximum length of characters allowed for a request or a response. exports.max_request_length = 100000; // The maximum length of characters allowed for a request or a response.
exports.enable_rate_limiting = true; exports.enable_rate_limiting = true;
exports.max_requests_per_second = 10; // The maximum number of requests per second to allow from a given IP. exports.max_requests_per_second = 10; // The maximum number of requests per second to allow from a given IP.
exports.blacklist_hostname_regex = /^(10\.|192\.|127\.|localhost$)/i; // Good for limiting access to internal IP addresses and hosts. exports.blacklist_hostname_regex = /^(10\.|192\.|127\.|localhost$)/i; // Good for limiting access to internal IP addresses and hosts.
exports.cluster_process_count = Number(process.env.CLUSTER_PROCESS_COUNT) || require("os").cpus().length;

276
server.js
View File

@ -2,205 +2,191 @@ var http = require('http');
var config = require("./config"); var config = require("./config");
var url = require("url"); var url = require("url");
var request = require("request"); var request = require("request");
var cluster = require('cluster');
var throttle = require("tokenthrottle")({rate: config.max_requests_per_second}); var throttle = require("tokenthrottle")({rate: config.max_requests_per_second});
var publicAddressFinder = require("public-address"); var publicAddressFinder = require("public-address");
var publicIP; var publicIP;
// Get our public IP address // Get our public IP address
publicAddressFinder(function(err, data){ publicAddressFinder(function (err, data) {
if(!err && data) if (!err && data) {
{ publicIP = data.address;
publicIP = data.address; }
}
}); });
function addCORSHeaders(req, res) function addCORSHeaders(req, res) {
{ if (req.method.toUpperCase() === "OPTIONS") {
if (req.method.toUpperCase() === "OPTIONS") if (req.headers["access-control-request-headers"]) {
{ res.setHeader("Access-Control-Allow-Headers", req.headers["access-control-request-headers"]);
if(req.headers["access-control-request-headers"]) }
{
res.setHeader("Access-Control-Allow-Headers", req.headers["access-control-request-headers"]);
}
if(req.headers["access-control-request-method"]) if (req.headers["access-control-request-method"]) {
{ res.setHeader("Access-Control-Allow-Methods", req.headers["access-control-request-method"]);
res.setHeader("Access-Control-Allow-Methods", req.headers["access-control-request-method"]); }
} }
}
if(req.headers["origin"]) if (req.headers["origin"]) {
{ res.setHeader("Access-Control-Allow-Origin", req.headers["origin"]);
res.setHeader("Access-Control-Allow-Origin", req.headers["origin"]); }
} else {
else res.setHeader("Access-Control-Allow-Origin", "*");
{ }
res.setHeader("Access-Control-Allow-Origin", "*");
}
} }
function writeResponse(res, httpCode, body) { function writeResponse(res, httpCode, body) {
res.statusCode = httpCode; res.statusCode = httpCode;
res.end(body); res.end(body);
} }
function sendInvalidURLResponse(res) { function sendInvalidURLResponse(res) {
return writeResponse(res, 404, "url must be in the form of /fetch/{some_url_here}"); return writeResponse(res, 404, "url must be in the form of /fetch/{some_url_here}");
} }
function sendTooBigResponse(res) { function sendTooBigResponse(res) {
return writeResponse(res, 413, "the content in the request or response cannot exceed " + config.max_request_length + " characters."); return writeResponse(res, 413, "the content in the request or response cannot exceed " + config.max_request_length + " characters.");
} }
function getClientAddress(req) { function getClientAddress(req) {
return (req.headers['x-forwarded-for'] || '').split(',')[0] return (req.headers['x-forwarded-for'] || '').split(',')[0]
|| req.connection.remoteAddress; || req.connection.remoteAddress;
} }
function processRequest(req, res) function processRequest(req, res) {
{ addCORSHeaders(req, res);
addCORSHeaders(req, res);
// Return options pre-flight requests right away // Return options pre-flight requests right away
if (req.method.toUpperCase() === "OPTIONS") if (req.method.toUpperCase() === "OPTIONS") {
{ return writeResponse(res, 204);
return writeResponse(res, 204); }
}
var result = config.fetch_regex.exec(req.url); var result = config.fetch_regex.exec(req.url);
if (result && result.length == 2 && result[1]) { if (result && result.length == 2 && result[1]) {
var remoteURL; var remoteURL;
try { try {
remoteURL = url.parse(decodeURI(result[1])); remoteURL = url.parse(decodeURI(result[1]));
} }
catch (e) { catch (e) {
return sendInvalidURLResponse(res); return sendInvalidURLResponse(res);
} }
// We don't support relative links // We don't support relative links
if(!remoteURL.host) if (!remoteURL.host) {
{ return writeResponse(res, 404, "relative URLS are not supported");
return writeResponse(res, 404, "relative URLS are not supported"); }
}
// Naughty, naughty— deny requests to blacklisted hosts // Naughty, naughty— deny requests to blacklisted hosts
if(config.blacklist_hostname_regex.test(remoteURL.hostname)) if (config.blacklist_hostname_regex.test(remoteURL.hostname)) {
{ return writeResponse(res, 400, "naughty, naughty...");
return writeResponse(res, 400, "naughty, naughty..."); }
}
// We only support http and https // We only support http and https
if (remoteURL.protocol != "http:" && remoteURL.protocol !== "https:") { if (remoteURL.protocol != "http:" && remoteURL.protocol !== "https:") {
return writeResponse(res, 400, "only http and https are supported"); return writeResponse(res, 400, "only http and https are supported");
} }
if(publicIP) if (publicIP) {
{ // Add an X-Forwarded-For header
// Add an X-Forwarded-For header if (req.headers["x-forwarded-for"]) {
if(req.headers["x-forwarded-for"]) req.headers["x-forwarded-for"] += ", " + publicIP;
{ }
req.headers["x-forwarded-for"] += ", " + publicIP; else {
} req.headers["x-forwarded-for"] = req.clientIP + ", " + publicIP;
else }
{ }
req.headers["x-forwarded-for"] = req.clientIP + ", " + publicIP;
}
}
// Make sure the host header is to the URL we're requesting, not thingproxy // Make sure the host header is to the URL we're requesting, not thingproxy
if(req.headers["host"]) { if (req.headers["host"]) {
req.headers["host"] = remoteURL.host; req.headers["host"] = remoteURL.host;
} }
var proxyRequest = request({ var proxyRequest = request({
url: remoteURL, url: remoteURL,
headers: req.headers, headers: req.headers,
method: req.method, method: req.method,
timeout: config.proxy_request_timeout_ms, timeout: config.proxy_request_timeout_ms,
strictSSL : false strictSSL: false
}); });
proxyRequest.on('error', function(err){ proxyRequest.on('error', function (err) {
if(err.code === "ENOTFOUND") if (err.code === "ENOTFOUND") {
{ return writeResponse(res, 502, "host cannot be found.")
return writeResponse(res, 502, "host cannot be found.") }
} else {
else console.log("Proxy Request Error: " + err.toString());
{ return writeResponse(res, 500);
console.log("Proxy Request Error: " + err.toString()); }
return writeResponse(res, 500);
}
}); });
var requestSize = 0; var requestSize = 0;
var proxyResponseSize = 0; var proxyResponseSize = 0;
req.pipe(proxyRequest).on('data', function(data){ req.pipe(proxyRequest).on('data', function (data) {
requestSize += data.length; requestSize += data.length;
if(requestSize >= config.max_request_length) if (requestSize >= config.max_request_length) {
{ proxyRequest.end();
proxyRequest.end(); return sendTooBigResponse(res);
return sendTooBigResponse(res); }
} });
});
proxyRequest.pipe(res).on('data', function (data) { proxyRequest.pipe(res).on('data', function (data) {
proxyResponseSize += data.length; proxyResponseSize += data.length;
if(proxyResponseSize >= config.max_request_length) if (proxyResponseSize >= config.max_request_length) {
{ proxyRequest.end();
proxyRequest.end(); return sendTooBigResponse(res);
return sendTooBigResponse(res); }
} });
}); }
} else {
else { return sendInvalidURLResponse(res);
return sendInvalidURLResponse(res); }
}
} }
http.createServer(function (req, res) { if (cluster.isMaster) {
for (var i = 0; i < config.cluster_process_count; i++) {
cluster.fork();
}
}
else
{
http.createServer(function (req, res) {
// Process AWS health checks // Process AWS health checks
if(req.url === "/health") if (req.url === "/health") {
{ return writeResponse(res, 200);
return writeResponse(res, 200); }
}
var clientIP = getClientAddress(req); var clientIP = getClientAddress(req);
req.clientIP = clientIP; req.clientIP = clientIP;
// Log our request // Log our request
if(config.enable_logging) if (config.enable_logging) {
{ console.log("%s %s %s", (new Date()).toJSON(), clientIP, req.method, req.url);
console.log("%s %s %s", (new Date()).toJSON(), clientIP, req.method, req.url); }
}
if(config.enable_rate_limiting) if (config.enable_rate_limiting) {
{ throttle.rateLimit(clientIP, function (err, limited) {
throttle.rateLimit(clientIP, function(err, limited) { if (limited) {
if (limited) return writeResponse(res, 429, "enhance your calm");
{ }
return writeResponse(res, 429, "enhance your calm");
}
processRequest(req, res); processRequest(req, res);
}) })
} }
else else {
{ processRequest(req, res);
processRequest(req, res); }
}
}).listen(config.port); }).listen(config.port);
console.log("thingproxy.freeboard.io started"); console.log("thingproxy.freeboard.io process started (PID " + process.pid + ")");
}