1: <?php
2:
3: declare(strict_types=1);
4:
5: /**
6: * Copyright OpenSearch Contributors
7: * SPDX-License-Identifier: Apache-2.0
8: *
9: * OpenSearch PHP client
10: *
11: * @link https://github.com/opensearch-project/opensearch-php/
12: * @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co)
13: * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
14: * @license https://www.gnu.org/licenses/lgpl-2.1.html GNU Lesser General Public License, Version 2.1
15: *
16: * Licensed to Elasticsearch B.V under one or more agreements.
17: * Elasticsearch B.V licenses this file to you under the Apache 2.0 License or
18: * the GNU Lesser General Public License, Version 2.1, at your option.
19: * See the LICENSE file in the project root for more information.
20: */
21:
22: namespace OpenSearch\Connections;
23:
24: use OpenSearch\Client;
25: use OpenSearch\Common\Exceptions\BadRequest400Exception;
26: use OpenSearch\Common\Exceptions\Conflict409Exception;
27: use OpenSearch\Common\Exceptions\Curl\CouldNotConnectToHost;
28: use OpenSearch\Common\Exceptions\Curl\CouldNotResolveHostException;
29: use OpenSearch\Common\Exceptions\Curl\OperationTimeoutException;
30: use OpenSearch\Common\Exceptions\OpenSearchException;
31: use OpenSearch\Common\Exceptions\Forbidden403Exception;
32: use OpenSearch\Common\Exceptions\MaxRetriesException;
33: use OpenSearch\Common\Exceptions\Missing404Exception;
34: use OpenSearch\Common\Exceptions\NoDocumentsToGetException;
35: use OpenSearch\Common\Exceptions\NoShardAvailableException;
36: use OpenSearch\Common\Exceptions\RequestTimeout408Exception;
37: use OpenSearch\Common\Exceptions\RoutingMissingException;
38: use OpenSearch\Common\Exceptions\ScriptLangNotSupportedException;
39: use OpenSearch\Common\Exceptions\ServerErrorResponseException;
40: use OpenSearch\Common\Exceptions\TransportException;
41: use OpenSearch\Common\Exceptions\Unauthorized401Exception;
42: use OpenSearch\Serializers\SerializerInterface;
43: use OpenSearch\Transport;
44: use Exception;
45: use GuzzleHttp\Ring\Core;
46: use GuzzleHttp\Ring\Exception\ConnectException;
47: use GuzzleHttp\Ring\Exception\RingException;
48: use Psr\Log\LoggerInterface;
49:
/**
 * A connection to a single OpenSearch node.
 *
 * Requests are dispatched through a RingPHP-style handler that is wrapped (see wrapHandler())
 * to record the last request/response, log results, map HTTP and curl failures to client
 * exceptions and, on curl-level failures, retry through the Transport when retries remain.
 */
class Connection implements ConnectionInterface
{
    /**
     * Handler invoked with the RingPHP request array; wrapped by wrapHandler().
     *
     * @var callable
     */
    protected $handler;

    /**
     * @var SerializerInterface
     */
    protected $serializer;

    /**
     * @var string
     */
    protected $transportSchema = 'http'; // TODO deprecate this default

    /**
     * @var string
     */
    protected $host;

    /**
     * Optional path prefix prepended to every request URI by getURI().
     *
     * @var string|null
     */
    protected $path;

    /**
     * @var int
     */
    protected $port;

    /**
     * Logger for request/response summaries.
     *
     * @var LoggerInterface
     */
    protected $log;

    /**
     * Logger receiving an equivalent curl command plus the response.
     *
     * @var LoggerInterface
     */
    protected $trace;

    /**
     * Connection-level request parameters, merged into every request.
     *
     * @var array
     */
    protected $connectionParams;

    /**
     * @var array<string, list<string>>
     */
    protected $headers = [];

    /**
     * @var bool
     */
    protected $isAlive = false;

    /**
     * @var float
     */
    private $pingTimeout = 1; //TODO expose this

    /**
     * Unix timestamp of the last markAlive()/markDead() call.
     *
     * @var int
     */
    private $lastPing = 0;

    /**
     * Number of consecutive markDead() calls since the last markAlive().
     *
     * @var int
     */
    private $failedPings = 0;

    /**
     * @var mixed[]
     */
    private $lastRequest = [];

    /**
     * OS release string, computed lazily by getOSVersion().
     *
     * @var string|null
     */
    private $OSVersion = null;

