1: <?php
2:
3: declare(strict_types=1);
4:
5: /**
6: * Copyright OpenSearch Contributors
7: * SPDX-License-Identifier: Apache-2.0
8: *
9: * OpenSearch PHP client
10: *
11: * @link https://github.com/opensearch-project/opensearch-php/
12: * @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co)
13: * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
14: * @license https://www.gnu.org/licenses/lgpl-2.1.html GNU Lesser General Public License, Version 2.1
15: *
16: * Licensed to Elasticsearch B.V under one or more agreements.
17: * Elasticsearch B.V licenses this file to you under the Apache 2.0 License or
18: * the GNU Lesser General Public License, Version 2.1, at your option.
19: * See the LICENSE file in the project root for more information.
20: */
21:
22: namespace OpenSearch;
23:
24: use OpenSearch\Common\Exceptions;
25: use OpenSearch\ConnectionPool\AbstractConnectionPool;
26: use OpenSearch\Connections\Connection;
27: use OpenSearch\Connections\ConnectionInterface;
28: use GuzzleHttp\Ring\Future\FutureArrayInterface;
29: use Psr\Log\LoggerInterface;
30:
31: class Transport
32: {
33: /**
34: * @var AbstractConnectionPool
35: */
36: public $connectionPool;
37:
38: /**
39: * @var LoggerInterface
40: */
41: private $log;
42:
43: /**
44: * @var int
45: */
46: public $retryAttempts = 0;
47:
48: /**
49: * @var ConnectionInterface
50: */
51: public $lastConnection;
52:
53: /**
54: * @var int
55: */
56: public $retries;
57:
58: /**
59: * Transport class is responsible for dispatching requests to the
60: * underlying cluster connections
61: *
62: * @param int $retries
63: * @param bool $sniffOnStart
64: * @param ConnectionPool\AbstractConnectionPool $connectionPool
65: * @param \Psr\Log\LoggerInterface $log Monolog logger object
66: */
67: public function __construct(int $retries, AbstractConnectionPool $connectionPool, LoggerInterface $log, bool $sniffOnStart = false)
68: {
69: $this->log = $log;
70: $this->connectionPool = $connectionPool;
71: $this->retries = $retries;
72:
73: if ($sniffOnStart === true) {
74: $this->log->notice('Sniff on Start.');
75: $this->connectionPool->scheduleCheck();
76: }
77: }
78:
79: /**
80: * Returns a single connection from the connection pool
81: * Potentially performs a sniffing step before returning
82: */
83: public function getConnection(): ConnectionInterface
84: {
85: return $this->connectionPool->nextConnection();
86: }
87:
88: /**
89: * Perform a request to the Cluster
90: *
91: * @param string $method HTTP method to use
92: * @param string $uri HTTP URI to send request to
93: * @param array<string, mixed> $params Optional query parameters
94: * @param mixed|null $body Optional query body
95: * @param array $options
96: *
97: * @throws Common\Exceptions\NoNodesAvailableException|\Exception
98: */
99: public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): FutureArrayInterface
100: {
101: try {
102: $connection = $this->getConnection();
103: } catch (Exceptions\NoNodesAvailableException $exception) {
104: $this->log->critical('No alive nodes found in cluster');
105: throw $exception;
106: }
107:
108: $response = [];
109: $caughtException = null;
110: $this->lastConnection = $connection;
111:
112: $future = $connection->performRequest(
113: $method,
114: $uri,
115: $params,
116: $body,
117: $options,
118: $this
119: );
120:
121: $future->promise()->then(
122: //onSuccess
123: function ($response) {
124: $this->retryAttempts = 0;
125: // Note, this could be a 4xx or 5xx error
126: },
127: //onFailure
128: function ($response) {
129: $code = $response->getCode();
130: // Ignore 400 level errors, as that means the server responded just fine
131: if ($code < 400 || $code >= 500) {
132: // Otherwise schedule a check
133: $this->connectionPool->scheduleCheck();
134: }
135: }
136: );
137:
138: return $future;
139: }
140:
141: /**
142: * @param FutureArrayInterface $result Response of a request (promise)
143: * @param array $options Options for transport
144: *
145: * @return callable|array
146: */
147: public function resultOrFuture(FutureArrayInterface $result, array $options = [])
148: {
149: $response = null;
150: $async = isset($options['client']['future']) ? $options['client']['future'] : null;
151: if (is_null($async) || $async === false) {
152: do {
153: $result = $result->wait();
154: } while ($result instanceof FutureArrayInterface);
155: }
156: return $result;
157: }
158:
159: public function shouldRetry(array $request): bool
160: {
161: if ($this->retryAttempts < $this->retries) {
162: $this->retryAttempts += 1;
163:
164: return true;
165: }
166:
167: return false;
168: }
169:
170: /**
171: * Returns the last used connection so that it may be inspected. Mainly
172: * for debugging/testing purposes.
173: */
174: public function getLastConnection(): ConnectionInterface
175: {
176: return $this->lastConnection;
177: }
178: }
179: