/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
'use strict';
const debug = require('debug')('opensearch');
const os = require('os');
const { gzip, unzip, createGzip } = require('zlib');
const buffer = require('buffer');
const ms = require('ms');
const { EventEmitter } = require('events');
const {
ConnectionError,
RequestAbortedError,
NoLivingConnectionsError,
ResponseError,
ConfigurationError,
} = require('./errors');
// Shared do-nothing function: used as the default `sniff` callback and as the
// placeholder `abort` handler before a real request object exists.
const noop = () => {};
// Module-level emitter on which `checkCompatibleInfo` publishes the
// 'compatible-check' event together with a boolean outcome.
const compatibleCheckEmitter = new EventEmitter();
// Client version read from package.json, embedded in the user agent below.
const clientVersion = require('../package.json').version;
// Default `user-agent` header value: client version, OS platform/release/arch
// and the running Node.js version.
const userAgent = `opensearch-js/${clientVersion} (${os.platform()} ${os.release()}-${os.arch()}; Node.js ${
process.version
})`;
// Node.js hard limits; responses whose declared content-length exceeds them
// are rejected before any data is buffered (see the response handling in
// `Transport#request`).
const MAX_BUFFER_LENGTH = buffer.constants.MAX_LENGTH;
const MAX_STRING_LENGTH = buffer.constants.MAX_STRING_LENGTH;
// V8 heap size limit, sampled once at module load; basis of the memory
// circuit breaker threshold.
const HEAP_SIZE_LIMIT = require('v8').getHeapStatistics().heap_size_limit;
// Symbol keys for state stored on each Transport instance:
// - kCompatibleCheck: progress/outcome of the compatibility check
// - kApiVersioning: whether the versioned content-type headers are used
const kCompatibleCheck = Symbol('compatible check');
const kApiVersioning = Symbol('api versioning');
/** Default Transport Layer */
class Transport {
constructor(opts) {
if (typeof opts.compression === 'string' && opts.compression !== 'gzip') {
throw new ConfigurationError(`Invalid compression: '${opts.compression}'`);
}
this.emit = opts.emit;
this.connectionPool = opts.connectionPool;
this.serializer = opts.serializer;
this.maxRetries = opts.maxRetries;
this.requestTimeout = toMs(opts.requestTimeout);
this.suggestCompression = opts.suggestCompression === true;
this.compression = opts.compression || false;
this.context = opts.context || null;
this.headers = Object.assign(
{},
{ 'user-agent': userAgent },
opts.suggestCompression === true ? { 'accept-encoding': 'gzip,deflate' } : null,
lowerCaseHeaders(opts.headers)
);
this.sniffInterval = opts.sniffInterval;
this.sniffOnConnectionFault = opts.sniffOnConnectionFault;
this.sniffEndpoint = opts.sniffEndpoint;
this.generateRequestId = opts.generateRequestId || generateRequestId();
this.name = opts.name;
this.opaqueIdPrefix = opts.opaqueIdPrefix;
this[kCompatibleCheck] = 0; // 0 = to be checked, 1 = checking, 2 = checked-ok, 3 checked-notok
this[kApiVersioning] = process.env.OPENSEARCH_CLIENT_APIVERSIONING === 'true';
this.memoryCircuitBreaker = opts.memoryCircuitBreaker;
this.nodeFilter = opts.nodeFilter || defaultNodeFilter;
if (typeof opts.nodeSelector === 'function') {
this.nodeSelector = opts.nodeSelector;
} else if (opts.nodeSelector === 'round-robin') {
this.nodeSelector = roundRobinSelector();
} else if (opts.nodeSelector === 'random') {
this.nodeSelector = randomSelector;
} else {
this.nodeSelector = roundRobinSelector();
}
this._sniffEnabled = typeof this.sniffInterval === 'number';
this._nextSniff = this._sniffEnabled ? Date.now() + this.sniffInterval : 0;
this._isSniffing = false;
if (opts.sniffOnStart === true) {
// timer needed otherwise it will clash
// with the compatible check testing
setTimeout(() => {
this.sniff({ reason: Transport.sniffReasons.SNIFF_ON_START });
}, 10);
}
}
/**
* @param {Object} params
* @param {string} params.method - HTTP Method (e.g. HEAD, GET, POST...)
* @param {string} params.path - Relative URL path
* @param {Object | string} [params.body] - Body of a standard request.
* @param {Object[] | string} [params.bulkBody] - Body of a bulk request.
* @param {Object[] | string} [params.querystring] - Querystring params.
*
* @param {Object} options
* @param {number} [options.id] - Request ID
* @param {Object} [options.context] - Object used for observability
* @param {number} [options.maxRetries] - Max number of retries
* @param {false | 'gzip'} [options.compression] - Request body compression, if any
* @param {boolean} [options.asStream] - Whether to emit the response as stream
* @param {number[]} [options.ignore] - Response's Error Status Codes to ignore
* @param {Object} [options.headers] - Request headers
* @param {Object | string} [options.querystring] - Request's query string
* @param {number} [options.requestTimeout] - Max request timeout in milliseconds
*
* @param {function} callback - Callback that handles errors and response
*/
request(params, options, callback) {
options = options || {};
if (typeof options === 'function') {
callback = options;
options = {};
}
let p = null;
// promises support
if (callback === undefined) {
let onFulfilled = null;
let onRejected = null;
p = new Promise((resolve, reject) => {
onFulfilled = resolve;
onRejected = reject;
});
callback = function callback(err, result) {
err ? onRejected(err) : onFulfilled(result);
};
}
const meta = {
context: null,
request: {
params: null,
options: null,
id: options.id || this.generateRequestId(params, options),
},
name: this.name,
connection: null,
attempts: 0,
aborted: false,
};
if (this.context != null && options.context != null) {
meta.context = Object.assign({}, this.context, options.context);
} else if (this.context != null) {
meta.context = this.context;
} else if (options.context != null) {
meta.context = options.context;
}
const result = {
body: null,
statusCode: null,
headers: null,
meta,
};
Object.defineProperty(result, 'warnings', {
get() {
return this.headers && this.headers.warning
? this.headers.warning.split(/(?!\B"[^"]*),(?![^"]*"\B)/)
: null;
},
});
// We should not retry if we are sending a stream body, because we should store in memory
// a copy of the stream to be able to send it again, but since we don't know in advance
// the size of the stream, we risk to take too much memory.
// Furthermore, copying everytime the stream is very a expensive operation.
const maxRetries =
isStream(params.body) || isStream(params.bulkBody)
? 0
: typeof options.maxRetries === 'number'
? options.maxRetries
: this.maxRetries;
const compression = options.compression !== undefined ? options.compression : this.compression;
let request = { abort: noop };
const transportReturn = {
then(onFulfilled, onRejected) {
if (p != null) {
return p.then(onFulfilled, onRejected);
}
},
catch(onRejected) {
if (p != null) {
return p.catch(onRejected);
}
},
abort() {
meta.aborted = true;
request.abort();
debug('Aborting request', params);
return this;
},
finally(onFinally) {
if (p != null) {
return p.finally(onFinally);
}
},
};
const makeRequest = () => {
if (meta.aborted === true) {
return process.nextTick(callback, new RequestAbortedError(), result);
}
meta.connection = this.getConnection({ requestId: meta.request.id });
if (meta.connection == null) {
return process.nextTick(callback, new NoLivingConnectionsError(), result);
}
this.emit('request', null, result);
// perform the actual http request
request = meta.connection.request(params, onResponse);
};
const onConnectionError = (err) => {
if (err.name !== 'RequestAbortedError') {
// if there is an error in the connection
// let's mark the connection as dead
this.connectionPool.markDead(meta.connection);
if (this.sniffOnConnectionFault === true) {
this.sniff({
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
requestId: meta.request.id,
});
}
// retry logic
if (meta.attempts < maxRetries) {
meta.attempts++;
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params);
makeRequest();
return;
}
}
err.meta = result;
this.emit('response', err, result);
return callback(err, result);
};
const onResponse = (err, response) => {
if (err !== null) {
return onConnectionError(err);
}
result.statusCode = response.statusCode;
result.headers = response.headers;
if (options.asStream === true) {
result.body = response;
this.emit('response', null, result);
callback(null, result);
return;
}
const contentEncoding = (result.headers['content-encoding'] || '').toLowerCase();
const isCompressed =
contentEncoding.indexOf('gzip') > -1 || contentEncoding.indexOf('deflate') > -1;
/* istanbul ignore else */
if (result.headers['content-length'] !== undefined) {
const contentLength = Number(result.headers['content-length']);
// nodeJS data type limit check
if (isCompressed && contentLength > MAX_BUFFER_LENGTH) {
response.destroy();
return onConnectionError(
new RequestAbortedError(
`The content length (${contentLength}) is bigger than the maximum allowed buffer (${MAX_BUFFER_LENGTH})`,
result
)
);
} else if (contentLength > MAX_STRING_LENGTH) {
response.destroy();
return onConnectionError(
new RequestAbortedError(
`The content length (${contentLength}) is bigger than the maximum allowed string (${MAX_STRING_LENGTH})`,
result
)
);
} else if (shouldApplyCircuitBreaker(contentLength)) {
// Abort this response to avoid OOM crash of dashboards.
response.destroy();
return onConnectionError(
new RequestAbortedError(
`The content length (${contentLength}) is bigger than the maximum allowed heap memory limit.`,
result
)
);
}
}
// if the response is compressed, we must handle it
// as buffer for allowing decompression later
let payload = isCompressed ? [] : '';
const onData = isCompressed
? (chunk) => {
payload.push(chunk);
}
: (chunk) => {
payload += chunk;
};
const onEnd = (err) => {
response.removeListener('data', onData);
response.removeListener('end', onEnd);
response.removeListener('error', onEnd);
response.removeListener('aborted', onAbort);
if (err) {
return onConnectionError(new ConnectionError(err.message));
}
if (isCompressed) {
unzip(Buffer.concat(payload), onBody);
} else {
onBody(null, payload);
}
};
const onAbort = () => {
response.destroy();
onEnd(new Error('Response aborted while reading the body'));
};
if (!isCompressed) {
response.setEncoding('utf8');
}
this.emit('deserialization', null, result);
response.on('data', onData);
response.on('error', onEnd);
response.on('end', onEnd);
response.on('aborted', onAbort);
};
// Helper function to check if memory circuit breaker enabled and the response payload is too large to fit into available heap memory.
const shouldApplyCircuitBreaker = (contentLength) => {
if (!this.memoryCircuitBreaker || !this.memoryCircuitBreaker.enabled) return false;
const maxPercentage = validateMemoryPercentage(this.memoryCircuitBreaker.maxPercentage);
const heapUsed = process.memoryUsage().heapUsed;
return contentLength + heapUsed > HEAP_SIZE_LIMIT * maxPercentage;
};
const onBody = (err, payload) => {
if (err) {
this.emit('response', err, result);
return callback(err, result);
}
if (Buffer.isBuffer(payload)) {
payload = payload.toString();
}
const isHead = params.method === 'HEAD';
// we should attempt the payload deserialization only if:
// - a `content-type` is defined and is equal to `application/json`
// - the request is not a HEAD request
// - the payload is not an empty string
if (
result.headers['content-type'] !== undefined &&
(result.headers['content-type'].indexOf('application/json') > -1 ||
result.headers['content-type'].indexOf('application/vnd.opensearch+json') > -1) &&
isHead === false &&
payload !== ''
) {
try {
result.body = this.serializer.deserialize(payload);
} catch (err) {
this.emit('response', err, result);
return callback(err, result);
}
} else {
// cast to boolean if the request method was HEAD and there was no error
result.body = isHead === true && result.statusCode < 400 ? true : payload;
}
// we should ignore the statusCode if the user has configured the `ignore` field with
// the statusCode we just got or if the request method is HEAD and the statusCode is 404
const ignoreStatusCode =
(Array.isArray(options.ignore) && options.ignore.indexOf(result.statusCode) > -1) ||
(isHead === true && result.statusCode === 404);
if (
ignoreStatusCode === false &&
(result.statusCode === 502 || result.statusCode === 503 || result.statusCode === 504)
) {
// if the statusCode is 502/3/4 we should run our retry strategy
// and mark the connection as dead
this.connectionPool.markDead(meta.connection);
// retry logic (we should not retry on "429 - Too Many Requests")
if (meta.attempts < maxRetries && result.statusCode !== 429) {
meta.attempts++;
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params);
makeRequest();
return;
}
} else {
// everything has worked as expected, let's mark
// the connection as alive (or confirm it)
this.connectionPool.markAlive(meta.connection);
}
if (ignoreStatusCode === false && result.statusCode >= 400) {
const error = new ResponseError(result);
this.emit('response', error, result);
callback(error, result);
} else {
// cast to boolean if the request method was HEAD
if (isHead === true && result.statusCode === 404) {
result.body = false;
}
this.emit('response', null, result);
callback(null, result);
}
};
const prepareRequest = () => {
this.emit('serialization', null, result);
const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers));
if (options.opaqueId !== undefined) {
headers['x-opaque-id'] =
this.opaqueIdPrefix !== null ? this.opaqueIdPrefix + options.opaqueId : options.opaqueId;
}
// handle json body
if (params.body != null) {
if (shouldSerialize(params.body) === true) {
try {
params.body = this.serializer.serialize(params.body);
} catch (err) {
this.emit('request', err, result);
process.nextTick(callback, err, result);
return transportReturn;
}
}
if (params.body !== '') {
headers['content-type'] =
headers['content-type'] ||
(this[kApiVersioning]
? 'application/vnd.opensearch+json; compatible-with=7'
: 'application/json');
}
// handle ndjson body
} else if (params.bulkBody != null) {
if (shouldSerialize(params.bulkBody) === true) {
try {
params.body = this.serializer.ndserialize(params.bulkBody);
} catch (err) {
this.emit('request', err, result);
process.nextTick(callback, err, result);
return transportReturn;
}
} else {
params.body = params.bulkBody;
}
if (params.body !== '') {
headers['content-type'] =
headers['content-type'] ||
(this[kApiVersioning]
? 'application/vnd.opensearch+x-ndjson; compatible-with=7'
: 'application/x-ndjson');
}
}
params.headers = headers;
// serializes the querystring
if (options.querystring == null) {
params.querystring = this.serializer.qserialize(params.querystring);
} else {
params.querystring = this.serializer.qserialize(
Object.assign({}, params.querystring, options.querystring)
);
}
// handles request timeout
params.timeout = toMs(options.requestTimeout || this.requestTimeout);
if (options.asStream === true) params.asStream = true;
meta.request.params = params;
meta.request.options = options;
// handle compression
if (params.body !== '' && params.body != null) {
if (isStream(params.body) === true) {
if (compression === 'gzip') {
params.headers['content-encoding'] = compression;
params.body = params.body.pipe(createGzip());
}
makeRequest();
} else if (compression === 'gzip') {
gzip(params.body, (err, buffer) => {
/* istanbul ignore next */
if (err) {
this.emit('request', err, result);
return callback(err, result);
}
params.headers['content-encoding'] = compression;
params.headers['content-length'] = '' + Buffer.byteLength(buffer);
params.body = buffer;
makeRequest();
});
} else {
params.headers['content-length'] = '' + Buffer.byteLength(params.body);
makeRequest();
}
} else {
makeRequest();
}
};
prepareRequest();
return transportReturn;
}
getConnection(opts) {
const now = Date.now();
if (this._sniffEnabled === true && now > this._nextSniff) {
this.sniff({ reason: Transport.sniffReasons.SNIFF_INTERVAL, requestId: opts.requestId });
}
return this.connectionPool.getConnection({
filter: this.nodeFilter,
selector: this.nodeSelector,
requestId: opts.requestId,
name: this.name,
now,
});
}
sniff(opts, callback = noop) {
if (this._isSniffing === true) return;
this._isSniffing = true;
debug('Started sniffing request');
if (typeof opts === 'function') {
callback = opts;
opts = { reason: Transport.sniffReasons.DEFAULT };
}
const { reason } = opts;
const request = {
method: 'GET',
path: this.sniffEndpoint,
};
this.request(request, { id: opts.requestId }, (err, result) => {
this._isSniffing = false;
if (this._sniffEnabled === true) {
this._nextSniff = Date.now() + this.sniffInterval;
}
if (err != null) {
debug('Sniffing errored', err);
result.meta.sniff = { hosts: [], reason };
this.emit('sniff', err, result);
return callback(err);
}
debug('Sniffing ended successfully', result.body);
const protocol = result.meta.connection.url.protocol || /* istanbul ignore next */ 'http:';
const hosts = this.connectionPool.nodesToHost(result.body.nodes, protocol);
this.connectionPool.update(hosts);
result.meta.sniff = { hosts, reason };
this.emit('sniff', null, result);
callback(null, hosts);
});
}
// checkCompatibleInfo validates whether the informations are compatible
checkCompatibleInfo() {
debug('Start compatible check');
this[kCompatibleCheck] = 1;
this.request(
{
method: 'GET',
path: '/',
},
(err, result) => {
this[kCompatibleCheck] = 3;
if (err) {
debug('compatible check failed', err);
if (err.statusCode === 401 || err.statusCode === 403) {
this[kCompatibleCheck] = 2;
process.emitWarning(
'The client is unable to verify the distribution due to security privileges on the server side. Some functionality may not be compatible if the server is running an unsupported product.'
);
compatibleCheckEmitter.emit('compatible-check', true);
} else {
this[kCompatibleCheck] = 0;
compatibleCheckEmitter.emit('compatible-check', false);
}
} else {
debug('Checking OpenSearch version', result.body, result.headers);
if (result.body.version == null || typeof result.body.version.number !== 'string') {
debug("Can't access OpenSearch version");
return compatibleCheckEmitter.emit('compatible-check', false);
}
const distribution = result.body.version.distribution;
const version = result.body.version.number.split('.');
const major = Number(version[0]);
// support OpenSearch validation
if (distribution === 'opensearch') {
debug('Valid OpenSearch distribution');
this[kCompatibleCheck] = 2;
return compatibleCheckEmitter.emit('compatible-check', true);
}
// support odfe > v7 validation
if (major !== 7) {
debug('Invalid distribution');
return compatibleCheckEmitter.emit('compatible-check', false);
}
debug('Valid OpenSearch distribution');
this[kCompatibleCheck] = 2;
compatibleCheckEmitter.emit('compatible-check', true);
}
}
);
}
}
// Reasons a sniff can be triggered with; the value is reported on
// `result.meta.sniff.reason` when the 'sniff' event is emitted.
Transport.sniffReasons = {
// scheduled from the constructor when `sniffOnStart` is true
SNIFF_ON_START: 'sniff-on-start',
// triggered from `getConnection` once the sniff interval has elapsed
SNIFF_INTERVAL: 'sniff-interval',
// triggered after a connection error when `sniffOnConnectionFault` is true
SNIFF_ON_CONNECTION_FAULT: 'sniff-on-connection-fault',
// used when `sniff` is called with a callback as its first argument
// TODO: find a better name
DEFAULT: 'default',
};
/**
 * Normalizes a timeout value: strings are parsed with the `ms` package,
 * every other value is handed back untouched.
 *
 * @param {number | string} time
 * @returns {*} The parsed value for strings, otherwise `time` itself
 */
