Class: Krakow::Producer

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Utils::Lazy
Defined in:
lib/krakow/producer.rb,
lib/krakow/producer/http.rb

Overview

TCP based producer

Defined Under Namespace

Classes: Http

Instance Attribute Summary (collapse)

Attributes (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Producer) initialize(args = {})

Returns a new instance of Producer



37
38
39
40
41
42
43
# File 'lib/krakow/producer.rb', line 37

def initialize(args={})
  super
  arguments[:connection_options] = {:features => {}, :config => {}}.merge(
    arguments.fetch(:connection_options, {})
  )
  connect
end

Instance Attribute Details

- (Object) connection (readonly)

Returns the value of attribute connection



19
20
21
# File 'lib/krakow/producer.rb', line 19

def connection
  @connection
end

Instance Method Details

- (Object) connect

Establish connection to configured `host` and `port`

Returns:

  • nil



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/krakow/producer.rb', line 48

def connect
  info "Establishing connection to: #{host}:#{port}"
  begin
    @connection = Connection.new(
      :host => host,
      :port => port,
      :features => connection_options[:features],
      :features_args => connection_options[:config]
    )
    connection.init!
    self.link connection
    info "Connection established: #{connection}"
    nil
  rescue => e
    abort e
  end
end

- (TrueClass, FalseClass) connected?

Returns currently connected to server

Returns:

  • (TrueClass, FalseClass)

    currently connected to server



72
73
74
# File 'lib/krakow/producer.rb', line 72

def connected?
  !!(connection && connection.alive? && connection.connected?)
end

- (TrueClass) connection_failure(*args)

Process connection failure and attempt reconnection

Returns:

  • (TrueClass)


79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/krakow/producer.rb', line 79

def connection_failure(*args)
  @connection = nil
  begin
    warn "Connection failure detected for #{host}:#{port}"
    connect
  rescue => e
    warn "Failed to establish connection to #{host}:#{port}. Pausing #{reconnect_interval} before retry"
    sleep reconnect_interval
    connect
  end
  true
end

- (Hash) connection_options

Returns the connection_options attribute

Returns:

  • (Hash)

    the connection_options attribute



33
# File 'lib/krakow/producer.rb', line 33

attribute :connection_options, Hash, :default => ->{ Hash.new }

- (TrueClass, FalseClass) connection_options?

Returns truthiness of the connection_options attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the connection_options attribute



33
# File 'lib/krakow/producer.rb', line 33

attribute :connection_options, Hash, :default => ->{ Hash.new }

- (Object) goodbye_my_love!

Instance destructor

Returns:

  • nil



94
95
96
97
98
99
100
101
102
# File 'lib/krakow/producer.rb', line 94

def goodbye_my_love!
  debug 'Tearing down producer'
  if(connection && connection.alive?)
    connection.terminate
  end
  @connection = nil
  info 'Producer torn down'
  nil
end

- (String) host

Returns the host attribute

Returns:

  • (String)

    the host attribute



28
# File 'lib/krakow/producer.rb', line 28

attribute :host, String, :required => true

- (TrueClass, FalseClass) host?

Returns truthiness of the host attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the host attribute



28
# File 'lib/krakow/producer.rb', line 28

attribute :host, String, :required => true

- (Logger?) log(*args) Originally defined in module Utils::Logging

Log message

Parameters:

  • args (Array, nil)

Returns:

  • (Logger, nil)

- ([String, Integer]) port

Returns the port attribute

Returns:

  • ([String, Integer])

    the port attribute



29
# File 'lib/krakow/producer.rb', line 29

attribute :port, [String, Integer], :required => true

- (TrueClass, FalseClass) port?

Returns truthiness of the port attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the port attribute



29
# File 'lib/krakow/producer.rb', line 29

attribute :port, [String, Integer], :required => true

- (Integer) reconnect_interval

Returns the reconnect_interval attribute

Returns:

  • (Integer)

    the reconnect_interval attribute



32
# File 'lib/krakow/producer.rb', line 32

attribute :reconnect_interval, Integer, :default => 5

- (TrueClass, FalseClass) reconnect_interval?

Returns truthiness of the reconnect_interval attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the reconnect_interval attribute



32
# File 'lib/krakow/producer.rb', line 32

attribute :reconnect_interval, Integer, :default => 5

- (Integer) reconnect_retries

Returns the reconnect_retries attribute

Returns:

  • (Integer)

    the reconnect_retries attribute



31
# File 'lib/krakow/producer.rb', line 31

attribute :reconnect_retries, Integer, :default => 10

- (TrueClass, FalseClass) reconnect_retries?

Returns truthiness of the reconnect_retries attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the reconnect_retries attribute



31
# File 'lib/krakow/producer.rb', line 31

attribute :reconnect_retries, Integer, :default => 10

- (String) to_s

Returns stringify object

Returns:

  • (String)

    stringify object



67
68
69
# File 'lib/krakow/producer.rb', line 67

def to_s
  "<#{self.class.name}:#{object_id} {#{host}:#{port}} T:#{topic}>"
end

- (String) topic

Returns the topic attribute

Returns:

  • (String)

    the topic attribute



30
# File 'lib/krakow/producer.rb', line 30

attribute :topic, String, :required => true

- (TrueClass, FalseClass) topic?

Returns truthiness of the topic attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the topic attribute



30
# File 'lib/krakow/producer.rb', line 30

attribute :topic, String, :required => true

- (Krakow::FrameType::Error?) write(*message)

Write message to server

Parameters:

  • message (String)

    message to write

Returns:

Raises:



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/krakow/producer.rb', line 109

def write(*message)
  if(message.empty?)
    abort ArgumentError.new 'Expecting one or more messages to send. None provided.'
  end
  if(connection && connection.alive?)
    if(message.size > 1)
      debug 'Multiple message publish'
      connection.transmit(
        Command::Mpub.new(
          :topic_name => topic,
          :messages => message
        )
      )
    else
      debug 'Single message publish'
      connection.transmit(
        Command::Pub.new(
          :message => message.first,
          :topic_name => topic
        )
      )
    end
  else
    abort Error::ConnectionUnavailable.new 'Remote connection is unavailable!'
  end
end