    /**
     * @param callable $handler Handler invoked with the RingPHP request array
     * @param array{host: string, port?: int, scheme?: string, user?: string, pass?: string, path?: string} $hostDetails
     * @param array{client?: array{headers?: array<string, list<string>>, curl?: array<int, mixed>}} $connectionParams
     * @param SerializerInterface $serializer Serializer used for request bodies and responses
     * @param LoggerInterface $log Logger for request/response summaries
     * @param LoggerInterface $trace Logger for curl-command traces
     */
    public function __construct(
        callable $handler,
        array $hostDetails,
        array $connectionParams,
        SerializerInterface $serializer,
        LoggerInterface $log,
        LoggerInterface $trace
    ) {
        if (isset($hostDetails['port']) !== true) {
            $hostDetails['port'] = 9200;
        }

        if (isset($hostDetails['scheme'])) {
            $this->transportSchema = $hostDetails['scheme'];
        }

        // Only set Basic auth when no Authorization header (e.g. API key) is configured
        // and no explicit CURLOPT_HTTPAUTH was provided (setBasicAuthentication was not called prior).
        if (isset($connectionParams['client']['headers']['Authorization']) === false
            && isset($connectionParams['client']['curl'][CURLOPT_HTTPAUTH]) === false
            && isset($hostDetails['user'])
            && isset($hostDetails['pass'])
        ) {
            $connectionParams['client']['curl'][CURLOPT_HTTPAUTH] = CURLAUTH_BASIC;
            $connectionParams['client']['curl'][CURLOPT_USERPWD] = $hostDetails['user'].':'.$hostDetails['pass'];
        }

        $connectionParams['client']['curl'][CURLOPT_PORT] = $hostDetails['port'];

        // Headers are kept separately so per-request headers can be merged over them.
        if (isset($connectionParams['client']['headers'])) {
            $this->headers = $connectionParams['client']['headers'];
            unset($connectionParams['client']['headers']);
        }

        // Add the User-Agent using the format: <client-repo-name>/<client-version> (metadata-values)
        $this->headers['User-Agent'] = [sprintf(
            'opensearch-php/%s (%s %s; PHP %s)',
            Client::VERSION,
            PHP_OS,
            $this->getOSVersion(),
            PHP_VERSION
        )];

        $this->host = $hostDetails['host'];
        $this->path = $hostDetails['path'] ?? null;
        $this->port = $hostDetails['port'];
        $this->log = $log;
        $this->trace = $trace;
        $this->connectionParams = $connectionParams;
        $this->serializer = $serializer;

        $this->handler = $this->wrapHandler($handler);
    }

    /**
     * Serialize the body, build the RingPHP request array and dispatch it through the wrapped handler.
     *
     * @param string $method HTTP method
     * @param string $uri Request path, without this connection's path prefix
     * @param null|array<string, mixed> $params Query-string parameters
     * @param mixed $body Request body; serialized when not null
     * @param array $options Per-request options, merged recursively over the connection params
     * @param Transport|null $transport Transport used for retries; null inside a ping or sniff
     * @return mixed
     */
    public function performRequest(string $method, string $uri, ?array $params = [], $body = null, array $options = [], ?Transport $transport = null)
    {
        if ($body !== null) {
            $body = $this->serializer->serialize($body);
        }

        $headers = $this->headers;
        if (isset($options['client']['headers']) && is_array($options['client']['headers'])) {
            $headers = array_merge($this->headers, $options['client']['headers']);
        }

        $host = $this->host;
        if (!empty($this->connectionParams['client']['port_in_header'])) {
            $host .= ':' . $this->port;
        }

        $request = [
            'http_method' => $method,
            'scheme' => $this->transportSchema,
            'uri' => $this->getURI($uri, $params),
            'body' => $body,
            'headers' => array_merge(
                ['Host' => [$host]],
                $headers
            ),
        ];

        $request = array_replace_recursive($request, $this->connectionParams, $options);

        // RingPHP does not like if client is empty
        if (empty($request['client'])) {
            unset($request['client']);
        }

        $handler = $this->handler;

        return $handler($request, $this, $transport, $options);
    }

    public function getTransportSchema(): string
    {
        return $this->transportSchema;
    }

    /**
     * Request (and, once available, response) recorded by the most recent dispatch.
     */
    public function getLastRequestInfo(): array
    {
        return $this->lastRequest;
    }

    /**
     * Wrap the raw handler with response recording, logging, error mapping and retries.
     */
    private function wrapHandler(callable $handler): callable
    {
        return function (array $request, Connection $connection, ?Transport $transport, $options) use ($handler) {
            $this->lastRequest = [];
            $this->lastRequest['request'] = $request;

            // Send the request using the wrapped handler.
            $response = Core::proxy(
                $handler($request),
                function ($response) use ($connection, $transport, $request, $options) {
                    $this->lastRequest['response'] = $response;

                    if (isset($response['error']) === true) {
                        if ($response['error'] instanceof ConnectException || $response['error'] instanceof RingException) {
                            $this->log->warning("Curl exception encountered.");

                            $exception = $this->getCurlRetryException($request, $response);

                            $this->logRequestFail($request, $response, $exception);

                            $node = $connection->getHost();
                            $this->log->warning("Marking node $node dead.");
                            $connection->markDead();

                            // If the transport has not been set, we are inside a Ping or Sniff,
                            // so we don't want to retrigger retries anyway.
                            //
                            // TODO this could be handled better, but we are limited because connectionpools do not
                            // have access to Transport. Architecturally, all of this needs to be refactored
                            if (isset($transport) === true) {
                                $transport->connectionPool->scheduleCheck();

                                $neverRetry = $request['client']['never_retry'] ?? false;
                                $shouldRetry = $transport->shouldRetry($request);
                                $shouldRetryText = ($shouldRetry) ? 'true' : 'false';

                                $this->log->warning("Retries left? $shouldRetryText");
                                if ($shouldRetry && !$neverRetry) {
                                    // $request['uri'] already carries this connection's path prefix (added by
                                    // getURI()). Strip it so the connection picked for the retry does not
                                    // prepend a path a second time.
                                    $retryUri = $request['uri'];
                                    $pathPrefix = $connection->getPath();
                                    if ($pathPrefix !== null && $pathPrefix !== '' && strpos($retryUri, $pathPrefix) === 0) {
                                        $retryUri = substr($retryUri, strlen($pathPrefix));
                                    }

                                    return $transport->performRequest(
                                        $request['http_method'],
                                        $retryUri,
                                        [],
                                        $request['body'],
                                        $options
                                    );
                                }
                            }

                            $this->log->warning("Out of retries, throwing exception from $node");
                            // Only throw if we run out of retries
                            throw $exception;
                        }

                        // Something went seriously wrong, bail
                        $exception = new TransportException($response['error']->getMessage());
                        $this->logRequestFail($request, $response, $exception);
                        throw $exception;
                    }

                    $connection->markAlive();

                    if (isset($response['headers']['Warning'])) {
                        $this->logWarning($request, $response);
                    }
                    if (isset($response['body']) === true) {
                        $response['body'] = stream_get_contents($response['body']);
                        $this->lastRequest['response']['body'] = $response['body'];
                    }

                    $status = $response['status'];
                    if ($status >= 400 && $status < 500) {
                        $ignore = $request['client']['ignore'] ?? [];
                        // Skip 404 if succeeded true in the body (e.g. clear_scroll)
                        $body = (string) ($response['body'] ?? '');
                        if (strpos($body, '"succeeded":true') !== false) {
                            $ignore[] = 404;
                        }
                        $this->process4xxError($request, $response, $ignore);
                    } elseif ($status >= 500) {
                        $ignore = $request['client']['ignore'] ?? [];
                        $this->process5xxError($request, $response, $ignore);
                    }

                    // No error (or an ignored one), deserialize
                    $response['body'] = $this->serializer->deserialize($response['body'], $response['transfer_stats']);

                    $this->logRequestSuccess($request, $response);

                    // 'verbose' callers (e.g. ping()) receive the whole response array.
                    return ($request['client']['verbose'] ?? false) === true ? $response : $response['body'];
                }
            );

            return $response;
        };
    }

    /**
     * Build the final request URI: append the query string (booleans rendered as
     * 'true'/'false') and prepend the connection path prefix, if any.
     *
     * @param array<string, string|int|bool>|null $params
     */
    private function getURI(string $uri, ?array $params): string
    {
        if (!empty($params)) {
            $params = array_map(
                static function ($value) {
                    if ($value === true) {
                        return 'true';
                    }
                    if ($value === false) {
                        return 'false';
                    }

                    return $value;
                },
                $params
            );

            $uri .= '?' . http_build_query($params);
        }

        if ($this->path !== null) {
            $uri = $this->path . $uri;
        }

        return $uri;
    }

    /**
     * @return array<string, list<string>>
     */
    public function getHeaders(): array
    {
        return $this->headers;
    }