function toMs(time) {
  return typeof time === 'string' ? ms(time) : time;
}
/**
 * Tells whether a request body still has to go through the serializer.
 * Strings, objects exposing a `pipe` function and Buffers are sent as they are.
 *
 * @param {*} obj - Non-nullish request body
 * @returns {boolean}
 */
function shouldSerialize(obj) {
  if (typeof obj === 'string') {
    return false;
  }
  if (typeof obj.pipe === 'function') {
    return false;
  }
  return !Buffer.isBuffer(obj);
}
/**
 * Duck-typed stream check: any non-nullish value with a `pipe` function.
 *
 * @param {*} obj
 * @returns {boolean}
 */
function isStream(obj) {
  if (obj === null || obj === undefined) {
    return false;
  }
  return typeof obj.pipe === 'function';
}
/**
 * Default node filter: rejects nodes that are cluster_manager (or master)
 * while having both the data and ingest roles explicitly set to `false`.
 *
 * @param {Object} node - Must carry a `roles` object
 * @returns {boolean} `false` for manager-only nodes, `true` otherwise
 */
function defaultNodeFilter(node) {
  const { roles } = node;
  // TODO: remove role check on master when master is not supported
  const isManager = roles.cluster_manager === true || roles.master === true;
  const holdsNoData = roles.data === false;
  const doesNotIngest = roles.ingest === false;
  return !(isManager && holdsNoData && doesNotIngest);
}
/**
 * Builds a selector that walks the given connection list in order, starting
 * over from the first entry whenever the cursor runs past the end of the list
 * it is handed. Each call to this factory gets its own cursor.
 *
 * @returns {function(Array): *} Selector returning the next connection
 */
