Class: Krakow::Consumer
- Inherits: Object (Object > Krakow::Consumer)
- Includes: Celluloid, Utils::Lazy
- Defined in: lib/krakow/consumer.rb
Overview
Consume messages from a server
Instance Attribute Summary
- (Object) connections (readonly)
  Returns the value of attribute connections.
- (Object) discovery (readonly)
  Returns the value of attribute discovery.
- (Object) distribution (readonly)
  Returns the value of attribute distribution.
- (Object) queue (readonly)
  Returns the value of attribute queue.
Attributes
- (Numeric) backoff_interval
  The backoff_interval attribute.
- (TrueClass, FalseClass) backoff_interval?
  Truthiness of the backoff_interval attribute.
- (String) channel
  The channel attribute.
- (TrueClass, FalseClass) channel?
  Truthiness of the channel attribute.
- (Hash) connection_options
  The connection_options attribute.
- (TrueClass, FalseClass) connection_options?
  Truthiness of the connection_options attribute.
- (Numeric) discovery_interval
  The discovery_interval attribute.
- (TrueClass, FalseClass) discovery_interval?
  Truthiness of the discovery_interval attribute.
- (Numeric) discovery_jitter
  The discovery_jitter attribute.
- (TrueClass, FalseClass) discovery_jitter?
  Truthiness of the discovery_jitter attribute.
- (String) host
  The host attribute.
- (TrueClass, FalseClass) host?
  Truthiness of the host attribute.
- (Integer) max_in_flight
  The max_in_flight attribute.
- (TrueClass, FalseClass) max_in_flight?
  Truthiness of the max_in_flight attribute.
- (Celluloid::Actor) notifier
  The notifier attribute.
- (TrueClass, FalseClass) notifier?
  Truthiness of the notifier attribute.
- ([Array, String]) nsqlookupd
  The nsqlookupd attribute.
- (TrueClass, FalseClass) nsqlookupd?
  Truthiness of the nsqlookupd attribute.
- ([String, Integer]) port
  The port attribute.
- (TrueClass, FalseClass) port?
  Truthiness of the port attribute.
- (String) topic
  The topic attribute.
- (TrueClass, FalseClass) topic?
  Truthiness of the topic attribute.
Instance Method Summary
- (Krakow::Connection?) build_connection(host, port, queue)
  Build a new [Krakow::Connection].
- (TrueClass) confirm(message_id) (also: #finish)
  Confirm message has been processed.
- (Krakow::Connection) connection(key)
  Returns [Krakow::Connection] associated to key.
- (nil) connection_failure(actor, reason)
  Remove connection references when connection is terminated.
- (nil) connection_reconnect(connection)
  Action to take when a connection has reconnected.
- (nil) discover
  Start the discovery interval lookup.
- (nil) goodbye_my_love!
  Instance destructor.
- (nil) init!
  Initialize the consumer by starting lookup and adding connections.
- (Consumer) initialize(args = {}) (constructor)
  A new instance of Consumer.
- (Logger?) log(*args) (included from Utils::Logging)
  Log message.
- (Krakow::FrameType) process_message(message, connection)
  Process a given message if required.
- (TrueClass, FalseClass) register(connection)
  Register connection with distribution.
- (TrueClass) requeue(message_id, timeout = 0)
  Requeue message (generally due to processing failure).
- (String) to_s
  Stringify object.
- (TrueClass) touch(message_id)
  Touch message (to extend timeout).
- (nil) update_ready!(connection)
  Send RDY for connection based on distribution rules.
Constructor Details
- (Consumer) initialize(args = {})
Returns a new instance of Consumer
# File 'lib/krakow/consumer.rb', line 39
def initialize(args={})
  super
  arguments[:connection_options] = {:features => {}, :config => {}}.merge(
    arguments[:connection_options] || {}
  )
  @connections = {}
  @distribution = Distribution::Default.new(
    :max_in_flight => max_in_flight,
    :backoff_interval => backoff_interval,
    :consumer => current_actor
  )
  @queue = Queue.new
  if(nsqlookupd)
    debug "Connections will be established via lookup #{nsqlookupd.inspect}"
    @discovery = Discovery.new(:nsqlookupd => nsqlookupd)
    discover
  elsif(host && port)
    debug "Connection will be established via direct connection #{host}:#{port}"
    connection = build_connection(host, port, queue)
    if(register(connection))
      info "Registered new connection #{connection}"
      distribution.redistribute!
    else
      abort Error::ConnectionFailure.new("Failed to establish subscription at provided end point (#{host}:#{port}")
    end
  else
    abort Error::ConfigurationError.new('No connection information provided!')
  end
end
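The constructor does two things that are easy to miss in the listing: it shallow-merges `connection_options` over a default of empty `:features` and `:config` hashes, and it picks a connection strategy in a fixed order (lookup first, then direct, otherwise a configuration error). The following is a minimal standalone sketch of that logic. The helper names `normalize_connection_options` and `connection_mode`, the `:example_setting` key, and the use of `ArgumentError` are illustrative assumptions, not part of Krakow.

```ruby
# Hypothetical helper: mirrors the merge the constructor applies to
# arguments[:connection_options]. Not part of Krakow's API.
def normalize_connection_options(options)
  {:features => {}, :config => {}}.merge(options || {})
end

# Hypothetical helper: mirrors the constructor's branch order.
# ArgumentError stands in for Error::ConfigurationError.
def connection_mode(nsqlookupd: nil, host: nil, port: nil)
  if nsqlookupd
    :lookup          # discovery through nsqlookupd wins when present
  elsif host && port
    :direct          # otherwise a single direct connection
  else
    raise ArgumentError, 'No connection information provided!'
  end
end

defaults = normalize_connection_options(nil)
custom   = normalize_connection_options({:config => {:example_setting => 1}})
mode     = connection_mode(host: 'localhost', port: 4150)
```

Because the merge is shallow, a caller who supplies only `:config` still gets the default empty `:features` hash, while a supplied `:config` replaces the default one wholesale.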
Instance Attribute Details
- (Object) connections (readonly)
Returns the value of attribute connections
# File 'lib/krakow/consumer.rb', line 16
def connections
  @connections
end
- (Object) discovery (readonly)
Returns the value of attribute discovery
# File 'lib/krakow/consumer.rb', line 16
def discovery
  @discovery
end
- (Object) distribution (readonly)
Returns the value of attribute distribution
# File 'lib/krakow/consumer.rb', line 16
def distribution
  @distribution
end
- (Object) queue (readonly)
Returns the value of attribute queue
# File 'lib/krakow/consumer.rb', line 16
def queue
  @queue
end
Instance Method Details
- (Numeric) backoff_interval
Returns the backoff_interval attribute
# File 'lib/krakow/consumer.rb', line 31
attribute :backoff_interval, Numeric
- (TrueClass, FalseClass) backoff_interval?
Returns truthiness of the backoff_interval attribute
# File 'lib/krakow/consumer.rb', line 31
attribute :backoff_interval, Numeric
- (Krakow::Connection?) build_connection(host, port, queue)
Build a new [Krakow::Connection]
# File 'lib/krakow/consumer.rb', line 101
def build_connection(host, port, queue)
  begin
    connection = Connection.new(
      :host => host,
      :port => port,
      :queue => queue,
      :topic => topic,
      :channel => channel,
      :notifier => notifier,
      :features => connection_options[:features],
      :features_args => connection_options[:config],
      :callbacks => {
        :handle => {
          :actor => current_actor,
          :method => :process_message
        },
        :reconnect => {
          :actor => current_actor,
          :method => :connection_reconnect
        }
      }
    )
  rescue => e
    error "Failed to build connection (host: #{host} port: #{port} queue: #{queue}) - #{e.class}: #{e}"
    debug "#{e.class}: #{e}\n#{e.backtrace.join("\n")}"
    nil
  end
end
- (String) channel
Returns the channel attribute
# File 'lib/krakow/consumer.rb', line 26
attribute :channel, String, :required => true
- (TrueClass, FalseClass) channel?
Returns truthiness of the channel attribute
# File 'lib/krakow/consumer.rb', line 26
attribute :channel, String, :required => true
- (TrueClass) confirm(message_id) Also known as: finish
Confirm message has been processed
# File 'lib/krakow/consumer.rb', line 237
def confirm(message_id)
  message_id = message_id.message_id if message_id.respond_to?(:message_id)
  begin
    distribution.in_flight_lookup(message_id) do |connection|
      distribution.unregister_message(message_id)
      connection.transmit(Command::Fin.new(:message_id => message_id))
      distribution.success(connection.identifier)
      update_ready!(connection)
    end
    true
  rescue KeyError => e
    error "Message confirmation failed: #{e}"
    abort e
  rescue Error::ConnectionUnavailable => e
    retry
  end
end
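`confirm` rescues `Error::ConnectionUnavailable` and calls `retry`, which re-runs the whole `begin` block. The sketch below illustrates that control flow in isolation. `ConnectionUnavailable` and `transmit_with_retry` are hypothetical stand-ins, and the attempt cap is an addition for the example only; the listing above retries without a limit.

```ruby
# Stand-in for Krakow's Error::ConnectionUnavailable (assumption for this sketch).
class ConnectionUnavailable < StandardError; end

# Hypothetical helper: re-runs the block while the connection is unavailable.
# The max_attempts cap is NOT in the original method; it keeps the example finite.
def transmit_with_retry(max_attempts = 3)
  attempts = 0
  begin
    attempts += 1          # executed again on every retry
    yield attempts
    true
  rescue ConnectionUnavailable
    retry if attempts < max_attempts
    raise                  # give up and re-raise the last error
  end
end

calls = []
result = transmit_with_retry do |attempt|
  calls << attempt
  raise ConnectionUnavailable, 'not ready' if attempt < 3
end
```

Here the block fails on the first two attempts and succeeds on the third, so `result` is `true` and `calls` records three attempts.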
- (Krakow::Connection) connection(key)
Returns [Krakow::Connection] associated to key
# File 'lib/krakow/consumer.rb', line 73
def connection(key)
  @connections[key]
end
- (nil) connection_failure(actor, reason)
Remove connection references when connection is terminated
# File 'lib/krakow/consumer.rb', line 216
def connection_failure(actor, reason)
  connections.delete_if do |key, value|
    if(value == actor && reason.nil?)
      warn "Connection failure detected. Removing connection: #{key} - #{reason || 'no reason provided'}"
      begin
        distribution.remove_connection(key)
      rescue Error::ConnectionUnavailable, Error::ConnectionFailure
        warn 'Caught connection unavailability'
      end
      distribution.redistribute!
      true
    end
  end
  nil
end
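The removal relies on how `Hash#delete_if` treats the block's value: an entry is deleted when the block returns a truthy value, and an `if` with no `else` evaluates to `nil` when its condition fails, so non-matching entries are kept. A minimal sketch of that idiom follows, using symbols in place of actors; `drop_connections_for` is a hypothetical name.

```ruby
# Hypothetical helper: removes every entry whose value is the given actor
# and reports which keys were dropped.
def drop_connections_for(connections, actor)
  removed = []
  connections.delete_if do |key, value|
    if value == actor
      removed << key   # side effects run only for matching entries
      true             # truthy result => entry is deleted
    end                # no else branch => nil => entry is kept
  end
  removed
end

connections = {'node-a' => :actor_a, 'node-b' => :actor_b}
removed = drop_connections_for(connections, :actor_a)
```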
- (Hash) connection_options
Returns the connection_options attribute
# File 'lib/krakow/consumer.rb', line 35
attribute :connection_options, Hash, :default => ->{ Hash.new }
- (TrueClass, FalseClass) connection_options?
Returns truthiness of the connection_options attribute
# File 'lib/krakow/consumer.rb', line 35
attribute :connection_options, Hash, :default => ->{ Hash.new }
- (nil) connection_reconnect(connection)
Action to take when a connection has reconnected
# File 'lib/krakow/consumer.rb', line 147
def connection_reconnect(connection)
  connection.transmit(Command::Sub.new(:topic_name => topic, :channel_name => channel))
  distribution.set_ready_for(connection)
  nil
end
- (nil) discover
Start the discovery interval lookup
# File 'lib/krakow/consumer.rb', line 187
def discover
  init!
  after(discovery_interval + (discovery_jitter * rand)){ discover }
end
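Each call to `discover` schedules the next one after `discovery_interval + (discovery_jitter * rand)` seconds. With the defaults declared on the attributes (30 and 10.0), and `rand` returning a float in the range 0 up to but not including 1, the delay falls between 30 and 40 seconds. A small sketch of that arithmetic, where `next_discovery_delay` is a hypothetical helper and the injectable `rng` is an addition to make the example repeatable:

```ruby
# Hypothetical helper reproducing the delay expression used by #discover.
# interval and jitter default to the attribute defaults shown in this page.
def next_discovery_delay(interval = 30, jitter = 10.0, rng = Random.new)
  interval + (jitter * rng.rand)   # rng.rand is in [0.0, 1.0)
end

samples = Array.new(1000) { next_discovery_delay }
lowest, highest = samples.minmax
```

The jitter spreads lookups from many consumers over time so they do not all query nsqlookupd at the same instant.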
- (Numeric) discovery_interval
Returns the discovery_interval attribute
# File 'lib/krakow/consumer.rb', line 32
attribute :discovery_interval, Numeric, :default => 30
- (TrueClass, FalseClass) discovery_interval?
Returns truthiness of the discovery_interval attribute
# File 'lib/krakow/consumer.rb', line 32
attribute :discovery_interval, Numeric, :default => 30
- (Numeric) discovery_jitter
Returns the discovery_jitter attribute
# File 'lib/krakow/consumer.rb', line 33
attribute :discovery_jitter, Numeric, :default => 10.0
- (TrueClass, FalseClass) discovery_jitter?
Returns truthiness of the discovery_jitter attribute
# File 'lib/krakow/consumer.rb', line 33
attribute :discovery_jitter, Numeric, :default => 10.0
- (nil) goodbye_my_love!
Instance destructor
# File 'lib/krakow/consumer.rb', line 85
def goodbye_my_love!
  debug 'Tearing down consumer'
  connections.values.each do |con|
    con.terminate if con.alive?
  end
  distribution.terminate if distribution && distribution.alive?
  info 'Consumer torn down'
  nil
end
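The destructor guards every `terminate` call with `alive?`, and also checks that `distribution` exists before touching it. The sketch below shows why the guards matter, using a hypothetical `StubActor` that raises when a dead actor is terminated. That behaviour is an assumption modelled on Celluloid for the purpose of the example, and `tear_down` is an illustrative name.

```ruby
# Hypothetical stand-in for a Celluloid actor. Terminating a dead actor
# raises here; this is an assumption of the sketch.
class StubActor
  attr_reader :terminations

  def initialize(alive)
    @alive = alive
    @terminations = 0
  end

  def alive?
    @alive
  end

  def terminate
    raise 'actor already dead' unless @alive
    @alive = false
    @terminations += 1
  end
end

# Mirrors the guarded teardown in #goodbye_my_love!.
def tear_down(connections, distribution)
  connections.values.each do |con|
    con.terminate if con.alive?
  end
  distribution.terminate if distribution && distribution.alive?
  nil
end

live = StubActor.new(true)
dead = StubActor.new(false)
outcome = tear_down({'a' => live, 'b' => dead}, nil)
```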
- (String) host
Returns the host attribute
# File 'lib/krakow/consumer.rb', line 27
attribute :host, String
- (TrueClass, FalseClass) host?
Returns truthiness of the host attribute
# File 'lib/krakow/consumer.rb', line 27
attribute :host, String
- (nil) init!
Initialize the consumer by starting lookup and adding connections
# File 'lib/krakow/consumer.rb', line 165
def init!
  debug 'Running consumer `init!` connection builds'
  found = discovery.lookup(topic)
  debug "Discovery results: #{found.inspect}"
  connection = nil
  found.each do |node|
    debug "Processing discovery result: #{node.inspect}"
    key = Connection.identifier(node[:broadcast_address], node[:tcp_port], topic, channel)
    unless(connections[key])
      connection = build_connection(node[:broadcast_address], node[:tcp_port], queue)
      info "Registered new connection #{connection}" if register(connection)
    else
      debug "Discovery result already registered: #{node.inspect}"
    end
  end
  distribution.redistribute! if connection
  nil
end
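`init!` runs on every discovery cycle, so it must skip nodes that already have a connection. It does this by computing a key per node and checking it against `connections`. A standalone sketch of that filtering step follows. The key format produced by `node_key` is hypothetical (the real output of `Connection.identifier` is not shown in this page), as are the helper names and sample addresses.

```ruby
# Hypothetical key builder; Connection.identifier's real format is not shown here.
def node_key(node, topic, channel)
  [node[:broadcast_address], node[:tcp_port], topic, channel].join('_')
end

# Returns only the discovered nodes that have no registered connection yet.
def unregistered_nodes(found, connections, topic, channel)
  found.reject { |node| connections.key?(node_key(node, topic, channel)) }
end

found = [
  {:broadcast_address => '10.0.0.1', :tcp_port => 4150},
  {:broadcast_address => '10.0.0.2', :tcp_port => 4150}
]
connections = {'10.0.0.1_4150_events_worker' => :existing_connection}
fresh = unregistered_nodes(found, connections, 'events', 'worker')
```

Only the second node is returned, which corresponds to the branch in `init!` that builds and registers a new connection.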
- (Logger?) log(*args) Originally defined in module Utils::Logging
Log message
- (Integer) max_in_flight
Returns the max_in_flight attribute
# File 'lib/krakow/consumer.rb', line 30
attribute :max_in_flight, Integer, :default => 1
- (TrueClass, FalseClass) max_in_flight?
Returns truthiness of the max_in_flight attribute
# File 'lib/krakow/consumer.rb', line 30
attribute :max_in_flight, Integer, :default => 1
- (Celluloid::Actor) notifier
Returns the notifier attribute
# File 'lib/krakow/consumer.rb', line 34
attribute :notifier, Celluloid::Actor
- (TrueClass, FalseClass) notifier?
Returns truthiness of the notifier attribute
# File 'lib/krakow/consumer.rb', line 34
attribute :notifier, Celluloid::Actor
- ([Array, String]) nsqlookupd
Returns the nsqlookupd attribute
# File 'lib/krakow/consumer.rb', line 29
attribute :nsqlookupd, [Array, String]
- (TrueClass, FalseClass) nsqlookupd?
Returns truthiness of the nsqlookupd attribute
# File 'lib/krakow/consumer.rb', line 29
attribute :nsqlookupd, [Array, String]
- ([String, Integer]) port
Returns the port attribute
# File 'lib/krakow/consumer.rb', line 28
attribute :port, [String, Integer]
- (TrueClass, FalseClass) port?
Returns truthiness of the port attribute
# File 'lib/krakow/consumer.rb', line 28
attribute :port, [String, Integer]
- (Krakow::FrameType) process_message(message, connection)
Process a given message if required
# File 'lib/krakow/consumer.rb', line 135
def process_message(message, connection)
  if(message.is_a?(FrameType::Message))
    distribution.register_message(message, connection.identifier)
    message.origin = current_actor
  end
  message
end
- (TrueClass, FalseClass) register(connection)
Register connection with distribution
# File 'lib/krakow/consumer.rb', line 196
def register(connection)
  begin
    connection.init!
    connection.transmit(Command::Sub.new(:topic_name => topic, :channel_name => channel))
    self.link connection
    connections[connection.identifier] = connection
    distribution.add_connection(connection)
    true
  rescue Error::BadResponse => e
    debug "Failed to establish connection: #{e.result.error}"
    connection.terminate
    false
  end
end
- (TrueClass) requeue(message_id, timeout = 0)
Requeue message (generally due to processing failure)
# File 'lib/krakow/consumer.rb', line 261
def requeue(message_id, timeout=0)
  message_id = message_id.message_id if message_id.respond_to?(:message_id)
  distribution.in_flight_lookup(message_id) do |connection|
    distribution.unregister_message(message_id)
    connection.transmit(
      Command::Req.new(
        :message_id => message_id,
        :timeout => timeout
      )
    )
    distribution.failure(connection.identifier)
    update_ready!(connection)
  end
  true
end
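`requeue`, like `confirm` and `touch`, accepts either a raw message ID or any object that responds to `message_id`, and normalizes the argument on its first line. The sketch below isolates that duck-typed normalization. `StubMessage` is a hypothetical stand-in for `FrameType::Message`, and `normalize_message_id` is an illustrative name.

```ruby
# Hypothetical stand-in for FrameType::Message; only message_id matters here.
StubMessage = Struct.new(:message_id, :content)

# Accepts a message-like object or a raw ID and always returns the ID.
def normalize_message_id(message_or_id)
  if message_or_id.respond_to?(:message_id)
    message_or_id.message_id
  else
    message_or_id
  end
end

from_object = normalize_message_id(StubMessage.new('abc123', 'payload'))
from_string = normalize_message_id('abc123')
```

Both calls yield the same ID, so callers can pass whichever form they have on hand.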
- (String) to_s
Returns the stringified object
# File 'lib/krakow/consumer.rb', line 78
def to_s
  "<#{self.class.name}:#{object_id} T:#{topic} C:#{channel}>"
end
- (String) topic
Returns the topic attribute
# File 'lib/krakow/consumer.rb', line 25
attribute :topic, String, :required => true
- (TrueClass, FalseClass) topic?
Returns truthiness of the topic attribute
# File 'lib/krakow/consumer.rb', line 25
attribute :topic, String, :required => true
- (TrueClass) touch(message_id)
Touch message (to extend timeout)
# File 'lib/krakow/consumer.rb', line 281
def touch(message_id)
  message_id = message_id.message_id if message_id.respond_to?(:message_id)
  distribution.in_flight_lookup(message_id) do |connection|
    connection.transmit(
      Command::Touch.new(:message_id => message_id)
    )
  end
  true
end
- (nil) update_ready!(connection)
Send RDY for connection based on distribution rules
# File 'lib/krakow/consumer.rb', line 157
def update_ready!(connection)
  distribution.set_ready_for(connection)
  nil
end