ConnectionPool — Documentation by YARD 0.9.37

Class: Mongo::Server::ConnectionPool

Inherits:

Object

Extended by:

Forwardable

Includes:

Loggable, Monitoring::Publishable

Defined in:

lib/mongo/server/connection_pool.rb,
lib/mongo/server/connection_pool/populator.rb,
lib/mongo/server/connection_pool/generation_manager.rb

Overview

Represents a connection pool for server connections.

Defined Under Namespace

Classes: GenerationManager, Populator

Constant Summary

DEFAULT_MAX_SIZE = 20

The default max size for the connection pool.

DEFAULT_MIN_SIZE = 0

The default min size for the connection pool.

DEFAULT_MAX_CONNECTING = 2

The default maximum number of connections that can be connecting at any given time.

DEFAULT_WAIT_TIMEOUT = 10.freeze

The default timeout, in seconds, to wait for a connection.

This timeout applies while in-flow threads are waiting for background threads to establish connections (and hence they must connect, handshake and auth in the allotted time).

It is currently set to 10 seconds. The default connect timeout is itself 10 seconds, but setting large timeouts can get applications in trouble if their requests get timed out by the reverse proxy; thus anything over 15 seconds is potentially dangerous.

Constants included from Loggable

Loggable::PREFIX

Instance Attribute Summary

Attributes included from Monitoring::Publishable

#monitoring

Class Method Summary

Instance Method Summary

Methods included from Monitoring::Publishable

#publish_cmap_event, #publish_event, #publish_sdam_event

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Constructor Details

#initialize(server, options = {}) ⇒ ConnectionPool

Create the new connection pool.

Note: Additionally, options for connections created by this pool should be included in the options passed here, and they will be forwarded to any connections created by the pool.

    # File 'lib/mongo/server/connection_pool.rb', line 102

    def initialize(server, options = {})
      unless server.is_a?(Server)
        raise ArgumentError, 'First argument must be a Server instance'
      end
      options = options.dup
      if options[:min_size] && options[:min_pool_size] && options[:min_size] != options[:min_pool_size]
        raise ArgumentError, "Min size #{options[:min_size]} is not identical to min pool size #{options[:min_pool_size]}"
      end
      if options[:max_size] && options[:max_pool_size] && options[:max_size] != options[:max_pool_size]
        raise ArgumentError, "Max size #{options[:max_size]} is not identical to max pool size #{options[:max_pool_size]}"
      end
      if options[:wait_timeout] && options[:wait_queue_timeout] && options[:wait_timeout] != options[:wait_queue_timeout]
        raise ArgumentError, "Wait timeout #{options[:wait_timeout]} is not identical to wait queue timeout #{options[:wait_queue_timeout]}"
      end
      options[:min_size] ||= options[:min_pool_size]
      options.delete(:min_pool_size)
      options[:max_size] ||= options[:max_pool_size]
      options.delete(:max_pool_size)
      if options[:min_size] && options[:max_size] &&
        (options[:max_size] != 0 && options[:min_size] > options[:max_size])
      then
        raise ArgumentError, "Cannot have min size #{options[:min_size]} exceed max size #{options[:max_size]}"
      end
      if options[:wait_queue_timeout]
        options[:wait_timeout] ||= options[:wait_queue_timeout]
      end
      options.delete(:wait_queue_timeout)

      @server = server
      @options = options.freeze

      @generation_manager = GenerationManager.new(server: server)
      @ready = false
      @closed = false

      @available_connections = available_connections = []
      @checked_out_connections = Set.new
      @pending_connections = Set.new
      @interrupt_connections = []

      @lock = Mutex.new

      @populator = Populator.new(self, options)
      @populate_semaphore = Semaphore.new

      @size_cv = Mongo::ConditionVariable.new(@lock)
      @connection_requests = 0
      @max_connecting_cv = Mongo::ConditionVariable.new(@lock)
      @max_connecting = options.fetch(:max_connecting, DEFAULT_MAX_CONNECTING)

      ObjectSpace.define_finalizer(self, self.class.finalize(@available_connections, @pending_connections, @populator))

      publish_cmap_event(
        Monitoring::Event::Cmap::PoolCreated.new(@server.address, options, self)
      )
    end
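The alias handling in the constructor can be tried out in isolation. The following is a minimal standalone sketch of that logic, not the driver's code: `normalize_pool_options` is a hypothetical helper name, and the error messages are simplified.

```ruby
# Standalone model of the option-alias handling in the constructor.
# normalize_pool_options is a hypothetical helper, not a driver method.
def normalize_pool_options(options)
  options = options.dup

  # Each pair is [canonical name, legacy alias].
  [[:min_size, :min_pool_size],
   [:max_size, :max_pool_size],
   [:wait_timeout, :wait_queue_timeout]].each do |canonical, legacy|
    if options[canonical] && options[legacy] && options[canonical] != options[legacy]
      raise ArgumentError, "#{canonical} #{options[canonical]} is not identical to #{legacy} #{options[legacy]}"
    end
    # Copy the alias value over only when the canonical option is unset.
    options[canonical] ||= options[legacy] if options[legacy]
    options.delete(legacy)
  end

  min = options[:min_size]
  max = options[:max_size]
  # A max size of 0 is exempt from the min/max comparison.
  if min && max && max != 0 && min > max
    raise ArgumentError, "Cannot have min size #{min} exceed max size #{max}"
  end

  options.freeze
end

normalized = normalize_pool_options(min_pool_size: 2, max_size: 5, wait_queue_timeout: 3)
```

After normalization only the canonical keys (`:min_size`, `:max_size`, `:wait_timeout`) remain, so the rest of the pool never needs to know about the aliases.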

Instance Attribute Details

#generation_manager ⇒ GenerationManager

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the generation manager, which tracks the generation of connections currently being used by the pool.

    # File 'lib/mongo/server/connection_pool.rb', line 233

    def generation_manager
      @generation_manager
    end

#max_connecting ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

    # File 'lib/mongo/server/connection_pool.rb', line 342

    def max_connecting
      @max_connecting
    end

#options ⇒ Hash

Returns the pool options.

    # File 'lib/mongo/server/connection_pool.rb', line 180

    def options
      @options
    end

#populate_semaphore ⇒ Object

Condition variable broadcast when the size of the pool changes, to wake up the populator.

    # File 'lib/mongo/server/connection_pool.rb', line 59

    def populate_semaphore
      @populate_semaphore
    end

#populator ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

    # File 'lib/mongo/server/connection_pool.rb', line 339

    def populator
      @populator
    end

#server ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

    # File 'lib/mongo/server/connection_pool.rb', line 183

    def server
      @server
    end

Class Method Details

.finalize(available_connections, pending_connections, populator) ⇒ Proc

Finalize the connection pool for garbage collection.

    # File 'lib/mongo/server/connection_pool.rb', line 813

    def self.finalize(available_connections, pending_connections, populator)
      proc do
        available_connections.each do |connection|
          connection.disconnect!(reason: :pool_closed)
        end
        available_connections.clear

        pending_connections.each do |connection|
          connection.disconnect!(reason: :pool_closed)
        end
        pending_connections.clear
      end
    end
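A finalizer proc must not hold a reference to the object it finalizes, otherwise that object can never be collected. That is presumably why the proc here is built in a class method from the collections alone. The sketch below shows the pattern with toy classes; `ToyPool` and `FakeConnection` are hypothetical stand-ins, not driver classes.

```ruby
# Toy stand-in for a connection; FakeConnection is hypothetical.
FakeConnection = Struct.new(:id, :closed_reason) do
  def disconnect!(reason:)
    self.closed_reason = reason
  end
end

class ToyPool
  # Built in a class method so the proc closes over the collections only,
  # never over the pool instance itself.
  def self.finalize(available, pending)
    proc do
      available.each { |c| c.disconnect!(reason: :pool_closed) }
      available.clear
      pending.each { |c| c.disconnect!(reason: :pool_closed) }
      pending.clear
    end
  end

  def initialize(available, pending)
    @available = available
    @pending = pending
    ObjectSpace.define_finalizer(self, self.class.finalize(@available, @pending))
  end
end

available = [FakeConnection.new(1)]
pending = [FakeConnection.new(2)]
conn_a = available.first
conn_b = pending.first
pool = ToyPool.new(available, pending)

# Call the finalizer proc directly, since garbage collection timing is not deterministic.
ToyPool.finalize(available, pending).call
```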

Instance Method Details

#available_count ⇒ Integer

Number of available connections in the pool.

    # File 'lib/mongo/server/connection_pool.rb', line 291

    def available_count
      raise_if_closed!

      @lock.synchronize do
        @available_connections.length
      end
    end

#check_in(connection) ⇒ Object

Check a connection back into the pool.

The connection must have been previously created by this pool.

    # File 'lib/mongo/server/connection_pool.rb', line 402

    def check_in(connection)
      check_invariants

      @lock.synchronize do
        do_check_in(connection)
      end
    ensure
      check_invariants
    end

#check_out(connection_global_id: nil, context: nil) ⇒ Mongo::Server::Connection

Checks a connection out of the pool.

If there are active connections in the pool, the most recently used connection is returned. Otherwise if the connection pool size is less than the max size, creates a new connection and returns it. Otherwise waits up to the wait timeout and raises Timeout::Error if there are still no active connections and the pool is at max size.

The returned connection counts toward the pool’s max size. When the caller is finished using the connection, the connection should be checked back in via the check_in method.

    # File 'lib/mongo/server/connection_pool.rb', line 366

    def check_out(connection_global_id: nil, context: nil)
      check_invariants

      publish_cmap_event(
        Monitoring::Event::Cmap::ConnectionCheckOutStarted.new(@server.address)
      )

      raise_if_pool_closed!
      raise_if_pool_paused_locked!

      connection = retrieve_and_connect_connection(
        connection_global_id, context
      )

      publish_cmap_event(
        Monitoring::Event::Cmap::ConnectionCheckedOut.new(@server.address, connection.id, self),
      )

      if Lint.enabled?
        unless connection.connected?
          raise Error::LintError, "Connection pool for #{address} checked out a disconnected connection #{connection.generation}:#{connection.id}"
        end
      end

      connection
    ensure
      check_invariants
    end
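The checkout policy described above (reuse the most recently used idle connection, otherwise create one below the max size, otherwise wait until the timeout) can be modelled with a toy pool. This is a simplified sketch under stated assumptions, not the driver's implementation: `ToyCheckoutPool` and `CheckoutTimeout` are hypothetical names, and connections are plain integers.

```ruby
# Toy model of the checkout policy; all names here are illustrative.
class ToyCheckoutPool
  class CheckoutTimeout < StandardError; end

  def initialize(max_size:, wait_timeout:)
    @max_size = max_size
    @wait_timeout = wait_timeout
    @available = []     # idle connections, most recently used last
    @checked_out = []
    @next_id = 0
    @lock = Mutex.new
    @size_cv = ConditionVariable.new
  end

  def check_out
    @lock.synchronize do
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @wait_timeout
      loop do
        # 1. Prefer the most recently used idle connection.
        conn = @available.pop
        # 2. Otherwise create one if the pool is below its max size.
        if conn.nil? && @checked_out.size < @max_size
          @next_id += 1
          conn = @next_id
        end
        if conn
          @checked_out << conn
          return conn
        end
        # 3. Otherwise wait for a check-in until the deadline passes.
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise CheckoutTimeout, 'no connection available' if remaining <= 0
        @size_cv.wait(@lock, remaining)
      end
    end
  end

  def check_in(conn)
    @lock.synchronize do
      @checked_out.delete(conn)
      @available << conn
      @size_cv.signal
    end
  end
end

pool = ToyCheckoutPool.new(max_size: 2, wait_timeout: 0.05)
first = pool.check_out
second = pool.check_out
pool.check_in(first)
pool.check_in(second)
reused = pool.check_out # the connection checked in last is handed out first
```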

#clear(options = nil) ⇒ true

Closes all idle connections in the pool and schedules currently checked out connections to be closed when they are checked back into the pool. The pool is paused: it will not create new connections in the background, and it will fail checkout requests until marked ready.

    # File 'lib/mongo/server/connection_pool.rb', line 520

    def clear(options = nil)
      raise_if_closed!

      if Lint.enabled? && !@server.unknown?
        raise Error::LintError, "Attempting to clear pool for server #{@server.summary} which is known"
      end

      do_clear(options)
    end
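Closing checked out connections "when they are checked back in" relies on a generation counter: clearing bumps the generation, and a connection from an older generation is closed at check-in rather than reused. The sketch below models only that idea; `ToyGenerationPool` and `GenConn` are hypothetical, and pausing, service IDs and pinned connections are left out.

```ruby
# Toy connection carrying the generation it was created in.
GenConn = Struct.new(:id, :generation, :open)

class ToyGenerationPool
  attr_reader :generation, :available

  def initialize
    @generation = 1
    @available = []
    @next_id = 0
  end

  def check_out
    idle = @available.pop
    return idle if idle

    @next_id += 1
    GenConn.new(@next_id, @generation, true)
  end

  # Bump the generation and close idle connections right away.
  def clear
    @generation += 1
    @available.each { |c| c.open = false }
    @available.clear
  end

  # Connections from an older generation are closed instead of reused.
  def check_in(conn)
    if conn.generation != @generation
      conn.open = false
    else
      @available << conn
    end
  end
end

pool = ToyGenerationPool.new
idle = pool.check_out
in_use = pool.check_out
pool.check_in(idle)
pool.clear              # idle is closed now; in_use stays open until check-in
```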

#close(options = nil) ⇒ true

Marks the pool closed, closes all idle connections in the pool and schedules currently checked out connections to be closed when they are checked back into the pool. If the force option is true, checked out connections are also closed. Attempts to use the pool after it is closed will raise Error::PoolClosedError.

    # File 'lib/mongo/server/connection_pool.rb', line 644

    def close(options = nil)
      return if closed?

      options ||= {}

      stop_populator

      @lock.synchronize do
        until @available_connections.empty?
          connection = @available_connections.pop
          connection.disconnect!(reason: :pool_closed)
        end

        if options[:force]
          until @checked_out_connections.empty?
            connection = @checked_out_connections.take(1).first
            connection.disconnect!(reason: :pool_closed)
            @checked_out_connections.delete(connection)
          end
        end

        unless options && options[:stay_ready]
          @closed = true
          @ready = false
        end

        @max_connecting_cv.broadcast
        @size_cv.broadcast
      end

      publish_cmap_event(
        Monitoring::Event::Cmap::PoolClosed.new(@server.address, self)
      )

      true
    end

#close_idle_sockets ⇒ Object

Closes sockets that have been idle (since their last check-in) for longer than the max idle time, if the option is set.

    # File 'lib/mongo/server/connection_pool.rb', line 734

    def close_idle_sockets
      return if closed?
      return unless max_idle_time

      @lock.synchronize do
        i = 0
        while i < @available_connections.length
          connection = @available_connections[i]
          if last_checkin = connection.last_checkin
            if (Time.now - last_checkin) > max_idle_time
              connection.disconnect!(reason: :idle)
              @available_connections.delete_at(i)
              @populate_semaphore.signal
              next
            end
          end
          i += 1
        end
      end
    end
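The loop removes elements from the array it is iterating over, which is why the index only advances when nothing was deleted. A minimal standalone sketch of that pattern follows; `IdleConn` and `prune_idle!` are hypothetical names, and the `now:` parameter is added only to make the example deterministic.

```ruby
# Toy connection record; only the last check-in time matters here.
IdleConn = Struct.new(:id, :last_checkin)

# Removes connections idle longer than max_idle_time, in place.
def prune_idle!(connections, max_idle_time, now: Time.now)
  i = 0
  while i < connections.length
    last_checkin = connections[i].last_checkin
    if last_checkin && (now - last_checkin) > max_idle_time
      # delete_at shifts later elements left, so do not advance i.
      connections.delete_at(i)
      next
    end
    i += 1
  end
  connections
end

now = Time.at(1_000)
conns = [
  IdleConn.new(1, Time.at(100)),  # idle for 900 seconds
  IdleConn.new(2, Time.at(200)),  # idle for 800 seconds
  IdleConn.new(3, Time.at(995)),  # idle for 5 seconds
  IdleConn.new(4, nil)            # never checked in, so never pruned
]
prune_idle!(conns, 60, now: now)
```

Two adjacent idle connections (ids 1 and 2) are both removed; advancing the index after a deletion would have skipped the second one.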

#closed? ⇒ true | false

Whether the pool has been closed.

    # File 'lib/mongo/server/connection_pool.rb', line 304

    def closed?
      !!@closed
    end

#disconnect!(options = nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Disconnects the pool.

Does everything that clear does, except that if the pool is closed this method does nothing, whereas clear would raise PoolClosedError.

    # File 'lib/mongo/server/connection_pool.rb', line 537

    def disconnect!(options = nil)
      do_clear(options)
    rescue Error::PoolClosedError
    end

#do_check_in(connection) ⇒ Object

Executes the check in after having already acquired the lock.

    # File 'lib/mongo/server/connection_pool.rb', line 415

    def do_check_in(connection)
      return if connection.closed? && connection.interrupted?

      unless connection.connection_pool == self
        raise ArgumentError, "Trying to check in a connection which was not checked out by this pool: #{connection} checked out from pool #{connection.connection_pool} (for #{self})"
      end

      unless @checked_out_connections.include?(connection)
        raise ArgumentError, "Trying to check in a connection which is not currently checked out by this pool: #{connection} (for #{self})"
      end

      @checked_out_connections.delete(connection)
      @size_cv.signal

      publish_cmap_event(
        Monitoring::Event::Cmap::ConnectionCheckedIn.new(@server.address, connection.id, self)
      )

      if connection.interrupted?
        connection.disconnect!(reason: :stale)
        return
      end

      if connection.error?
        connection.disconnect!(reason: :error)
        return
      end

      if closed?
        connection.disconnect!(reason: :pool_closed)
        return
      end

      if connection.closed?
        @populate_semaphore.signal
      elsif connection.generation != generation(service_id: connection.service_id) && !connection.pinned?
        connection.disconnect!(reason: :stale)
        @populate_semaphore.signal
      else
        connection.record_checkin!
        @available_connections << connection
        @max_connecting_cv.signal
      end
    end
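The order of the checks in this method determines what happens to a returned connection. The sketch below restates that decision sequence as a pure function over a toy state object; `ConnState`, `check_in_outcome` and the outcome symbols are hypothetical, and the ownership checks, locking and event publishing are omitted.

```ruby
# Toy snapshot of the connection flags consulted at check-in.
ConnState = Struct.new(:interrupted, :error, :closed, :generation, :pinned, keyword_init: true)

# Returns a symbol naming what the check-in logic would do with the connection.
def check_in_outcome(conn, pool_closed:, pool_generation:)
  return :ignored if conn.closed && conn.interrupted
  return :disconnect_stale if conn.interrupted
  return :disconnect_error if conn.error
  return :disconnect_pool_closed if pool_closed
  return :signal_populator if conn.closed
  return :disconnect_stale if conn.generation != pool_generation && !conn.pinned

  :return_to_pool
end

healthy = ConnState.new(generation: 3)
stale = ConnState.new(generation: 2)
pinned_stale = ConnState.new(generation: 2, pinned: true)

check_in_outcome(healthy, pool_closed: false, pool_generation: 3)      # => :return_to_pool
check_in_outcome(stale, pool_closed: false, pool_generation: 3)        # => :disconnect_stale
check_in_outcome(pinned_stale, pool_closed: false, pool_generation: 3) # => :return_to_pool
```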

#do_clear(options = nil) ⇒ Object

    # File 'lib/mongo/server/connection_pool.rb', line 545

    def do_clear(options = nil)
      check_invariants

      service_id = options && options[:service_id]

      @lock.synchronize do
        @generation_manager.bump(service_id: service_id)

        unless options && options[:lazy]
          close_available_connections(service_id)
        end

        if options && options[:interrupt_in_use_connections]
          schedule_for_interruption(@checked_out_connections, service_id)
          schedule_for_interruption(@pending_connections, service_id)
        end

        if @ready
          publish_cmap_event(
            Monitoring::Event::Cmap::PoolCleared.new(
              @server.address,
              service_id: service_id,
              interrupt_in_use_connections: options&.[](:interrupt_in_use_connections)
            )
          )
          do_pause if !@server.load_balancer? && @server.unknown?
        end

        @max_connecting_cv.broadcast
        @size_cv.broadcast
      end

      @populate_semaphore.signal

      true
    ensure
      check_invariants
    end

#do_pause ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Mark the connection pool as paused without acquiring the lock.

    # File 'lib/mongo/server/connection_pool.rb', line 493

    def do_pause
      if Lint.enabled? && !@server.unknown?
        raise Error::LintError, "Attempting to pause pool for server #{@server.summary} which is known"
      end

      return if !@ready

      @ready = false
    end

#inspect ⇒ String

Get a pretty printed string inspection for the pool.

    # File 'lib/mongo/server/connection_pool.rb', line 691

    def inspect
      if closed?
        "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " +
          "wait_timeout=#{wait_timeout} closed>"
      elsif !ready?
        "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " +
          "wait_timeout=#{wait_timeout} paused>"
      else
        "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " +
          "wait_timeout=#{wait_timeout} current_size=#{size} available=#{available_count}>"
      end
    end

#max_idle_time ⇒ Float | nil

The maximum number of seconds a socket can remain idle after it has been checked in to the pool, if set.

    # File 'lib/mongo/server/connection_pool.rb', line 228

    def max_idle_time
      @max_idle_time ||= options[:max_idle_time]
    end

#max_size ⇒ Integer

Get the maximum size of the connection pool.

    # File 'lib/mongo/server/connection_pool.rb', line 193

    def max_size
      @max_size ||= options[:max_size] || [DEFAULT_MAX_SIZE, min_size].max
    end
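When no max size is given, the default is raised to the min size if the min size is larger, so the effective max is never below the effective min. A small standalone sketch of that rule, using the constant values listed above (`effective_sizes` is a hypothetical helper, not a driver method):

```ruby
DEFAULT_MAX_SIZE = 20
DEFAULT_MIN_SIZE = 0

# Returns [min_size, max_size] as the two accessors would compute them.
def effective_sizes(options)
  min_size = options[:min_size] || DEFAULT_MIN_SIZE
  max_size = options[:max_size] || [DEFAULT_MAX_SIZE, min_size].max
  [min_size, max_size]
end

effective_sizes({})                          # => [0, 20]
effective_sizes({ min_size: 30 })            # => [30, 30]
effective_sizes({ min_size: 5, max_size: 10 }) # => [5, 10]
```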

#min_size ⇒ Integer

Get the minimum size of the connection pool.

    # File 'lib/mongo/server/connection_pool.rb', line 202

    def min_size
      @min_size ||= options[:min_size] || DEFAULT_MIN_SIZE
    end

#pause ⇒ Object

Mark the connection pool as paused.

    # File 'lib/mongo/server/connection_pool.rb', line 478

    def pause
      raise_if_closed!

      check_invariants

      @lock.synchronize do
        do_pause
      end
    ensure
      check_invariants
    end

#paused? ⇒ true | false

A connection pool is paused if it is not closed and it is not ready.

    # File 'lib/mongo/server/connection_pool.rb', line 246

    def paused?
      raise_if_closed!

      @lock.synchronize do
        !@ready
      end
    end

#populate ⇒ true | false

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method does three things:

  1. Creates and adds a connection to the pool, if the pool’s size is below min_size. Retries once if a socket-related error is encountered during this process and raises if a second error or a non socket-related error occurs.
  2. Removes stale connections from the connection pool.
  3. Interrupts connections marked for interruption.

Used by the pool populator background thread.

Raises the second socket-related error if a retry occurred, or the non socket-related error.

    # File 'lib/mongo/server/connection_pool.rb', line 792

    def populate
      return false if closed?

      begin
        return create_and_add_connection
      rescue Error::SocketError, Error::SocketTimeoutError => e
        log_warn("Populator failed to connect a connection for #{address}: #{e.class}: #{e}. It will retry.")
      end

      return create_and_add_connection
    end
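The retry-once behaviour comes from rescuing only socket-related errors around the first attempt and leaving the second attempt unguarded. A minimal sketch of that control flow, with `FakeSocketError` and `populate_with_retry` as hypothetical stand-ins for the driver's error classes and method:

```ruby
# Stand-in for the driver's socket-related error classes.
class FakeSocketError < StandardError; end

# Runs the block; retries exactly once if it raises FakeSocketError.
def populate_with_retry(log = [])
  begin
    return yield
  rescue FakeSocketError => e
    log << "connect failed: #{e.message}; retrying once"
  end
  # Second attempt: any error raised here propagates to the caller.
  yield
end

attempts = 0
log = []
result = populate_with_retry(log) do
  attempts += 1
  raise FakeSocketError, 'connection reset' if attempts == 1
  true
end
```

Errors of any other class are not rescued, so they propagate on the first attempt without a retry.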

#ready ⇒ Object

Instructs the pool to create and return connections.

    # File 'lib/mongo/server/connection_pool.rb', line 593

    def ready
      raise_if_closed!

      @lock.synchronize do
        return if @ready

        @ready = true
      end

      publish_cmap_event(
        Monitoring::Event::Cmap::PoolReady.new(@server.address, options, self)
      )

      if options.fetch(:populator_io, true)
        if @populator.running?
          @populate_semaphore.signal
        else
          @populator.run!
        end
      end
    end

#ready? ⇒ true | false

Whether the pool is ready.

    # File 'lib/mongo/server/connection_pool.rb', line 311

    def ready?
      @lock.synchronize do
        @ready
      end
    end

#size ⇒ Integer

Size of the connection pool.

Includes available and checked out connections.

    # File 'lib/mongo/server/connection_pool.rb', line 261

    def size
      raise_if_closed!

      @lock.synchronize do
        unsynchronized_size
      end
    end

#stop_populator ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Stop the background populator thread and clean up any connections created which have not been connected yet.

Used when closing the pool or when terminating the background thread for testing purposes. In the latter case, this method must be called before the pool is used, to ensure no connections in pending_connections were created in-flow by the check_out method.

    # File 'lib/mongo/server/connection_pool.rb', line 764

    def stop_populator
      @populator.stop!

      @lock.synchronize do
        clear_pending_connections
      end
    end

#summary ⇒ Object

Note: This method is experimental and subject to change.

    # File 'lib/mongo/server/connection_pool.rb', line 321

    def summary
      @lock.synchronize do
        state = if closed?
          'closed'
        elsif !@ready
          'paused'
        else
          'ready'
        end
        "#<ConnectionPool size=#{unsynchronized_size} (#{min_size}-#{max_size}) " +
          "used=#{@checked_out_connections.length} avail=#{@available_connections.length} pending=#{@pending_connections.length} #{state}>"
      end
    end

#unavailable_connections ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the number of unavailable connections in the pool. Used to calculate whether we have hit max_pool_size.

    # File 'lib/mongo/server/connection_pool.rb', line 282

    def unavailable_connections
      @checked_out_connections.length + @pending_connections.length + @connection_requests
    end

#wait_timeout(context = nil) ⇒ Float

The time to wait, in seconds, for a connection to become available.

    # File 'lib/mongo/server/connection_pool.rb', line 214

    def wait_timeout(context = nil)
      if context&.remaining_timeout_sec.nil?
        options[:wait_timeout] || DEFAULT_WAIT_TIMEOUT
      else
        context&.remaining_timeout_sec
      end
    end
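The timeout is resolved in a fixed order: a remaining timeout on the context wins, then the `:wait_timeout` option, then the default. The sketch below reproduces that precedence outside the driver; `ToyContext` and `resolve_wait_timeout` are hypothetical names, and the default value is taken from the constant listed above.

```ruby
DEFAULT_WAIT_TIMEOUT = 10

# Toy stand-in for an operation context that may carry a remaining timeout.
ToyContext = Struct.new(:remaining_timeout_sec)

def resolve_wait_timeout(options, context = nil)
  remaining = context&.remaining_timeout_sec
  remaining.nil? ? (options[:wait_timeout] || DEFAULT_WAIT_TIMEOUT) : remaining
end

resolve_wait_timeout({})                                        # => 10
resolve_wait_timeout({ wait_timeout: 3 })                       # => 3
resolve_wait_timeout({ wait_timeout: 3 }, ToyContext.new(0.5))  # => 0.5
resolve_wait_timeout({ wait_timeout: 3 }, ToyContext.new(nil))  # => 3
```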

#with_connection(connection_global_id: nil, context: nil) ⇒ Object

Yield the block to a connection, while handling check in/check out logic.

    # File 'lib/mongo/server/connection_pool.rb', line 714

    def with_connection(connection_global_id: nil, context: nil)
      raise_if_closed!

      connection = check_out(
        connection_global_id: connection_global_id,
        context: context
      )
      yield(connection)
    rescue Error::SocketError, Error::SocketTimeoutError, Error::ConnectionPerished => e
      maybe_raise_pool_cleared!(connection, e)
    ensure
      if connection
        check_in(connection)
      end
    end
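The `ensure` clause is what guarantees the connection is returned whether the block finishes normally or raises. A minimal sketch of that pattern with a toy pool follows; `ToyBlockPool` is hypothetical, connections are plain integers, and the error translation done by the `rescue` clause is left out.

```ruby
class ToyBlockPool
  attr_reader :checked_in

  def initialize
    @checked_in = []
    @next_id = 0
  end

  def check_out
    @next_id += 1
  end

  def check_in(conn)
    @checked_in << conn
  end

  def with_connection
    connection = check_out
    yield(connection)
  ensure
    # Runs on normal return and on exceptions; skipped only if check_out itself failed.
    check_in(connection) if connection
  end
end

pool = ToyBlockPool.new
result = pool.with_connection { |conn| conn * 10 }

begin
  pool.with_connection { |_conn| raise 'operation failed' }
rescue RuntimeError
  # The second connection was still checked back in.
end
```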