function roundRobinSelector() {
  let cursor = -1;
  return function _roundRobinSelector(connections) {
    cursor += 1;
    // Reset rather than wrap with a modulo: the list may have shrunk since
    // the previous call.
    if (cursor >= connections.length) {
      cursor = 0;
    }
    return connections[cursor];
  };
}
/**
 * Picks one entry of the list at a pseudo-random index (via `Math.random`).
 *
 * @param {Array} connections
 * @returns {*} The chosen entry, or `undefined` for an empty list
 */
function randomSelector(connections) {
  return connections[Math.floor(Math.random() * connections.length)];
}
/**
 * Creates an incremental request id generator. Ids start at 1 and are masked
 * to 31 bits, so the value after 2147483647 is 0.
 *
 * @returns {function(): number} Generator with its own private counter
 */
function generateRequestId() {
  const mask = 2147483647;
  let counter = 0;
  return function genReqId() {
    counter = (counter + 1) & mask;
    return counter;
  };
}
/**
 * Returns a copy of the headers object with every key lower-cased.
 * `null` and `undefined` are returned as they are.
 *
 * @param {Object | null | undefined} oldHeaders
 * @returns {Object | null | undefined}
 */
function lowerCaseHeaders(oldHeaders) {
  if (oldHeaders === null || oldHeaders === undefined) {
    return oldHeaders;
  }
  const lowered = {};
  // `for...in` on purpose: every enumerable key is copied, inherited ones included.
  for (const name in oldHeaders) {
    const value = oldHeaders[name];
    lowered[name.toLowerCase()] = value;
  }
  return lowered;
}
/**
 * Sanitizes the `maxPercentage` setting of the memory circuit breaker.
 *
 * The result is multiplied by the heap size limit to obtain the breaker
 * threshold, so a value that cannot yield a meaningful number must not be
 * passed through: `undefined`/`NaN` would turn the threshold into `NaN`
 * (the breaker would never trip) and `null` would turn it into 0 (the breaker
 * would always trip). Those cases fall back to 1.0, like out-of-range values.
 *
 * @param {number} percentage - Expected to be within [0, 1]
 * @returns {number} `percentage` when usable, otherwise 1.0
 */
function validateMemoryPercentage(percentage) {
  if (percentage == null || Number.isNaN(Number(percentage))) return 1.0;
  if (percentage < 0 || percentage > 1) return 1.0;
  return percentage;
}
// The Transport class itself is the module's export.
module.exports = Transport;
// Helper functions are additionally exposed under `internals`
// (presumably for tests and other internal consumers — confirm against callers).
module.exports.internals = {
defaultNodeFilter,
roundRobinSelector,
randomSelector,
generateRequestId,
lowerCaseHeaders,
};