Class: Krakow::Distribution Abstract
- Inherits:
-
Object
- Object
- Krakow::Distribution
- Extended by:
- Utils::Lazy::ClassMethods
- Includes:
- Celluloid, Utils::Lazy, Utils::Lazy::InstanceMethods
- Defined in:
- lib/krakow/distribution.rb,
lib/krakow/distribution/default.rb
Overview
Message distribution
Direct Known Subclasses
Defined Under Namespace
Classes: Default
Instance Attribute Summary (collapse)
-
- (Hash) arguments
included
from Utils::Lazy::InstanceMethods
readonly
Argument hash.
-
- (Object) flight_record
Returns the value of attribute flight_record.
-
- (Object) ideal
Returns the value of attribute ideal.
-
- (Object) registry
Returns the value of attribute registry.
Attributes (collapse)
-
- (Numeric) backoff_interval
The backoff_interval attribute.
-
- (TrueClass, FalseClass) backoff_interval?
Truthiness of the backoff_interval attribute.
-
- (Krakow::Consumer) consumer
The consumer attribute.
-
- (TrueClass, FalseClass) consumer?
Truthiness of the consumer attribute.
-
- (Integer) max_in_flight
The max_in_flight attribute.
-
- (TrueClass, FalseClass) max_in_flight?
Truthiness of the max_in_flight attribute.
-
- (Numeric) watch_dog_interval
The watch_dog_interval attribute.
-
- (TrueClass, FalseClass) watch_dog_interval?
Truthiness of the watch_dog_interval attribute.
Class Method Summary (collapse)
-
+ (nil) attribute(name, type, options = {})
extended
from Utils::Lazy::ClassMethods
Add new attributes to class.
-
+ (Array<Hash>) attributes(*args)
extended
from Utils::Lazy::ClassMethods
Return attributes.
-
+ (TrueClass) set_attributes(attrs)
extended
from Utils::Lazy::ClassMethods
Directly set attribute hash.
Instance Method Summary (collapse)
-
- (TrueClass) add_connection(connection)
Add connection to make available for RDY distribution.
-
- (Integer) calculate_ready!(connection_identifier)
- Abstract
-
Determine RDY value for given connection.
-
- (Krakow::Connection?) connection_lookup(identifier)
Return connection associated with given registry key.
-
- (Array<Krakow::Connection>) connections
Connections in registry.
-
- (TrueClass) failure(connection_identifier)
Log failure of processed message.
-
- (Krakow::Connection, Object) in_flight_lookup(msg_id) {|connection| ... }
Return source connection for given message ID.
-
- (Integer) initial_ready
Initial ready value used for new connections.
-
- (Distribution) initialize(args = {})
constructor
A new instance of Distribution.
- - (String) inspect included from Utils::Lazy::InstanceMethods
-
- (Logger?) log(*args)
included
from Utils::Logging
Log message.
-
- (Integer) ready_for(connection_identifier)
Return the currently configured RDY value for given connnection.
-
- (Object) redistribute!
- Abstract
-
Reset flight distributions.
-
- (Integer) register_message(message, connection_identifier)
Registers message into registry and configures for distribution.
-
- (Hash) registry_lookup(connection_identifier)
Return registry information for given connection.
-
- (TrueClass) remove_connection(connection_identifier, *args)
Remove connection from RDY distribution.
-
- (Krakow::FrameType::Error?) set_ready_for(connection, *_)
Send RDY for given connection.
-
- (TrueClass) success(connection_identifier)
Log success of processed message.
- - (String) to_s included from Utils::Lazy::InstanceMethods
-
- (Krakow::Connection) unregister_message(message)
Remove message metadata from registry.
Constructor Details
- (Distribution) initialize(args = {})
Returns a new instance of Distribution
33 34 35 36 37 38 |
# File 'lib/krakow/distribution.rb', line 33 def initialize(args={}) super @ideal = 0 @flight_record = {} @registry = {} end |
Instance Attribute Details
- (Hash) arguments (readonly) Originally defined in module Utils::Lazy::InstanceMethods
Returns argument hash
- (Object) flight_record
Returns the value of attribute flight_record
17 18 19 |
# File 'lib/krakow/distribution.rb', line 17 def flight_record @flight_record end |
- (Object) ideal
Returns the value of attribute ideal
17 18 19 |
# File 'lib/krakow/distribution.rb', line 17 def ideal @ideal end |
- (Object) registry
Returns the value of attribute registry
17 18 19 |
# File 'lib/krakow/distribution.rb', line 17 def registry @registry end |
Class Method Details
+ (nil) attribute(name, type, options = {}) Originally defined in module Utils::Lazy::ClassMethods
Add new attributes to class
+ (Array<Hash>) attributes(*args) Originally defined in module Utils::Lazy::ClassMethods
Return attributes
+ (TrueClass) set_attributes(attrs) Originally defined in module Utils::Lazy::ClassMethods
need deep dup here
Directly set attribute hash
Instance Method Details
- (TrueClass) add_connection(connection)
Add connection to make available for RDY distribution
110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/krakow/distribution.rb', line 110 def add_connection(connection) unless(registry[connection.identifier]) registry[connection.identifier] = { :ready => initial_ready, :in_flight => 0, :failures => 0, :backoff_until => 0 } end true end |
- (Numeric) backoff_interval
Returns the backoff_interval attribute
28 |
# File 'lib/krakow/distribution.rb', line 28 attribute :backoff_interval, Numeric |
- (TrueClass, FalseClass) backoff_interval?
Returns truthiness of the backoff_interval attribute
28 |
# File 'lib/krakow/distribution.rb', line 28 attribute :backoff_interval, Numeric |
- (Integer) calculate_ready!(connection_identifier)
- Abstract
-
Determine RDY value for given connection
48 49 50 |
# File 'lib/krakow/distribution.rb', line 48 def calculate_ready!(connection_identifier) raise NotImplementedError.new 'Custom `#calculate_ready!` method must be provided!' end |
- (Krakow::Connection?) connection_lookup(identifier)
Return connection associated with given registry key
143 144 145 |
# File 'lib/krakow/distribution.rb', line 143 def connection_lookup(identifier) consumer.connection(identifier) end |
- (Array<Krakow::Connection>) connections
Returns connections in registry
175 176 177 178 179 |
# File 'lib/krakow/distribution.rb', line 175 def connections registry.keys.map do |identifier| connection_lookup(identifier) end.compact end |
- (Krakow::Consumer) consumer
Returns the consumer attribute
26 |
# File 'lib/krakow/distribution.rb', line 26 attribute :consumer, Krakow::Consumer, :required => true |
- (TrueClass, FalseClass) consumer?
Returns truthiness of the consumer attribute
26 |
# File 'lib/krakow/distribution.rb', line 26 attribute :consumer, Krakow::Consumer, :required => true |
- (TrueClass) failure(connection_identifier)
Log failure of processed message
185 186 187 188 189 190 191 192 |
# File 'lib/krakow/distribution.rb', line 185 def failure(connection_identifier) if(backoff_interval) registry_info = registry_lookup(connection_identifier) registry_info[:failures] += 1 registry_info[:backoff_until] = Time.now.to_i + (registry_info[:failures] * backoff_interval) end true end |
- (Krakow::Connection, Object) in_flight_lookup(msg_id) {|connection| ... }
Return source connection for given message ID
153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/krakow/distribution.rb', line 153 def in_flight_lookup(msg_id) connection = connection_lookup(flight_record[msg_id]) unless(connection) abort Krakow::Error::LookupFailed.new("Failed to locate in flight message (ID: #{msg_id})") end if(block_given?) yield connection else connection end end |
- (Integer) initial_ready
Initial ready value used for new connections
90 91 92 |
# File 'lib/krakow/distribution.rb', line 90 def initial_ready ideal > 0 ? 1 : 0 end |
- (String) inspect Originally defined in module Utils::Lazy::InstanceMethods
- (Logger?) log(*args) Originally defined in module Utils::Logging
Log message
- (Integer) max_in_flight
Returns the max_in_flight attribute
29 |
# File 'lib/krakow/distribution.rb', line 29 attribute :max_in_flight, Integer, :default => 1 |
- (TrueClass, FalseClass) max_in_flight?
Returns truthiness of the max_in_flight attribute
29 |
# File 'lib/krakow/distribution.rb', line 29 attribute :max_in_flight, Integer, :default => 1 |
- (Integer) ready_for(connection_identifier)
Return the currently configured RDY value for given connnection
70 71 72 |
# File 'lib/krakow/distribution.rb', line 70 def ready_for(connection_identifier) registry_lookup(connection_identifier)[:ready] end |
- (Object) redistribute!
- Abstract
-
Reset flight distributions
41 42 43 |
# File 'lib/krakow/distribution.rb', line 41 def redistribute! raise NotImplementedError.new 'Custom `#redistrubute!` method must be provided!' end |
- (Integer) register_message(message, connection_identifier)
Registers message into registry and configures for distribution
99 100 101 102 103 104 |
# File 'lib/krakow/distribution.rb', line 99 def (, connection_identifier) registry_info = registry_lookup(connection_identifier) registry_info[:in_flight] += 1 flight_record[.] = connection_identifier calculate_ready!(connection_identifier) end |
- (Hash) registry_lookup(connection_identifier)
Return registry information for given connection
169 170 171 172 |
# File 'lib/krakow/distribution.rb', line 169 def registry_lookup(connection_identifier) registry[connection_identifier] || abort(Krakow::Error::LookupFailed.new("Failed to locate connection information in registry (#{connection_identifier})")) end |
- (TrueClass) remove_connection(connection_identifier, *args)
Remove connection from RDY distribution
126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/krakow/distribution.rb', line 126 def remove_connection(connection_identifier, *args) # remove connection from registry registry.delete(connection_identifier) # remove any in flight messages flight_record.delete_if do |k,v| if(k == connection_identifier) warn "Removing in flight reference due to failed connection: #{k}" true end end true end |
- (Krakow::FrameType::Error?) set_ready_for(connection, *_)
Send RDY for given connection
79 80 81 82 83 84 85 |
# File 'lib/krakow/distribution.rb', line 79 def set_ready_for(connection, *_) connection.transmit( Command::Rdy.new( :count => ready_for(connection.identifier) ) ) end |
- (TrueClass) success(connection_identifier)
Log success of processed message
198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/krakow/distribution.rb', line 198 def success(connection_identifier) if(backoff_interval) registry_info = registry_lookup(connection_identifier) if(registry_info[:failures] > 1) registry_info[:failures] -= 1 registry_info[:backoff_until] = Time.now.to_i + (registry_info[:failures] * backoff_interval) else registry_info[:failures] = 0 end end true end |
- (String) to_s Originally defined in module Utils::Lazy::InstanceMethods
- (Krakow::Connection) unregister_message(message)
Remove message metadata from registry
56 57 58 59 60 61 62 63 64 |
# File 'lib/krakow/distribution.rb', line 56 def () msg_id = .respond_to?(:message_id) ? . : .to_s connection = connection_lookup(flight_record[msg_id]) registry_info = registry_lookup(connection.identifier) flight_record.delete(msg_id) registry_info[:in_flight] -= 1 calculate_ready!(connection.identifier) connection end |
- (Numeric) watch_dog_interval
Returns the watch_dog_interval attribute
27 |
# File 'lib/krakow/distribution.rb', line 27 attribute :watch_dog_interval, Numeric, :default => 1.0 |
- (TrueClass, FalseClass) watch_dog_interval?
Returns truthiness of the watch_dog_interval attribute
27 |
# File 'lib/krakow/distribution.rb', line 27 attribute :watch_dog_interval, Numeric, :default => 1.0 |