Class: Krakow::ConnectionFeatures::SnappyFrames::Io

Inherits:
Object
Defined in:
lib/krakow/connection_features/snappy_frames.rb

Overview

Snappy-able IO: wraps an IO and reads and writes Snappy-framed chunks

Constant Summary

IDENTIFIER =

Header identifier

"\x73\x4e\x61\x50\x70\x59".force_encoding('ASCII-8BIT')
IDENTIFIER_SIZE =

Size of identifier

ident_size
CHUNK_TYPE =

Mapping of chunk type bytes to chunk types

{
  "\xff".force_encoding('ASCII-8BIT') => :identifier,
  "\x00".force_encoding('ASCII-8BIT') => :compressed,
  "\x01".force_encoding('ASCII-8BIT') => :uncompressed
}

Constructor Details

- (Io) initialize(io, args = {})

Create new snappy-able IO

Parameters:

  • io (IO)

    IO to wrap

  • args (Hash) (defaults to: {})

    options (not used by the constructor shown below)



# File 'lib/krakow/connection_features/snappy_frames.rb', line 34

def initialize(io, args={})
  @io = io
  @snappy_write_ident = false
  @buffer = ''
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

- (Object) method_missing(*args)

Proxy to underlying socket

Parameters:

  • args (Object)

Returns:

  • (Object)


# File 'lib/krakow/connection_features/snappy_frames.rb', line 44

def method_missing(*args)
  io.__send__(*args)
end
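
As a rough illustration of the proxying above, the sketch below wraps a StringIO in a hypothetical SocketProxy class (not part of Krakow) and forwards unknown calls in the same way. The respond_to_missing? override is an addition made for this example; the source listing above does not define one.

```ruby
require 'stringio'

# Hypothetical stand-in for illustration; not part of Krakow.
class SocketProxy
  attr_reader :io

  def initialize(io)
    @io = io
  end

  # Forward any method this class does not define to the wrapped IO.
  def method_missing(*args, &block)
    io.__send__(*args, &block)
  end

  # Illustrative addition so respond_to? reflects the forwarding.
  def respond_to_missing?(name, include_private = false)
    io.respond_to?(name, include_private) || super
  end
end

proxy = SocketProxy.new(StringIO.new('hello'))
proxy.read(2) # forwarded to StringIO#read
proxy.eof?    # forwarded to StringIO#eof?
```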

Instance Attribute Details

- (Object) buffer (readonly)

Returns the value of attribute buffer



# File 'lib/krakow/connection_features/snappy_frames.rb', line 28

def buffer
  @buffer
end

- (Object) io (readonly)

Returns the value of attribute io



# File 'lib/krakow/connection_features/snappy_frames.rb', line 28

def io
  @io
end

Instance Method Details

- (Integer) checksum_mask(checksum)

Mask the checksum

Parameters:

  • checksum (Integer)

    unmasked checksum

Returns:

  • (Integer)

    masked checksum, truncated to 32 bits


# File 'lib/krakow/connection_features/snappy_frames.rb', line 52

def checksum_mask(checksum)
  (((checksum >> 15) | (checksum << 17)) + 0xa282ead8) & 0xffffffff
end
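
The masking arithmetic can be tried in isolation. The sketch below repeats the expression from checksum_mask as a standalone method and pairs it with a hypothetical inverse, unmask, which is not part of the class and is shown only to make the rotate-and-add structure visible. It assumes the input is an unsigned 32-bit value.

```ruby
MASK_DELTA = 0xa282ead8 # constant taken from checksum_mask above

# Same arithmetic as checksum_mask: rotate right by 15 bits, add the
# constant, and keep the low 32 bits.
def mask(checksum)
  (((checksum >> 15) | (checksum << 17)) + MASK_DELTA) & 0xffffffff
end

# Hypothetical inverse: subtract the constant, then rotate left by 15 bits.
def unmask(masked)
  rot = (masked - MASK_DELTA) & 0xffffffff
  ((rot >> 17) | (rot << 15)) & 0xffffffff
end

format('%08x', mask(0))  # => "a282ead8"
unmask(mask(0x12345678)) # => 305419896 (0x12345678)
```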

- (String) read_stream

Read the next chunk from the stream and append its payload to the buffer

Returns:

  • (String)


# File 'lib/krakow/connection_features/snappy_frames.rb', line 70

def read_stream
  # Chunk header: 1 type byte followed by a 3-byte little-endian length
  header = io.recv(4)
  ident = CHUNK_TYPE[header.slice!(0)]
  # Pad the length to 4 bytes with "\x00" so it unpacks as a 32-bit integer
  size = (header << CHUNK_TYPE.key(:compressed)).unpack('L<').first
  content = io.recv(size)
  case ident
  when :identifier
    unless(content == IDENTIFIER)
      raise "Invalid stream identification encountered (content: #{content.inspect})"
    end
    # Identifier chunks carry no payload; continue with the next chunk
    read_stream
  when :compressed
    # The first 4 bytes hold the masked checksum of the uncompressed data
    checksum = content.slice!(0, 4).unpack('L<').first
    inflated = Snappy.inflate(content)
    digest = Digest::CRC32c.new
    digest << inflated
    unless(checksum == checksum_mask(digest.checksum))
      raise 'Checksum mismatch!'
    end
    buffer << inflated
  when :uncompressed
    buffer << content
  end
end
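
The header handling in read_stream can be sketched on its own. The example below is a minimal illustration, not the library's API: parse_chunk_header and the local CHUNK_TYPES copy are hypothetical names introduced here so that the snippet is self-contained.

```ruby
# Local copy of the mapping, so the example runs on its own.
CHUNK_TYPES = {
  "\xff".b => :identifier,
  "\x00".b => :compressed,
  "\x01".b => :uncompressed
}.freeze

# Hypothetical helper: split a 4-byte chunk header into [type, size].
def parse_chunk_header(header)
  type = CHUNK_TYPES[header[0]]
  # Pad the 3-byte little-endian length to 4 bytes so 'L<' can unpack it.
  size = (header[1, 3] + "\x00".b).unpack('L<').first
  [type, size]
end

parse_chunk_header("\xff\x06\x00\x00".b) # => [:identifier, 6]
```

Padding with a zero byte mirrors what read_stream does when it appends CHUNK_TYPE.key(:compressed), which happens to be "\x00".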

- (String) recv(n) Also known as: read

Receive bytes from the IO

Parameters:

  • n (Integer)

    number of bytes

Returns:

  • (String, nil)

    up to n bytes from the buffer, or nil if the buffer is empty


# File 'lib/krakow/connection_features/snappy_frames.rb', line 60

def recv(n)
  read_stream unless buffer.size >= n
  result = buffer.slice!(0,n)
  result.empty? ? nil : result
end
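
The buffer handling in recv relies on String#slice! removing and returning a prefix. A minimal sketch of that behavior follows, using a hypothetical take helper that is not part of the class and that skips the read_stream refill step.

```ruby
# Hypothetical helper mirroring the last two lines of recv.
def take(buffer, n)
  result = buffer.slice!(0, n) # removes up to n bytes from the front
  result.empty? ? nil : result
end

buffer = 'abcdef'.dup
take(buffer, 4) # => "abcd" (buffer is now "ef")
take(buffer, 4) # => "ef"   (fewer than 4 bytes were left)
take(buffer, 4) # => nil    (buffer is empty)
```

Note that a caller can receive fewer than n bytes, as the second call shows.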

- (Integer) send_snappy_identifier

Send the identifier for snappy content

Returns:

  • (Integer)

    bytes written



# File 'lib/krakow/connection_features/snappy_frames.rb', line 117

def send_snappy_identifier
  io.write [CHUNK_TYPE.key(:identifier), IDENTIFIER_SIZE, IDENTIFIER].pack('a*a*a*')
end
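
For reference, the bytes of a stream identifier chunk can be assembled by hand. This sketch assumes the length field is the identifier's size encoded as three little-endian bytes, which is how read_stream decodes it; the value of IDENTIFIER_SIZE itself is not shown on this page. identifier_frame and IDENT are hypothetical names.

```ruby
IDENT = 'sNaPpY'.b # same bytes as IDENTIFIER above

# Hypothetical helper: type byte, 3-byte length, then the identifier.
def identifier_frame
  # Assumption: length is the identifier size as 3 little-endian bytes.
  ident_len = [IDENT.bytesize].pack('L<')[0, 3]
  ["\xff".b, ident_len, IDENT].pack('a*a*a*')
end

identifier_frame.bytes.map { |b| format('%02x', b) }.join(' ')
# => "ff 06 00 00 73 4e 61 50 70 59"
```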

- (Integer) write(string)

Write string to IO

Parameters:

  • string (String)

Returns:

  • (Integer)

    number of bytes written



# File 'lib/krakow/connection_features/snappy_frames.rb', line 99

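def write(string)
  # Send the stream identifier once, before the first chunk
  unless(@snappy_write_ident)
    send_snappy_identifier
    @snappy_write_ident = true
  end
  digest = Digest::CRC32c.new
  digest << string
  content = Snappy.deflate(string)
  # Chunk length covers the 4-byte checksum plus the compressed payload
  size = content.length + 4
  size = [size].pack('L<')
  size.slice!(-1,1) # keep only the low 3 bytes of the length
  checksum = [checksum_mask(digest.checksum)].pack('L<')
  output = [CHUNK_TYPE.key(:compressed), size, checksum, content].pack('a*a*a*a*')
  io.write output
end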
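
The chunk layout produced by write can be sketched without the Snappy and CRC32c dependencies. In the example below, compressed_chunk is a hypothetical helper that takes an already-compressed payload and an already-masked checksum as plain arguments; the values passed in are placeholders, not real compressed data.

```ruby
# Hypothetical helper: frame an already-compressed payload.
# masked_crc stands in for checksum_mask(digest.checksum).
def compressed_chunk(payload, masked_crc)
  # 3-byte little-endian length of checksum + payload
  size = [payload.bytesize + 4].pack('L<')[0, 3]
  checksum = [masked_crc].pack('L<')
  ["\x00".b, size, checksum, payload].pack('a*a*a*a*')
end

chunk = compressed_chunk('abc'.b, 0xa282ead8)
chunk.bytesize # => 11 (1 type + 3 length + 4 checksum + 3 payload)
```

Because the length field is only three bytes wide, a single chunk built this way cannot describe more than 0xffffff bytes of checksum plus payload.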