1: <?php
2:
3: declare(strict_types=1);
4:
5: /**
6: * Copyright OpenSearch Contributors
7: * SPDX-License-Identifier: Apache-2.0
8: *
9: * OpenSearch PHP client
10: *
11: * @link https://github.com/opensearch-project/opensearch-php/
12: * @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co)
13: * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
14: * @license https://www.gnu.org/licenses/lgpl-2.1.html GNU Lesser General Public License, Version 2.1
15: *
16: * Licensed to Elasticsearch B.V under one or more agreements.
17: * Elasticsearch B.V licenses this file to you under the Apache 2.0 License or
18: * the GNU Lesser General Public License, Version 2.1, at your option.
19: * See the LICENSE file in the project root for more information.
20: */
21:
22: namespace OpenSearch\Connections;
23:
24: use Exception;
25: use GuzzleHttp\Ring\Core;
26: use GuzzleHttp\Ring\Exception\ConnectException;
27: use GuzzleHttp\Ring\Exception\RingException;
28: use OpenSearch\Client;
29: use OpenSearch\Common\Exceptions\BadRequest400Exception;
30: use OpenSearch\Common\Exceptions\Conflict409Exception;
31: use OpenSearch\Common\Exceptions\Curl\CouldNotConnectToHost;
32: use OpenSearch\Common\Exceptions\Curl\CouldNotResolveHostException;
33: use OpenSearch\Common\Exceptions\Curl\OperationTimeoutException;
34: use OpenSearch\Common\Exceptions\Forbidden403Exception;
35: use OpenSearch\Common\Exceptions\MaxRetriesException;
36: use OpenSearch\Common\Exceptions\Missing404Exception;
37: use OpenSearch\Common\Exceptions\NoDocumentsToGetException;
38: use OpenSearch\Common\Exceptions\NoShardAvailableException;
39: use OpenSearch\Common\Exceptions\OpenSearchException;
40: use OpenSearch\Common\Exceptions\RequestTimeout408Exception;
41: use OpenSearch\Common\Exceptions\RoutingMissingException;
42: use OpenSearch\Common\Exceptions\ScriptLangNotSupportedException;
43: use OpenSearch\Common\Exceptions\ServerErrorResponseException;
44: use OpenSearch\Common\Exceptions\TransportException;
45: use OpenSearch\Common\Exceptions\Unauthorized401Exception;
46: use OpenSearch\Serializers\SerializerInterface;
47: use OpenSearch\Transport;
48: use Psr\Log\LoggerInterface;
49:
50: // @phpstan-ignore classConstant.deprecatedClass
51: @trigger_error(Connection::class . ' is deprecated in 2.4.0 and will be removed in 3.0.0.', E_USER_DEPRECATED);
52:
53: /**
54: * @deprecated in 2.4.0 and will be removed in 3.0.0.
55: */
56: class Connection implements ConnectionInterface
57: {
58: /**
59: * @var callable
60: */
61: protected $handler;
62:
63: /**
64: * @var SerializerInterface
65: */
66: protected $serializer;
67:
68: /**
69: * @var string
70: */
71: protected $transportSchema = 'http'; // TODO depreciate this default
72:
73: /**
74: * @var string
75: */
76: protected $host;
77:
78: /**
79: * @var string|null
80: */
81: protected $path;
82:
83: /**
84: * @var int
85: */
86: protected $port;
87:
88: /**
89: * @var LoggerInterface
90: */
91: protected $log;
92:
93: /**
94: * @var LoggerInterface
95: */
96: protected $trace;
97:
98: /**
99: * @var array
100: */
101: protected $connectionParams;
102:
103: /**
104: * @var array<string, list<string>>
105: */
106: protected $headers = [];
107:
108: /**
109: * @var bool
110: */
111: protected $isAlive = false;
112:
113: /**
114: * @var float
115: */
116: private $pingTimeout = 1; //TODO expose this
117:
118: /**
119: * @var int
120: */
121: private $lastPing = 0;
122:
123: /**
124: * @var int
125: */
126: private $failedPings = 0;
127:
128: /**
129: * @var mixed[]
130: */
131: private $lastRequest = array();
132:
133: /**
134: * @var string
135: */
136: private $OSVersion = null;
137:
138: /**
139: * @param array{host: string, port?: int, scheme?: string, user?: string, pass?: string, path?: string} $hostDetails
140: * @param array{client?: array{headers?: array<string, list<string>>, curl?: array<int, mixed>}} $connectionParams
141: */
142: public function __construct(
143: callable $handler,
144: array $hostDetails,
145: array $connectionParams,
146: SerializerInterface $serializer,
147: LoggerInterface $log,
148: LoggerInterface $trace
149: ) {
150: if (isset($hostDetails['port']) !== true) {
151: $hostDetails['port'] = 9200;
152: }
153:
154: if (isset($hostDetails['scheme'])) {
155: $this->transportSchema = $hostDetails['scheme'];
156: }
157:
158: // Only Set the Basic if API Key is not set and setBasicAuthentication was not called prior
159: if (isset($connectionParams['client']['headers']['Authorization']) === false
160: && isset($connectionParams['client']['curl'][CURLOPT_HTTPAUTH]) === false
161: && isset($hostDetails['user'])
162: && isset($hostDetails['pass'])
163: ) {
164: $connectionParams['client']['curl'][CURLOPT_HTTPAUTH] = CURLAUTH_BASIC;
165: $connectionParams['client']['curl'][CURLOPT_USERPWD] = $hostDetails['user'].':'.$hostDetails['pass'];
166: }
167:
168: $connectionParams['client']['curl'][CURLOPT_PORT] = $hostDetails['port'];
169:
170: if (isset($connectionParams['client']['headers'])) {
171: $this->headers = $connectionParams['client']['headers'];
172: unset($connectionParams['client']['headers']);
173: }
174:
175: // Add the User-Agent using the format: <client-repo-name>/<client-version> (metadata-values)
176: $this->headers['User-Agent'] = [sprintf(
177: 'opensearch-php/%s (%s %s; PHP %s)',
178: Client::VERSION,
179: PHP_OS,
180: $this->getOSVersion(),
181: PHP_VERSION
182: )];
183:
184: $host = $hostDetails['host'];
185: $path = null;
186: if (isset($hostDetails['path']) === true) {
187: $path = $hostDetails['path'];
188: }
189: $port = $hostDetails['port'];
190:
191: $this->host = $host;
192: $this->path = $path;
193: $this->port = $port;
194: $this->log = $log;
195: $this->trace = $trace;
196: $this->connectionParams = $connectionParams;
197: $this->serializer = $serializer;
198:
199: $this->handler = $this->wrapHandler($handler);
200: }
201:
202: /**
203: * @param string $method
204: * @param string $uri
205: * @param null|array<string, mixed> $params
206: * @param mixed $body
207: * @param array $options
208: * @param Transport|null $transport
209: * @return mixed
210: */
211: public function performRequest(string $method, string $uri, ?array $params = [], $body = null, array $options = [], ?Transport $transport = null)
212: {
213: if ($body !== null) {
214: $body = $this->serializer->serialize($body);
215: }
216:
217: $headers = $this->headers;
218: if (isset($options['client']['headers']) && is_array($options['client']['headers'])) {
219: $headers = array_merge($this->headers, $options['client']['headers']);
220: }
221:
222: $host = $this->host;
223: if (isset($this->connectionParams['client']['port_in_header']) && $this->connectionParams['client']['port_in_header']) {
224: $host .= ':' . $this->port;
225: }
226:
227: $request = [
228: 'http_method' => $method,
229: 'scheme' => $this->transportSchema,
230: 'uri' => $this->getURI($uri, $params),
231: 'body' => $body,
232: 'headers' => array_merge(
233: [
234: 'Host' => [$host]
235: ],
236: $headers
237: )
238: ];
239:
240: $request = array_replace_recursive($request, $this->connectionParams, $options);
241:
242: // RingPHP does not like if client is empty
243: if (empty($request['client'])) {
244: unset($request['client']);
245: }
246:
247: $handler = $this->handler;
248: $future = $handler($request, $this, $transport, $options);
249:
250: return $future;
251: }
252:
253: public function getTransportSchema(): string
254: {
255: return $this->transportSchema;
256: }
257:
258: public function getLastRequestInfo(): array
259: {
260: return $this->lastRequest;
261: }
262:
263: private function wrapHandler(callable $handler): callable
264: {
265: return function (array $request, Connection $connection, ?Transport $transport, $options) use ($handler) {
266: $this->lastRequest = [];
267: $this->lastRequest['request'] = $request;
268:
269: // Send the request using the wrapped handler.
270: $response = Core::proxy(
271: $handler($request),
272: function ($response) use ($connection, $transport, $request, $options) {
273: $this->lastRequest['response'] = $response;
274:
275: if (isset($response['error']) === true) {
276: if ($response['error'] instanceof ConnectException || $response['error'] instanceof RingException) {
277: $this->log->warning("Curl exception encountered.");
278:
279: $exception = $this->getCurlRetryException($request, $response);
280:
281: $this->logRequestFail($request, $response, $exception);
282:
283: $node = $connection->getHost();
284: $this->log->warning("Marking node $node dead.");
285: $connection->markDead();
286:
287: // If the transport has not been set, we are inside a Ping or Sniff,
288: // so we don't want to retrigger retries anyway.
289: //
290: // TODO this could be handled better, but we are limited because connectionpools do not
291: // have access to Transport. Architecturally, all of this needs to be refactored
292: if (isset($transport) === true) {
293: $transport->connectionPool->scheduleCheck();
294:
295: $neverRetry = isset($request['client']['never_retry']) ? $request['client']['never_retry'] : false;
296: $shouldRetry = $transport->shouldRetry($request);
297: $shouldRetryText = ($shouldRetry) ? 'true' : 'false';
298:
299: $this->log->warning("Retries left? $shouldRetryText");
300: if ($shouldRetry && !$neverRetry) {
301: return $transport->performRequest(
302: $request['http_method'],
303: $request['uri'],
304: [],
305: $request['body'],
306: $options
307: );
308: }
309: }
310:
311: $this->log->warning("Out of retries, throwing exception from $node");
312: // Only throw if we run out of retries
313: throw $exception;
314: } else {
315: // Something went seriously wrong, bail
316: $exception = new TransportException($response['error']->getMessage());
317: $this->logRequestFail($request, $response, $exception);
318: throw $exception;
319: }
320: } else {
321: $connection->markAlive();
322:
323: if (isset($response['headers']['Warning'])) {
324: $this->logWarning($request, $response);
325: }
326: if (isset($response['body']) === true) {
327: $response['body'] = stream_get_contents($response['body']);
328: $this->lastRequest['response']['body'] = $response['body'];
329: }
330:
331: if ($response['status'] >= 400 && $response['status'] < 500) {
332: $ignore = $request['client']['ignore'] ?? [];
333: // Skip 404 if succeeded true in the body (e.g. clear_scroll)
334: $body = $response['body'] ?? '';
335: if (strpos($body, '"succeeded":true') !== false) {
336: $ignore[] = 404;
337: }
338: $this->process4xxError($request, $response, $ignore);
339: } elseif ($response['status'] >= 500) {
340: $ignore = $request['client']['ignore'] ?? [];
341: $this->process5xxError($request, $response, $ignore);
342: }
343:
344: // No error, deserialize
345: $response['body'] = $this->serializer->deserialize($response['body'], $response['transfer_stats']);
346: }
347: $this->logRequestSuccess($request, $response);
348:
349: return isset($request['client']['verbose']) && $request['client']['verbose'] === true ? $response : $response['body'];
350: }
351: );
352:
353: return $response;
354: };
355: }
356:
357: /**
358: * @param array<string, string|int|bool>|null $params
359: */
360: private function getURI(string $uri, ?array $params): string
361: {
362: if (isset($params) === true && !empty($params)) {
363: $params = array_map(
364: function ($value) {
365: if ($value === true) {
366: return 'true';
367: } elseif ($value === false) {
368: return 'false';
369: }
370:
371: return $value;
372: },
373: $params
374: );
375:
376: $uri .= '?' . http_build_query($params);
377: }
378:
379: if ($this->path !== null) {
380: $uri = $this->path . $uri;
381: }
382:
383: return $uri;
384: }
385:
386: /**
387: * @return array<string, list<string>>
388: */
389: public function getHeaders(): array
390: {
391: return $this->headers;
392: }
393:
394: public function logWarning(array $request, array $response): void
395: {
396: $this->log->warning('Deprecation', $response['headers']['Warning']);
397: }
398:
399: /**
400: * Log a successful request
401: *
402: * @param array $request
403: * @param array $response
404: * @return void
405: */
406: public function logRequestSuccess(array $request, array $response): void
407: {
408: $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? '';
409: $uri = $this->addPortInUrl($response['effective_url'], (int) $port);
410:
411: $this->log->debug('Request Body', array($request['body']));
412: $this->log->info(
413: 'Request Success:',
414: array(
415: 'method' => $request['http_method'],
416: 'uri' => $uri,
417: 'port' => $port,
418: 'headers' => $request['headers'],
419: 'HTTP code' => $response['status'],
420: 'duration' => $response['transfer_stats']['total_time'],
421: )
422: );
423: $this->log->debug('Response', array($response['body']));
424:
425: // Build the curl command for Trace.
426: $curlCommand = $this->buildCurlCommand($request['http_method'], $uri, $request['body']);
427: $this->trace->info($curlCommand);
428: $this->trace->debug(
429: 'Response:',
430: array(
431: 'response' => $response['body'],
432: 'method' => $request['http_method'],
433: 'uri' => $uri,
434: 'port' => $port,
435: 'HTTP code' => $response['status'],
436: 'duration' => $response['transfer_stats']['total_time'],
437: )
438: );
439: }
440:
441: /**
442: * Log a failed request
443: *
444: * @param array $request
445: * @param array $response
446: * @param \Throwable $exception
447: *
448: * @return void
449: */
450: public function logRequestFail(array $request, array $response, \Throwable $exception): void
451: {
452: $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? '';
453: $uri = $this->addPortInUrl($response['effective_url'], (int) $port);
454:
455: $this->log->debug('Request Body', array($request['body']));
456: $this->log->warning(
457: 'Request Failure:',
458: array(
459: 'method' => $request['http_method'],
460: 'uri' => $uri,
461: 'port' => $port,
462: 'headers' => $request['headers'],
463: 'HTTP code' => $response['status'],
464: 'duration' => $response['transfer_stats']['total_time'],
465: 'error' => $exception->getMessage(),
466: )
467: );
468: $this->log->warning('Response', array($response['body']));
469:
470: // Build the curl command for Trace.
471: $curlCommand = $this->buildCurlCommand($request['http_method'], $uri, $request['body']);
472: $this->trace->info($curlCommand);
473: $this->trace->debug(
474: 'Response:',
475: array(
476: 'response' => $response,
477: 'method' => $request['http_method'],
478: 'uri' => $uri,
479: 'port' => $port,
480: 'HTTP code' => $response['status'],
481: 'duration' => $response['transfer_stats']['total_time'],
482: )
483: );
484: }
485:
486: public function ping(): bool
487: {
488: $options = [
489: 'client' => [
490: 'timeout' => $this->pingTimeout,
491: 'never_retry' => true,
492: 'verbose' => true
493: ]
494: ];
495: try {
496: $response = $this->performRequest('HEAD', '/', null, null, $options);
497: $response = $response->wait();
498: } catch (TransportException $exception) {
499: $this->markDead();
500:
501: return false;
502: }
503:
504: if ($response['status'] === 200) {
505: $this->markAlive();
506:
507: return true;
508: } else {
509: $this->markDead();
510:
511: return false;
512: }
513: }
514:
515: /**
516: * @return array|\GuzzleHttp\Ring\Future\FutureArray
517: */
518: public function sniff()
519: {
520: $options = [
521: 'client' => [
522: 'timeout' => $this->pingTimeout,
523: 'never_retry' => true
524: ]
525: ];
526:
527: return $this->performRequest('GET', '/_nodes/', null, null, $options);
528: }
529:
530: public function isAlive(): bool
531: {
532: return $this->isAlive;
533: }
534:
535: public function markAlive(): void
536: {
537: $this->failedPings = 0;
538: $this->isAlive = true;
539: $this->lastPing = time();
540: }
541:
542: public function markDead(): void
543: {
544: $this->isAlive = false;
545: $this->failedPings += 1;
546: $this->lastPing = time();
547: }
548:
549: public function getLastPing(): int
550: {
551: return $this->lastPing;
552: }
553:
554: public function getPingFailures(): int
555: {
556: return $this->failedPings;
557: }
558:
559: public function getHost(): string
560: {
561: return $this->host;
562: }
563:
564: public function getUserPass(): ?string
565: {
566: return $this->connectionParams['client']['curl'][CURLOPT_USERPWD] ?? null;
567: }
568:
569: public function getPath(): ?string
570: {
571: return $this->path;
572: }
573:
574: /**
575: * @return int
576: */
577: public function getPort()
578: {
579: return $this->port;
580: }
581:
582: protected function getCurlRetryException(array $request, array $response): OpenSearchException
583: {
584: $exception = null;
585: $message = $response['error']->getMessage();
586: $exception = new MaxRetriesException($message);
587: switch ($response['curl']['errno']) {
588: case 6:
589: $exception = new CouldNotResolveHostException($message, 0, $exception);
590: break;
591: case 7:
592: $exception = new CouldNotConnectToHost($message, 0, $exception);
593: break;
594: case 28:
595: $exception = new OperationTimeoutException($message, 0, $exception);
596: break;
597: }
598:
599: return $exception;
600: }
601:
602: /**
603: * Get the OS version using php_uname if available
604: * otherwise it returns an empty string
605: *
606: * @see https://github.com/elastic/elasticsearch-php/issues/922
607: */
608: private function getOSVersion(): string
609: {
610: if ($this->OSVersion === null) {
611: $this->OSVersion = strpos(strtolower(ini_get('disable_functions')), 'php_uname') !== false
612: ? ''
613: : php_uname("r");
614: }
615: return $this->OSVersion;
616: }
617:
618: /**
619: * Add the port value in the URL if not present
620: */
621: private function addPortInUrl(string $uri, int $port): string
622: {
623: if (strpos($uri, ':', 7) !== false) {
624: return $uri;
625: }
626: return preg_replace('#([^/])/([^/])#', sprintf("$1:%s/$2", $port), $uri, 1);
627: }
628:
629: /**
630: * Construct a string cURL command
631: */
632: private function buildCurlCommand(string $method, string $url, ?string $body): string
633: {
634: if (strpos($url, '?') === false) {
635: $url .= '?pretty=true';
636: } else {
637: str_replace('?', '?pretty=true', $url);
638: }
639:
640: $curlCommand = 'curl -X' . strtoupper($method);
641: $curlCommand .= " '" . $url . "'";
642:
643: if (isset($body) === true && $body !== '') {
644: $curlCommand .= " -d '" . $body . "'";
645: }
646:
647: return $curlCommand;
648: }
649:
650: /**
651: * @throws OpenSearchException
652: */
653: private function process4xxError(array $request, array $response, array $ignore): void
654: {
655: $statusCode = $response['status'];
656:
657: /**
658: * @var \Exception $exception
659: */
660: $exception = $this->tryDeserialize400Error($response);
661:
662: if (array_search($response['status'], $ignore) !== false) {
663: return;
664: }
665:
666: $responseBody = $this->convertBodyToString($response['body'], $statusCode, $exception);
667: if ($statusCode === 401) {
668: $exception = new Unauthorized401Exception($responseBody);
669: } elseif ($statusCode === 403) {
670: $exception = new Forbidden403Exception($responseBody);
671: } elseif ($statusCode === 404) {
672: $exception = new Missing404Exception($responseBody);
673: } elseif ($statusCode === 409) {
674: $exception = new Conflict409Exception($responseBody, $statusCode);
675: } elseif ($statusCode === 400 && strpos($responseBody, 'script_lang not supported') !== false) {
676: $exception = new ScriptLangNotSupportedException($responseBody);
677: } elseif ($statusCode === 408) {
678: $exception = new RequestTimeout408Exception($responseBody);
679: } else {
680: $exception = new BadRequest400Exception($responseBody);
681: }
682:
683: $this->logRequestFail($request, $response, $exception);
684:
685: throw $exception;
686: }
687:
688: /**
689: * @throws OpenSearchException
690: */
691: private function process5xxError(array $request, array $response, array $ignore): void
692: {
693: $statusCode = (int) $response['status'];
694: $responseBody = $response['body'];
695:
696: /**
697: * @var \Exception $exception
698: */
699: $exception = $this->tryDeserialize500Error($response);
700:
701: $exceptionText = "[$statusCode Server Exception] ".$exception->getMessage();
702: $this->log->error($exceptionText);
703: $this->log->error($exception->getTraceAsString());
704:
705: if (array_search($statusCode, $ignore) !== false) {
706: return;
707: }
708:
709: if ($statusCode === 500 && strpos($responseBody, "RoutingMissingException") !== false) {
710: $exception = new RoutingMissingException($exception->getMessage(), [], 0, $exception);
711: } elseif ($statusCode === 500 && preg_match('/ActionRequestValidationException.+ no documents to get/', $responseBody) === 1) {
712: $exception = new NoDocumentsToGetException($exception->getMessage(), [], 0, $exception);
713: } elseif ($statusCode === 500 && strpos($responseBody, 'NoShardAvailableActionException') !== false) {
714: $exception = new NoShardAvailableException($exception->getMessage(), [], 0, $exception);
715: } else {
716: $exception = new ServerErrorResponseException(
717: $this->convertBodyToString($responseBody, $statusCode, $exception),
718: );
719: }
720:
721: $this->logRequestFail($request, $response, $exception);
722:
723: throw $exception;
724: }
725:
726: private function convertBodyToString($body, int $statusCode, Exception $exception): string
727: {
728: if (empty($body)) {
729: return sprintf(
730: "Unknown %d error from OpenSearch %s",
731: $statusCode,
732: $exception->getMessage()
733: );
734: }
735: // if body is not string, we convert it so it can be used as Exception message
736: if (!is_string($body)) {
737: return json_encode($body);
738: }
739: return $body;
740: }
741:
742: private function tryDeserialize400Error(array $response): OpenSearchException
743: {
744: return $this->tryDeserializeError($response, BadRequest400Exception::class);
745: }
746:
747: private function tryDeserialize500Error(array $response): OpenSearchException
748: {
749: return $this->tryDeserializeError($response, ServerErrorResponseException::class);
750: }
751:
752: private function tryDeserializeError(array $response, string $errorClass): OpenSearchException
753: {
754: $error = $this->serializer->deserialize($response['body'], $response['transfer_stats']);
755: if (is_array($error) === true) {
756: if (isset($error['error']) === false) {
757: // <2.0 "i just blew up" nonstructured exception
758: // $error is an array but we don't know the format, reuse the response body instead
759: // added json_encode to convert into a string
760: return new $errorClass(json_encode($response['body']), (int) $response['status']);
761: }
762:
763: // 2.0 structured exceptions
764: if (is_array($error['error']) && array_key_exists('reason', $error['error']) === true) {
765: // Try to use root cause first (only grabs the first root cause)
766: $info = $error['error']['root_cause'][0] ?? $error['error'];
767: $cause = $info['reason'];
768: $type = $info['type'];
769: // added json_encode to convert into a string
770: $original = new $errorClass(json_encode($response['body']), $response['status']);
771:
772: return new $errorClass("$type: $cause", (int) $response['status'], $original);
773: }
774: // <2.0 semi-structured exceptions
775: // added json_encode to convert into a string
776: $original = new $errorClass(json_encode($response['body']), $response['status']);
777:
778: $errorEncoded = $error['error'];
779: if (is_array($errorEncoded)) {
780: $errorEncoded = json_encode($errorEncoded);
781: }
782: return new $errorClass($errorEncoded, (int) $response['status'], $original);
783: }
784:
785: // if responseBody is not string, we convert it so it can be used as Exception message
786: $responseBody = $response['body'];
787: if (!is_string($responseBody)) {
788: $responseBody = json_encode($responseBody);
789: }
790:
791: // <2.0 "i just blew up" nonstructured exception
792: return new $errorClass($responseBody);
793: }
794: }
795: