1: <?php
2:
3: declare(strict_types=1);
4:
5: /**
6: * Copyright OpenSearch Contributors
7: * SPDX-License-Identifier: Apache-2.0
8: *
9: * OpenSearch PHP client
10: *
11: * @link https://github.com/opensearch-project/opensearch-php/
12: * @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co)
13: * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
14: * @license https://www.gnu.org/licenses/lgpl-2.1.html GNU Lesser General Public License, Version 2.1
15: *
16: * Licensed to Elasticsearch B.V under one or more agreements.
17: * Elasticsearch B.V licenses this file to you under the Apache 2.0 License or
18: * the GNU Lesser General Public License, Version 2.1, at your option.
19: * See the LICENSE file in the project root for more information.
20: */
21:
22: namespace OpenSearch\ConnectionPool;
23:
24: use OpenSearch\Common\Exceptions\Curl\OperationTimeoutException;
25: use OpenSearch\Common\Exceptions\NoNodesAvailableException;
26: use OpenSearch\ConnectionPool\Selectors\SelectorInterface;
27: use OpenSearch\Connections\Connection;
28: use OpenSearch\Connections\ConnectionFactoryInterface;
29: use OpenSearch\Connections\ConnectionInterface;
30:
31: class SniffingConnectionPool extends AbstractConnectionPool
32: {
33: /**
34: * @var int
35: */
36: private $sniffingInterval;
37:
38: /**
39: * @var int
40: */
41: private $nextSniff;
42:
43: /**
44: * @param ConnectionInterface[] $connections
45: * @param array<string, mixed> $connectionPoolParams
46: */
47: public function __construct(
48: $connections,
49: SelectorInterface $selector,
50: ConnectionFactoryInterface $factory,
51: $connectionPoolParams
52: ) {
53: parent::__construct($connections, $selector, $factory, $connectionPoolParams);
54:
55: $this->setConnectionPoolParams($connectionPoolParams);
56: $this->nextSniff = time() + $this->sniffingInterval;
57: }
58:
59: public function nextConnection(bool $force = false): ConnectionInterface
60: {
61: $this->sniff($force);
62:
63: $size = count($this->connections);
64: while ($size--) {
65: /**
66: * @var Connection $connection
67: */
68: $connection = $this->selector->select($this->connections);
69: if ($connection->isAlive() === true || $connection->ping() === true) {
70: return $connection;
71: }
72: }
73:
74: if ($force === true) {
75: throw new NoNodesAvailableException("No alive nodes found in your cluster");
76: }
77:
78: return $this->nextConnection(true);
79: }
80:
81: public function scheduleCheck(): void
82: {
83: $this->nextSniff = -1;
84: }
85:
86: private function sniff(bool $force = false): void
87: {
88: if ($force === false && $this->nextSniff > time()) {
89: return;
90: }
91:
92: $total = count($this->connections);
93:
94: while ($total--) {
95: /**
96: * @var Connection $connection
97: */
98: $connection = $this->selector->select($this->connections);
99:
100: if ($connection->isAlive() xor $force) {
101: continue;
102: }
103:
104: if ($this->sniffConnection($connection) === true) {
105: return;
106: }
107: }
108:
109: if ($force === true) {
110: return;
111: }
112:
113: foreach ($this->seedConnections as $connection) {
114: /**
115: * @var Connection $connection
116: */
117: if ($this->sniffConnection($connection) === true) {
118: return;
119: }
120: }
121: }
122:
123: private function sniffConnection(Connection $connection): bool
124: {
125: try {
126: $response = $connection->sniff();
127: } catch (OperationTimeoutException $exception) {
128: return false;
129: }
130:
131: $nodes = $this->parseClusterState($response);
132:
133: if (count($nodes) === 0) {
134: return false;
135: }
136:
137: $this->connections = [];
138:
139: foreach ($nodes as $node) {
140: $nodeDetails = [
141: 'host' => $node['host'],
142: 'port' => $node['port'],
143: ];
144: $this->connections[] = $this->connectionFactory->create($nodeDetails);
145: }
146:
147: $this->nextSniff = time() + $this->sniffingInterval;
148:
149: return true;
150: }
151:
152: /**
153: * @return list<array{host: string, port: int}>
154: */
155: private function parseClusterState($nodeInfo): array
156: {
157: $pattern = '/([^:]*):(\d+)/';
158: $hosts = [];
159:
160: foreach ($nodeInfo['nodes'] as $node) {
161: if (isset($node['http']) === true && isset($node['http']['publish_address']) === true) {
162: if (preg_match($pattern, $node['http']['publish_address'], $match) === 1) {
163: $hosts[] = [
164: 'host' => $match[1],
165: 'port' => (int)$match[2],
166: ];
167: }
168: }
169: }
170:
171: return $hosts;
172: }
173:
174: /**
175: * @param array<string, mixed> $connectionPoolParams
176: */
177: private function setConnectionPoolParams(array $connectionPoolParams): void
178: {
179: $this->sniffingInterval = (int)($connectionPoolParams['sniffingInterval'] ?? 300);
180: }
181: }
182: