1: | <?php |
2: | |
3: | declare(strict_types=1); |
4: | |
5: | |
6: | |
7: | |
8: | |
9: | |
10: | |
11: | |
12: | |
13: | |
14: | |
15: | |
16: | |
17: | |
18: | |
19: | |
20: | |
21: | |
22: | namespace OpenSearch; |
23: | |
24: | use GuzzleHttp\Ring\Future\FutureArrayInterface; |
25: | use OpenSearch\Common\Exceptions; |
26: | use OpenSearch\ConnectionPool\AbstractConnectionPool; |
27: | use OpenSearch\Connections\ConnectionInterface; |
28: | use Psr\Log\LoggerInterface; |
29: | |
30: | |
// Raise an E_USER_DEPRECATED notice when this file is loaded, announcing that
// Transport is deprecated in 2.4.0 and scheduled for removal in 3.0.0.
// The "@" operator silences PHP's default reporting of the notice.
@trigger_error(Transport::class . ' is deprecated in 2.4.0 and will be removed in 3.0.0.', E_USER_DEPRECATED);
32: | |
33: | |
34: | |
35: | |
/**
 * Dispatches requests to a connection obtained from a connection pool and
 * keeps track of how many retries have been consumed.
 *
 * @deprecated Deprecated in 2.4.0; will be removed in 3.0.0.
 */
class Transport
{
    /**
     * Pool that supplies the connection used for each request.
     *
     * @var AbstractConnectionPool
     */
    public $connectionPool;

    /**
     * Logger used for the sniff-on-start notice and the "no alive nodes" error.
     *
     * @var LoggerInterface
     */
    private $log;

    /**
     * Number of retries consumed so far. Incremented by shouldRetry() and
     * reset to 0 when a request's promise is fulfilled.
     *
     * @var int
     */
    public $retryAttempts = 0;

    /**
     * Connection selected by the most recent performRequest() call.
     * Remains null until performRequest() has picked a connection.
     *
     * @var ConnectionInterface|null
     */
    public $lastConnection;

    /**
     * Upper bound on the number of retries shouldRetry() will grant.
     *
     * @var int
     */
    public $retries;

    /**
     * @param int                    $retries        Maximum number of retries granted by shouldRetry().
     * @param AbstractConnectionPool $connectionPool Pool that supplies connections.
     * @param LoggerInterface        $log            Logger for diagnostics.
     * @param bool                   $sniffOnStart   When true, logs a notice and asks the pool to
     *                                               schedule a check immediately.
     */
    public function __construct(int $retries, AbstractConnectionPool $connectionPool, LoggerInterface $log, bool $sniffOnStart = false)
    {
        $this->log = $log;
        $this->connectionPool = $connectionPool;
        $this->retries = $retries;

        if ($sniffOnStart === true) {
            $this->log->notice('Sniff on Start.');
            $this->connectionPool->scheduleCheck();
        }
    }

    /**
     * Returns the next connection offered by the connection pool.
     *
     * @return ConnectionInterface
     */
    public function getConnection(): ConnectionInterface
    {
        return $this->connectionPool->nextConnection();
    }

    /**
     * Sends a request through the next available connection.
     *
     * The chosen connection is remembered in $lastConnection. This transport is
     * passed to the connection as the final argument of its performRequest() call.
     *
     * @param string $method  HTTP method.
     * @param string $uri     Request URI.
     * @param array  $params  Request parameters, forwarded unchanged to the connection.
     * @param mixed  $body    Request body, forwarded unchanged to the connection.
     * @param array  $options Request options, forwarded unchanged to the connection.
     *
     * @return FutureArrayInterface Future returned by the connection.
     *
     * @throws Exceptions\NoNodesAvailableException Re-thrown after logging a critical
     *                                              message when no connection can be obtained.
     */
    public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): FutureArrayInterface
    {
        try {
            $connection = $this->getConnection();
        } catch (Exceptions\NoNodesAvailableException $exception) {
            $this->log->critical('No alive nodes found in cluster');
            throw $exception;
        }

        $this->lastConnection = $connection;

        $future = $connection->performRequest(
            $method,
            $uri,
            $params,
            $body,
            $options,
            $this
        );

        $future->promise()->then(
            // Fulfilled: start the next request with a fresh retry budget.
            function ($response) {
                $this->retryAttempts = 0;
            },
            // Rejected: the rejection value is expected to expose getCode().
            function ($response) {
                $code = $response->getCode();

                // Codes in the 400-499 range do not trigger a pool check
                // (presumably because the node itself answered - TODO confirm);
                // every other code does.
                if ($code < 400 || $code >= 500) {
                    $this->connectionPool->scheduleCheck();
                }
            }
        );

        return $future;
    }

    /**
     * Returns the future itself for asynchronous calls, or its resolved value otherwise.
     *
     * The call is treated as synchronous when $options['client']['future'] is
     * missing, null or exactly false. In that case the future is waited on
     * repeatedly until the result is no longer a FutureArrayInterface.
     *
     * @param FutureArrayInterface $result  Future to resolve or pass through.
     * @param array                $options Request options; only ['client']['future'] is read.
     *
     * @return mixed The untouched future when asynchronous, otherwise the resolved value.
     */
    public function resultOrFuture(FutureArrayInterface $result, array $options = [])
    {
        $async = $options['client']['future'] ?? null;

        if ($async === null || $async === false) {
            do {
                $result = $result->wait();
            } while ($result instanceof FutureArrayInterface);
        }

        return $result;
    }

    /**
     * Decides whether another retry is allowed and, if so, consumes one attempt.
     *
     * @param array $request Request description; not inspected by this implementation.
     *
     * @return bool True while $retryAttempts is below $retries, false once the budget is spent.
     */
    public function shouldRetry(array $request): bool
    {
        if ($this->retryAttempts < $this->retries) {
            $this->retryAttempts += 1;

            return true;
        }

        return false;
    }

    /**
     * Returns the connection used by the most recent performRequest() call.
     *
     * NOTE(review): $lastConnection is null until performRequest() has selected a
     * connection, which would violate the declared return type - confirm callers
     * only invoke this after a request.
     *
     * @return ConnectionInterface
     */
    public function getLastConnection(): ConnectionInterface
    {
        return $this->lastConnection;
    }
}
184: | |