    /**
     * Log the response's Warning header(s) as a deprecation notice.
     */
    public function logWarning(array $request, array $response): void
    {
        $this->log->warning('Deprecation', $response['headers']['Warning']);
    }

    /**
     * Log a successful request
     *
     * @param array $request
     * @param array $response
     * @return void
     */
    public function logRequestSuccess(array $request, array $response): void
    {
        $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? '';
        $uri = $this->addPortInUrl($response['effective_url'], (int) $port);

        $this->log->debug('Request Body', [$request['body']]);
        $this->log->info(
            'Request Success:',
            [
                'method' => $request['http_method'],
                'uri' => $uri,
                'port' => $port,
                'headers' => $request['headers'],
                'HTTP code' => $response['status'],
                'duration' => $response['transfer_stats']['total_time'],
            ]
        );
        $this->log->debug('Response', [$response['body']]);

        // Build the curl command for Trace.
        $curlCommand = $this->buildCurlCommand($request['http_method'], $uri, $request['body']);
        $this->trace->info($curlCommand);
        $this->trace->debug(
            'Response:',
            [
                'response' => $response['body'],
                'method' => $request['http_method'],
                'uri' => $uri,
                'port' => $port,
                'HTTP code' => $response['status'],
                'duration' => $response['transfer_stats']['total_time'],
            ]
        );
    }

    /**
     * Log a failed request
     *
     * @param array $request
     * @param array $response
     * @param \Throwable $exception
     *
     * @return void
     */
    public function logRequestFail(array $request, array $response, \Throwable $exception): void
    {
        $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? '';
        $uri = $this->addPortInUrl($response['effective_url'], (int) $port);

        $this->log->debug('Request Body', [$request['body']]);
        $this->log->warning(
            'Request Failure:',
            [
                'method' => $request['http_method'],
                'uri' => $uri,
                'port' => $port,
                'headers' => $request['headers'],
                'HTTP code' => $response['status'],
                'duration' => $response['transfer_stats']['total_time'],
                'error' => $exception->getMessage(),
            ]
        );
        $this->log->warning('Response', [$response['body']]);

        // Build the curl command for Trace.
        $curlCommand = $this->buildCurlCommand($request['http_method'], $uri, $request['body']);
        $this->trace->info($curlCommand);
        $this->trace->debug(
            'Response:',
            [
                'response' => $response,
                'method' => $request['http_method'],
                'uri' => $uri,
                'port' => $port,
                'HTTP code' => $response['status'],
                'duration' => $response['transfer_stats']['total_time'],
            ]
        );
    }

    /**
     * Send a HEAD / request (no retries) and mark the connection alive on a 200,
     * dead on any other status or on a TransportException.
     */
    public function ping(): bool
    {
        $options = [
            'client' => [
                'timeout' => $this->pingTimeout,
                'never_retry' => true,
                'verbose' => true,
            ],
        ];
        try {
            $response = $this->performRequest('HEAD', '/', null, null, $options);
            $response = $response->wait();
        } catch (TransportException $exception) {
            $this->markDead();

            return false;
        }

        if ($response['status'] === 200) {
            $this->markAlive();

            return true;
        }

        $this->markDead();

        return false;
    }

    /**
     * Request GET /_nodes/ (no retries) for node discovery.
     *
     * @return array|\GuzzleHttp\Ring\Future\FutureArray
     */
    public function sniff()
    {
        $options = [
            'client' => [
                'timeout' => $this->pingTimeout,
                'never_retry' => true,
            ],
        ];

        return $this->performRequest('GET', '/_nodes/', null, null, $options);
    }

    public function isAlive(): bool
    {
        return $this->isAlive;
    }

    /**
     * Flag the connection alive, reset the failure counter and record the time.
     */
    public function markAlive(): void
    {
        $this->failedPings = 0;
        $this->isAlive = true;
        $this->lastPing = time();
    }

    /**
     * Flag the connection dead, increment the failure counter and record the time.
     */
    public function markDead(): void
    {
        $this->isAlive = false;
        $this->failedPings += 1;
        $this->lastPing = time();
    }

    public function getLastPing(): int
    {
        return $this->lastPing;
    }

    public function getPingFailures(): int
    {
        return $this->failedPings;
    }

    public function getHost(): string
    {
        return $this->host;
    }

    /**
     * The "user:pass" string configured for Basic auth, if any.
     */
    public function getUserPass(): ?string
    {
        return $this->connectionParams['client']['curl'][CURLOPT_USERPWD] ?? null;
    }

    public function getPath(): ?string
    {
        return $this->path;
    }

    /**
     * @return int
     */
    public function getPort()
    {
        return $this->port;
    }

    /**
     * Map a curl-level failure to a client exception. Known curl error numbers get a
     * specific exception wrapping a MaxRetriesException; anything else returns the
     * MaxRetriesException itself.
     */
    protected function getCurlRetryException(array $request, array $response): OpenSearchException
    {
        $message = $response['error']->getMessage();
        $exception = new MaxRetriesException($message);
        switch ($response['curl']['errno'] ?? null) {
            case 6: // CURLE_COULDNT_RESOLVE_HOST
                $exception = new CouldNotResolveHostException($message, 0, $exception);
                break;
            case 7: // CURLE_COULDNT_CONNECT
                $exception = new CouldNotConnectToHost($message, 0, $exception);
                break;
            case 28: // CURLE_OPERATION_TIMEDOUT
                $exception = new OperationTimeoutException($message, 0, $exception);
                break;
        }

        return $exception;
    }

    /**
     * Get the OS version using php_uname if available
     * otherwise it returns an empty string
     *
     * @see https://github.com/elastic/elasticsearch-php/issues/922
     */
    private function getOSVersion(): string
    {
        if ($this->OSVersion === null) {
            // ini_get() returns false for unknown directives; cast so strtolower() never sees a bool.
            $disabledFunctions = strtolower((string) ini_get('disable_functions'));
            $this->OSVersion = strpos($disabledFunctions, 'php_uname') !== false
                ? ''
                : php_uname('r');
        }

        return $this->OSVersion;
    }

    /**
     * Add the port value in the URL if not present
     */
    private function addPortInUrl(string $uri, int $port): string
    {
        // Start searching after the "http://" scheme separator. Guard the offset:
        // strpos() rejects an offset beyond the string length.
        if (strlen($uri) > 7 && strpos($uri, ':', 7) !== false) {
            return $uri;
        }
        // Insert ":<port>" before the first single "/" (the start of the path).
        $withPort = preg_replace('#([^/])/([^/])#', sprintf('$1:%s/$2', $port), $uri, 1);

        return $withPort ?? $uri;
    }

    /**
     * Construct a string cURL command (for trace logging) equivalent to the request.
     */
    private function buildCurlCommand(string $method, string $url, ?string $body): string
    {
        // Request pretty output; join with '&' when the URL already carries a query string.
        $url .= strpos($url, '?') === false ? '?pretty=true' : '&pretty=true';

        // Wrap in single quotes, escaping embedded single quotes for POSIX shells.
        $quote = static function (string $value): string {
            return "'" . str_replace("'", "'\\''", $value) . "'";
        };

        $curlCommand = 'curl -X' . strtoupper($method);
        $curlCommand .= ' ' . $quote($url);

        if ($body !== null && $body !== '') {
            $curlCommand .= ' -d ' . $quote($body);
        }

        return $curlCommand;
    }

    /**
     * Throw the exception matching a 4xx status unless the status is in $ignore.
     *
     * @throws OpenSearchException
     */
    private function process4xxError(array $request, array $response, array $ignore): void
    {
        $statusCode = (int) $response['status'];

        // Loose comparison kept deliberately so string codes such as "404" also match.
        if (in_array($statusCode, $ignore) === true) {
            return;
        }

        $exception = $this->tryDeserialize400Error($response);

        $responseBody = $this->convertBodyToString($response['body'], $statusCode, $exception);
        if ($statusCode === 401) {
            $exception = new Unauthorized401Exception($responseBody, $statusCode);
        } elseif ($statusCode === 403) {
            $exception = new Forbidden403Exception($responseBody, $statusCode);
        } elseif ($statusCode === 404) {
            $exception = new Missing404Exception($responseBody, $statusCode);
        } elseif ($statusCode === 409) {
            $exception = new Conflict409Exception($responseBody, $statusCode);
        } elseif ($statusCode === 400 && strpos($responseBody, 'script_lang not supported') !== false) {
            $exception = new ScriptLangNotSupportedException($responseBody, $statusCode);
        } elseif ($statusCode === 408) {
            $exception = new RequestTimeout408Exception($responseBody, $statusCode);
        } else {
            $exception = new BadRequest400Exception($responseBody, $statusCode);
        }

        $this->logRequestFail($request, $response, $exception);

        throw $exception;
    }

    /**
     * Log a 5xx response and throw the matching exception unless the status is in $ignore.
     *
     * @throws OpenSearchException
     */
    private function process5xxError(array $request, array $response, array $ignore): void
    {
        $statusCode = (int) $response['status'];
        // The body may be absent (null) or unreadable (false); strpos() below needs a string.
        $responseBody = (string) ($response['body'] ?? '');

        $exception = $this->tryDeserialize500Error($response);

        $exceptionText = "[$statusCode Server Exception] ".$exception->getMessage();
        $this->log->error($exceptionText);
        $this->log->error($exception->getTraceAsString());

        if (array_search($statusCode, $ignore) !== false) {
            return;
        }

        if ($statusCode === 500 && strpos($responseBody, "RoutingMissingException") !== false) {
            $exception = new RoutingMissingException($exception->getMessage(), $statusCode, $exception);
        } elseif ($statusCode === 500 && preg_match('/ActionRequestValidationException.+ no documents to get/', $responseBody) === 1) {
            $exception = new NoDocumentsToGetException($exception->getMessage(), $statusCode, $exception);
        } elseif ($statusCode === 500 && strpos($responseBody, 'NoShardAvailableActionException') !== false) {
            $exception = new NoShardAvailableException($exception->getMessage(), $statusCode, $exception);
        } else {
            $exception = new ServerErrorResponseException(
                $this->convertBodyToString($responseBody, $statusCode, $exception),
                $statusCode
            );
        }

        $this->logRequestFail($request, $response, $exception);

        throw $exception;
    }

    /**
     * Turn a response body into an exception message; empty bodies get a generic
     * "Unknown <status> error" message built from $exception.
     */
    private function convertBodyToString($body, int $statusCode, Exception $exception): string
    {
        if (empty($body)) {
            return sprintf(
                "Unknown %d error from OpenSearch %s",
                $statusCode,
                $exception->getMessage()
            );
        }
        // if body is not string, we convert it so it can be used as Exception message
        if (!is_string($body)) {
            return $this->encodeForMessage($body);
        }

        return $body;
    }

    /**
     * JSON-encode a value for use as an exception message. Invalid UTF-8 is substituted
     * instead of making json_encode() return false (which is not a valid message).
     */
    private function encodeForMessage($value): string
    {
        $encoded = json_encode($value, JSON_INVALID_UTF8_SUBSTITUTE);

        return $encoded === false ? '' : $encoded;
    }

    private function tryDeserialize400Error(array $response): OpenSearchException
    {
        return $this->tryDeserializeError($response, BadRequest400Exception::class);
    }

    private function tryDeserialize500Error(array $response): OpenSearchException
    {
        return $this->tryDeserializeError($response, ServerErrorResponseException::class);
    }

    /**
     * Build an exception of class $errorClass whose message summarizes the error body.
     * Structured errors produce "type: reason" (preferring the first root cause).
     */
    private function tryDeserializeError(array $response, string $errorClass): OpenSearchException
    {
        $status = (int) $response['status'];
        $error = $this->serializer->deserialize($response['body'], $response['transfer_stats']);
        if (is_array($error) === true) {
            if (isset($error['error']) === false) {
                // <2.0 "i just blew up" nonstructured exception
                // $error is an array but we don't know the format, reuse the response body instead
                // added json_encode to convert into a string
                return new $errorClass($this->encodeForMessage($response['body']), $status);
            }

            // 2.0 structured exceptions
            if (is_array($error['error']) && array_key_exists('reason', $error['error']) === true) {
                // Try to use root cause first (only grabs the first root cause)
                $info = $error['error']['root_cause'][0] ?? $error['error'];
                $cause = $info['reason'] ?? '';
                $type = $info['type'] ?? '';
                // added json_encode to convert into a string
                $original = new $errorClass($this->encodeForMessage($response['body']), $status);

                return new $errorClass("$type: $cause", $status, $original);
            }
            // <2.0 semi-structured exceptions
            // added json_encode to convert into a string
            $original = new $errorClass($this->encodeForMessage($response['body']), $status);

            $errorEncoded = $error['error'];
            if (is_array($errorEncoded)) {
                $errorEncoded = $this->encodeForMessage($errorEncoded);
            }

            return new $errorClass($errorEncoded, $status, $original);
        }

        // if responseBody is not string, we convert it so it can be used as Exception message
        $responseBody = $response['body'];
        if (!is_string($responseBody)) {
            $responseBody = $this->encodeForMessage($responseBody);
        }

        // <2.0 "i just blew up" nonstructured exception
        return new $errorClass($responseBody, $status);
    }
}
790: