1: <?php
2:
3: declare(strict_types=1);
4:
5: /**
6: * Copyright OpenSearch Contributors
7: * SPDX-License-Identifier: Apache-2.0
8: *
9: * OpenSearch PHP client
10: *
11: * @link https://github.com/opensearch-project/opensearch-php/
12: * @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co)
13: * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
14: * @license https://www.gnu.org/licenses/lgpl-2.1.html GNU Lesser General Public License, Version 2.1
15: *
16: * Licensed to Elasticsearch B.V under one or more agreements.
17: * Elasticsearch B.V licenses this file to you under the Apache 2.0 License or
18: * the GNU Lesser General Public License, Version 2.1, at your option.
19: * See the LICENSE file in the project root for more information.
20: */
21:
22: namespace OpenSearch\ConnectionPool;
23:
24: use OpenSearch\Common\Exceptions\Curl\OperationTimeoutException;
25: use OpenSearch\Common\Exceptions\NoNodesAvailableException;
26: use OpenSearch\ConnectionPool\Selectors\SelectorInterface;
27: use OpenSearch\Connections\Connection;
28: use OpenSearch\Connections\ConnectionFactoryInterface;
29: use OpenSearch\Connections\ConnectionInterface;
30:
31: // @phpstan-ignore classConstant.deprecatedClass
32: @trigger_error(SniffingConnectionPool::class . ' is deprecated in 2.4.0 and will be removed in 3.0.0.', E_USER_DEPRECATED);
33:
34: /**
35: * @deprecated in 2.4.0 and will be removed in 3.0.0.
36: *
37: * @phpstan-ignore class.extendsDeprecatedClass
38: */
39: class SniffingConnectionPool extends AbstractConnectionPool
40: {
41: /**
42: * @var int
43: */
44: private $sniffingInterval;
45:
46: /**
47: * @var int
48: */
49: private $nextSniff;
50:
51: /**
52: * @param ConnectionInterface[] $connections
53: * @param array<string, mixed> $connectionPoolParams
54: */
55: public function __construct(
56: $connections,
57: SelectorInterface $selector,
58: ConnectionFactoryInterface $factory,
59: $connectionPoolParams
60: ) {
61: parent::__construct($connections, $selector, $factory, $connectionPoolParams);
62:
63: $this->setConnectionPoolParams($connectionPoolParams);
64: $this->nextSniff = time() + $this->sniffingInterval;
65: }
66:
67: public function nextConnection(bool $force = false): ConnectionInterface
68: {
69: $this->sniff($force);
70:
71: $size = count($this->connections);
72: while ($size--) {
73: /**
74: * @var Connection $connection
75: */
76: $connection = $this->selector->select($this->connections);
77: if ($connection->isAlive() === true || $connection->ping() === true) {
78: return $connection;
79: }
80: }
81:
82: if ($force === true) {
83: throw new NoNodesAvailableException("No alive nodes found in your cluster");
84: }
85:
86: return $this->nextConnection(true);
87: }
88:
89: public function scheduleCheck(): void
90: {
91: $this->nextSniff = -1;
92: }
93:
94: private function sniff(bool $force = false): void
95: {
96: if ($force === false && $this->nextSniff > time()) {
97: return;
98: }
99:
100: $total = count($this->connections);
101:
102: while ($total--) {
103: /**
104: * @var Connection $connection
105: */
106: $connection = $this->selector->select($this->connections);
107:
108: if ($connection->isAlive() xor $force) {
109: continue;
110: }
111:
112: if ($this->sniffConnection($connection) === true) {
113: return;
114: }
115: }
116:
117: if ($force === true) {
118: return;
119: }
120:
121: foreach ($this->seedConnections as $connection) {
122: /**
123: * @var Connection $connection
124: */
125: if ($this->sniffConnection($connection) === true) {
126: return;
127: }
128: }
129: }
130:
131: private function sniffConnection(Connection $connection): bool
132: {
133: try {
134: $response = $connection->sniff();
135: } catch (OperationTimeoutException $exception) {
136: return false;
137: }
138:
139: $nodes = $this->parseClusterState($response);
140:
141: if (count($nodes) === 0) {
142: return false;
143: }
144:
145: $this->connections = [];
146:
147: foreach ($nodes as $node) {
148: $nodeDetails = [
149: 'host' => $node['host'],
150: 'port' => $node['port'],
151: ];
152: $this->connections[] = $this->connectionFactory->create($nodeDetails);
153: }
154:
155: $this->nextSniff = time() + $this->sniffingInterval;
156:
157: return true;
158: }
159:
160: /**
161: * @return list<array{host: string, port: int}>
162: */
163: private function parseClusterState($nodeInfo): array
164: {
165: $pattern = '/([^:]*):(\d+)/';
166: $hosts = [];
167:
168: foreach ($nodeInfo['nodes'] as $node) {
169: if (isset($node['http']) === true && isset($node['http']['publish_address']) === true) {
170: if (preg_match($pattern, $node['http']['publish_address'], $match) === 1) {
171: $hosts[] = [
172: 'host' => $match[1],
173: 'port' => (int)$match[2],
174: ];
175: }
176: }
177: }
178:
179: return $hosts;
180: }
181:
182: /**
183: * @param array<string, mixed> $connectionPoolParams
184: */
185: private function setConnectionPoolParams(array $connectionPoolParams): void
186: {
187: $this->sniffingInterval = (int)($connectionPoolParams['sniffingInterval'] ?? 300);
188: }
189: }
190: