1: <?php
2:
3: declare(strict_types=1);
4:
5: /**
6: * Copyright OpenSearch Contributors
7: * SPDX-License-Identifier: Apache-2.0
8: *
9: * OpenSearch PHP client
10: *
11: * @link https://github.com/opensearch-project/opensearch-php/
12: * @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co)
13: * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
14: * @license https://www.gnu.org/licenses/lgpl-2.1.html GNU Lesser General Public License, Version 2.1
15: *
16: * Licensed to Elasticsearch B.V under one or more agreements.
17: * Elasticsearch B.V licenses this file to you under the Apache 2.0 License or
18: * the GNU Lesser General Public License, Version 2.1, at your option.
19: * See the LICENSE file in the project root for more information.
20: */
21:
22: namespace OpenSearch;
23:
24: use GuzzleHttp\Ring\Future\FutureArrayInterface;
25: use OpenSearch\Common\Exceptions;
26: use OpenSearch\ConnectionPool\AbstractConnectionPool;
27: use OpenSearch\Connections\ConnectionInterface;
28: use Psr\Log\LoggerInterface;
29:
30: // @phpstan-ignore classConstant.deprecatedClass
31: @trigger_error(Transport::class . ' is deprecated in 2.4.0 and will be removed in 3.0.0.', E_USER_DEPRECATED);
32:
33: /**
34: * @deprecated in 2.4.0 and will be removed in 3.0.0.
35: */
36: class Transport
37: {
38: /**
39: * @var AbstractConnectionPool
40: */
41: public $connectionPool;
42:
43: /**
44: * @var LoggerInterface
45: */
46: private $log;
47:
48: /**
49: * @var int
50: */
51: public $retryAttempts = 0;
52:
53: /**
54: * @var ConnectionInterface
55: */
56: public $lastConnection;
57:
58: /**
59: * @var int
60: */
61: public $retries;
62:
63: /**
64: * Transport class is responsible for dispatching requests to the
65: * underlying cluster connections
66: *
67: * @param int $retries
68: * @param bool $sniffOnStart
69: * @param ConnectionPool\AbstractConnectionPool $connectionPool
70: * @param \Psr\Log\LoggerInterface $log Monolog logger object
71: */
72: public function __construct(int $retries, AbstractConnectionPool $connectionPool, LoggerInterface $log, bool $sniffOnStart = false)
73: {
74: $this->log = $log;
75: $this->connectionPool = $connectionPool;
76: $this->retries = $retries;
77:
78: if ($sniffOnStart === true) {
79: $this->log->notice('Sniff on Start.');
80: $this->connectionPool->scheduleCheck();
81: }
82: }
83:
84: /**
85: * Returns a single connection from the connection pool
86: * Potentially performs a sniffing step before returning
87: */
88: public function getConnection(): ConnectionInterface
89: {
90: return $this->connectionPool->nextConnection();
91: }
92:
93: /**
94: * Perform a request to the Cluster
95: *
96: * @param string $method HTTP method to use
97: * @param string $uri HTTP URI to send request to
98: * @param array<string, mixed> $params Optional query parameters
99: * @param mixed|null $body Optional query body
100: * @param array $options
101: *
102: * @throws Common\Exceptions\NoNodesAvailableException|\Exception
103: */
104: public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): FutureArrayInterface
105: {
106: try {
107: $connection = $this->getConnection();
108: } catch (Exceptions\NoNodesAvailableException $exception) {
109: $this->log->critical('No alive nodes found in cluster');
110: throw $exception;
111: }
112:
113: $response = [];
114: $caughtException = null;
115: $this->lastConnection = $connection;
116:
117: $future = $connection->performRequest(
118: $method,
119: $uri,
120: $params,
121: $body,
122: $options,
123: $this
124: );
125:
126: $future->promise()->then(
127: //onSuccess
128: function ($response) {
129: $this->retryAttempts = 0;
130: // Note, this could be a 4xx or 5xx error
131: },
132: //onFailure
133: function ($response) {
134: $code = $response->getCode();
135: // Ignore 400 level errors, as that means the server responded just fine
136: if ($code < 400 || $code >= 500) {
137: // Otherwise schedule a check
138: $this->connectionPool->scheduleCheck();
139: }
140: }
141: );
142:
143: return $future;
144: }
145:
146: /**
147: * @param FutureArrayInterface $result Response of a request (promise)
148: * @param array $options Options for transport
149: *
150: * @return callable|array
151: */
152: public function resultOrFuture(FutureArrayInterface $result, array $options = [])
153: {
154: $response = null;
155: $async = isset($options['client']['future']) ? $options['client']['future'] : null;
156: if (is_null($async) || $async === false) {
157: do {
158: $result = $result->wait();
159: } while ($result instanceof FutureArrayInterface);
160: }
161: return $result;
162: }
163:
164: public function shouldRetry(array $request): bool
165: {
166: if ($this->retryAttempts < $this->retries) {
167: $this->retryAttempts += 1;
168:
169: return true;
170: }
171:
172: return false;
173: }
174:
175: /**
176: * Returns the last used connection so that it may be inspected. Mainly
177: * for debugging/testing purposes.
178: */
179: public function getLastConnection(): ConnectionInterface
180: {
181: return $this->lastConnection;
182: }
183: }
184: