Module constriction.stream.queue
Range Coding: a stream code with queue semantics (i.e., "first in first out") [1, 2].
The Range Coding algorithm is a variation on Arithmetic Coding [1, 3] that runs more efficiently on standard computing hardware.
Example
The following example shows a full round trip that encodes a message, prints its compressed
representation, and then decodes the message again. The message is a sequence of 11 integers
(referred to as "symbols") and consists of two parts: the first 7 symbols are encoded with an
i.i.d. entropy model, i.e., using the same distribution for each symbol, which happens to be a
Categorical distribution; and the remaining
4 symbols are each encoded with a different entropy model, but all of these 4 models are from
the same family of QuantizedGaussians,
just with different model parameters (means and standard deviations) for each of the 4 symbols.
import constriction
import numpy as np
# Define the two parts of the message and their respective entropy models:
message_part1 = np.array([1, 2, 0, 3, 2, 3, 0], dtype=np.int32)
probabilities_part1 = np.array([0.2, 0.4, 0.1, 0.3], dtype=np.float32)
model_part1 = constriction.stream.model.Categorical(probabilities_part1, perfect=False)
# `model_part1` is a categorical distribution over the (implied) alphabet
# {0,1,2,3} with P(X=0) = 0.2, P(X=1) = 0.4, P(X=2) = 0.1, and P(X=3) = 0.3;
# we will use it below to encode each of the 7 symbols in `message_part1`.
message_part2 = np.array([6, 10, -4, 2 ], dtype=np.int32)
means_part2 = np.array([2.5, 13.1, -1.1, -3.0], dtype=np.float32)
stds_part2 = np.array([4.1, 8.7, 6.2, 5.4], dtype=np.float32)
model_family_part2 = constriction.stream.model.QuantizedGaussian(-100, 100)
# `model_family_part2` is a *family* of Gaussian distributions, quantized to
# bins of width 1 centered at the integers -100, -99, ..., 100. We could
# have provided a fixed mean and standard deviation to the constructor of
# `QuantizedGaussian` but we'll instead provide individual means and standard
# deviations for each symbol when we encode and decode `message_part2` below.
print(f"Original message: {np.concatenate([message_part1, message_part2])}")
# Encode both parts of the message in sequence:
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(message_part1, model_part1)
encoder.encode(message_part2, model_family_part2, means_part2, stds_part2)
# Get and print the compressed representation:
compressed = encoder.get_compressed()
print(f"compressed representation: {compressed}")
print(f"(in binary: {[bin(word) for word in compressed]})")
# You could save `compressed` to a file using `compressed.tofile("filename")`
# and read it back in: `compressed = np.fromfile("filename", dtype=np.uint32)`.
# Decode the message:
decoder = constriction.stream.queue.RangeDecoder(compressed)
decoded_part1 = decoder.decode(model_part1, 7) # (decodes 7 symbols)
decoded_part2 = decoder.decode(model_family_part2, means_part2, stds_part2)
print(f"Decoded message: {np.concatenate([decoded_part1, decoded_part2])}")
assert np.all(decoded_part1 == message_part1)
assert np.all(decoded_part2 == message_part2)
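As a rough cross-check on the size of the compressed representation, the ideal code length (information content) of the example message can be estimated by hand. The sketch below uses only the Python standard library and does not call `constriction`. The helper names are made up for illustration, and the quantized Gaussian part rests on an assumption: it takes the probability of an integer to be the Gaussian mass of the unit-width bin around it, ignoring the truncation to [-100, 100] and any fixed-point rounding the library applies.

```python
import math

def categorical_bits(symbols, probabilities):
    """Ideal code length in bits of i.i.d. symbols under a categorical model."""
    return sum(-math.log2(probabilities[s]) for s in symbols)

def normal_cdf(x, mean, std):
    """Cumulative distribution function of a Gaussian."""
    return 0.5 * (1.0 + math.erf((x - mean) / (std * math.sqrt(2.0))))

def quantized_gaussian_bits(symbols, means, stds):
    """Approximate ideal code length in bits, one (mean, std) pair per symbol.

    Assumption: P(X = k) is the Gaussian mass of the bin [k - 0.5, k + 0.5].
    """
    total = 0.0
    for k, mean, std in zip(symbols, means, stds):
        p = normal_cdf(k + 0.5, mean, std) - normal_cdf(k - 0.5, mean, std)
        total += -math.log2(p)
    return total

# Same message and model parameters as in the example above:
bits_part1 = categorical_bits([1, 2, 0, 3, 2, 3, 0], [0.2, 0.4, 0.1, 0.3])
bits_part2 = quantized_gaussian_bits(
    [6, 10, -4, 2], [2.5, 13.1, -1.1, -3.0], [4.1, 8.7, 6.2, 5.4])

print(f"part 1: {bits_part1:.2f} bits")  # prints: part 1: 16.08 bits
print(f"part 2: {bits_part2:.2f} bits (approximate)")
print(f"total:  {bits_part1 + bits_part2:.2f} bits (approximate)")
```

The compressed representation is reported in whole 32-bit words, so its size in bits will generally be larger than this estimate.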
References
[1] Pasco, Richard Clark. Source coding algorithms for fast data compression. Diss. Stanford University, 1976.
[2] Martin, G. Nigel N. "Range encoding: an algorithm for removing redundancy from a digitised message." Proc. Institution of Electronic and Radio Engineers International Conference on Video and Data Recording. 1979.
[3] Rissanen, Jorma, and Glen G. Langdon. "Arithmetic coding." IBM Journal of research and development 23.2 (1979): 149-162.
Classes
class RangeDecoder (compressed)
-
A decoder of data that was previously encoded with a RangeEncoder.
The constructor expects a single argument compressed, which has to be a rank-1 numpy array with dtype=np.uint32 that contains the compressed data (as returned by the method get_compressed of a RangeEncoder). The provided compressed data gets copied into an internal buffer of the RangeDecoder.
To decode data with a RangeDecoder, call its method decode one or more times. Each decoding operation consumes some portion of the compressed data from the RangeDecoder's internal buffer.
Example
See module level example.
Methods
def clone(self, /)
-
Creates a deep copy of the coder and returns it.
The returned copy will initially encapsulate the identical compressed data as the original coder, but the two coders can be used independently without influencing each other.
def decode(self, /, model, *optional_amt_or_model_params)
-
Decodes one or more symbols, consuming them from the encapsulated compressed data.
This method can be called in 3 different ways:
Option 1: decode(model)
Decodes a single symbol with a concrete (i.e., fully parameterized) entropy model and returns the decoded symbol. (For optimal computational efficiency, don't use this option in a loop if you can instead use one of the two alternative options below.)
For example:
# Define a concrete categorical entropy model over the (implied)
# alphabet {0, 1, 2}:
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float32)
model = constriction.stream.model.Categorical(probabilities, perfect=False)
# Decode a single symbol from some example compressed data:
compressed = np.array([3089773345, 1894195597], dtype=np.uint32)
decoder = constriction.stream.queue.RangeDecoder(compressed)
symbol = decoder.decode(model)
print(symbol) # (prints: 2)
# ... then decode some more symbols ...
Option 2: decode(model, amt) [where amt is an integer]
Decodes amt i.i.d. symbols using the same concrete (i.e., fully parameterized) entropy model for each symbol, and returns the decoded symbols as a rank-1 numpy array with dtype=np.int32 and length amt.
For example:
# Use the same concrete entropy model as in the previous example:
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float32)
model = constriction.stream.model.Categorical(probabilities, perfect=False)
# Decode 9 symbols from some example compressed data, using the
# same (fixed) entropy model defined above for all symbols:
compressed = np.array([369323598], dtype=np.uint32)
decoder = constriction.stream.queue.RangeDecoder(compressed)
symbols = decoder.decode(model, 9)
print(symbols) # (prints: [0, 2, 1, 2, 0, 2, 0, 2, 1])
Option 3: decode(model_family, params1, params2, …)
Decodes multiple symbols, using the same family of entropy models (e.g., categorical or quantized Gaussian) for all symbols, but with different model parameters for each symbol, and returns the decoded symbols as a rank-1 numpy array with dtype=np.int32; here, all paramsX arguments are arrays of equal length (the number of symbols to be decoded). The number of required paramsX arguments and their shapes and dtypes depend on the model family.
For example, the QuantizedGaussian model family expects two rank-1 model parameters with a float dtype, which specify the mean and standard deviation for each entropy model:
# Define a generic quantized Gaussian distribution for all integers
# in the range from -100 to 100 (both ends inclusive):
model_family = constriction.stream.model.QuantizedGaussian(-100, 100)
# Specify the model parameters for each symbol:
means = np.array([10.3, -4.7, 20.5], dtype=np.float32)
stds = np.array([ 5.2, 24.2, 3.1], dtype=np.float32)
# Decode a message from some example compressed data:
compressed = np.array([2655472005], dtype=np.uint32)
decoder = constriction.stream.queue.RangeDecoder(compressed)
symbols = decoder.decode(model_family, means, stds)
print(symbols) # (prints: [12, -13, 25])
By contrast, the Categorical model family expects a single rank-2 model parameter where the i'th row lists the probabilities for each possible value of the i'th symbol:
# Define 2 categorical models over the alphabet {0, 1, 2, 3, 4}:
probabilities = np.array(
    [[0.1, 0.2, 0.3, 0.1, 0.3],  # (for first decoded symbol)
     [0.3, 0.2, 0.2, 0.2, 0.1]], # (for second decoded symbol)
    dtype=np.float32)
model_family = constriction.stream.model.Categorical(perfect=False)
# Decode 2 symbols:
compressed = np.array([2705829535], dtype=np.uint32)
decoder = constriction.stream.queue.RangeDecoder(compressed)
symbols = decoder.decode(model_family, probabilities)
print(symbols) # (prints: [3, 1])
def maybe_exhausted(self, /)
-
Returns True if all compressed data may have already been decoded and False if there is definitely still some more data available to decode.
A return value of True does not necessarily mean that there is no data left on the decoder because constriction's range coding implementation, by design, cannot detect end-of-stream in all cases. If you need to be able to decode variable-length messages then you can introduce an "end of stream" sentinel symbol, which you append to all messages before encoding them.
def seek(self, /, position, state)
-
Jumps to a checkpoint recorded with method pos during encoding.
This allows random-access decoding. The arguments position and state are the two values returned by the RangeEncoder's method pos.
Example
probabilities = np.array([0.2, 0.4, 0.1, 0.3], dtype=np.float32)
model = constriction.stream.model.Categorical(probabilities, perfect=False)
message_part1 = np.array([1, 2, 0, 3, 2, 3, 0], dtype=np.int32)
message_part2 = np.array([2, 2, 0, 1, 3], dtype=np.int32)
# Encode both parts of the message and record a checkpoint in-between:
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(message_part1, model)
(position, state) = encoder.pos() # Records a checkpoint.
encoder.encode(message_part2, model)
compressed = encoder.get_compressed()
decoder = constriction.stream.queue.RangeDecoder(compressed)
# Decode first symbol:
print(decoder.decode(model)) # (prints: 1)
# Jump to part 2 and decode it:
decoder.seek(position, state)
decoded_part2 = decoder.decode(model, 5)
assert np.all(decoded_part2 == message_part2)
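For random access to be useful across program runs, the checkpoint has to be stored somewhere alongside the compressed data. The following is one possible sketch using only the standard library's `json` module. The checkpoint values, the index layout, and the key name `"part2"` are hypothetical and chosen for illustration; nothing here is part of the `constriction` API.

```python
import json

# Hypothetical checkpoint values, standing in for what `encoder.pos()` returns:
# `position` counts 32-bit words and `state` is a tuple of two integers.
position, state = 3, (123456789, 987654321)

# Store named checkpoints in a small index. JSON has no tuple type, so
# `state` is written out as a list.
index = {"part2": {"position": position, "state": list(state)}}
serialized = json.dumps(index)

# Later: load the index and rebuild the two arguments for `decoder.seek`.
loaded = json.loads(serialized)["part2"]
restored_position = loaded["position"]
restored_state = tuple(loaded["state"])  # convert back to a tuple

# If the compressed words are stored contiguously in a file, the byte offset
# of the checkpoint follows from the word size (4 bytes per 32-bit word).
byte_offset = 4 * restored_position
print(restored_position, restored_state, byte_offset)
```

The restored values would then be passed on as `decoder.seek(restored_position, restored_state)`.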
class RangeEncoder
-
An encoder that uses the range coding algorithm.
To encode data with a RangeEncoder, call its method encode one or more times. A RangeEncoder has an internal buffer of compressed data, and each encode operation appends to this internal buffer. You can copy out the contents of the internal buffer by calling the method get_compressed. This will return a rank-1 numpy array with dtype=np.uint32 that you can pass to the constructor of a RangeDecoder or write to a file for decoding at some later time (see example in the documentation of the method get_compressed).
Example
See module level example.
Methods
def clear(self, /)
-
Resets the encoder to an empty state.
This removes any existing compressed data on the coder. It is equivalent to replacing the coder with a new one but slightly more efficient.
def clone(self, /)
-
Creates a deep copy of the coder and returns it.
The returned copy will initially encapsulate the identical compressed data as the original coder, but the two coders can be used independently without influencing each other.
def encode(self, /, symbols, model, *optional_model_params)
-
Encodes one or more symbols, appending them to the encapsulated compressed data.
This method can be called in 3 different ways:
Option 1: encode(symbol, model)
Encodes a single symbol with a concrete (i.e., fully parameterized) entropy model. (For optimal computational efficiency, don't use this option in a loop if you can instead use one of the two alternative options below.)
For example:
# Define a concrete categorical entropy model over the (implied)
# alphabet {0, 1, 2}:
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float32)
model = constriction.stream.model.Categorical(probabilities, perfect=False)
# Encode a single symbol with this entropy model:
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(2, model) # Encodes the symbol `2`.
# ... then encode some more symbols ...
Option 2: encode(symbols, model)
Encodes multiple i.i.d. symbols, i.e., all symbols in the rank-1 array symbols will be encoded with the same concrete (i.e., fully parameterized) entropy model.
For example:
# Use the same concrete entropy model as in the previous example:
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float32)
model = constriction.stream.model.Categorical(probabilities, perfect=False)
# Encode an example message using the above `model` for all symbols:
symbols = np.array([0, 2, 1, 2, 0, 2, 0, 2, 1], dtype=np.int32)
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(symbols, model)
print(encoder.get_compressed()) # (prints: [369323576])
Option 3: encode(symbols, model_family, params1, params2, …)
Encodes multiple symbols, using the same family of entropy models (e.g., categorical or quantized Gaussian) for all symbols, but with different model parameters for each symbol; here, each paramsX argument is an array of the same length as symbols. The number of required paramsX arguments and their shapes and dtypes depend on the model family.
For example, the QuantizedGaussian model family expects two rank-1 model parameters with a float dtype, which specify the mean and standard deviation for each entropy model:
# Define a generic quantized Gaussian distribution for all integers
# in the range from -100 to 100 (both ends inclusive):
model_family = constriction.stream.model.QuantizedGaussian(-100, 100)
# Specify the model parameters for each symbol:
means = np.array([10.3, -4.7, 20.5], dtype=np.float32)
stds = np.array([ 5.2, 24.2, 3.1], dtype=np.float32)
# Encode an example message:
# (needs `len(symbols) == len(means) == len(stds)`)
symbols = np.array([12, -13, 25], dtype=np.int32)
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(symbols, model_family, means, stds)
print(encoder.get_compressed()) # (prints: [2655472005])
By contrast, the Categorical model family expects a single rank-2 model parameter where the i'th row lists the probabilities for each possible value of the i'th symbol:
# Define 2 categorical models over the alphabet {0, 1, 2, 3, 4}:
probabilities = np.array(
    [[0.1, 0.2, 0.3, 0.1, 0.3],  # (for first encoded symbol)
     [0.3, 0.2, 0.2, 0.2, 0.1]], # (for second encoded symbol)
    dtype=np.float32)
model_family = constriction.stream.model.Categorical(perfect=False)
# Encode 2 symbols (needs `len(symbols) == probabilities.shape[0]`):
symbols = np.array([3, 1], dtype=np.int32)
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(symbols, model_family, probabilities)
print(encoder.get_compressed()) # (prints: [2705829510])
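The row-per-symbol layout of the Categorical parameters can be illustrated without `constriction`: row i is the model for symbol i, so the probability assigned to the i'th symbol is the entry at column symbols[i] of row i. The sketch below uses plain Python lists, and the helper `per_symbol_bits` is a hypothetical name introduced here. It checks the length requirement and computes each symbol's ideal code length.

```python
import math

symbols = [3, 1]
probabilities = [
    [0.1, 0.2, 0.3, 0.1, 0.3],  # model for symbols[0]
    [0.3, 0.2, 0.2, 0.2, 0.1],  # model for symbols[1]
]

def per_symbol_bits(symbols, probabilities):
    """Looks up each symbol in its own row and returns -log2 of its probability."""
    if len(symbols) != len(probabilities):
        raise ValueError("need exactly one row of probabilities per symbol")
    return [-math.log2(row[symbol]) for symbol, row in zip(symbols, probabilities)]

bits = per_symbol_bits(symbols, probabilities)
print([round(b, 2) for b in bits])  # prints: [3.32, 2.32]
```

The two symbols carry under 6 bits of information in total, which fits comfortably into the single 32-bit word that the encoder example reports.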
def get_compressed(self, /)
-
Returns a copy of the compressed data accumulated so far, as a rank-1 numpy array of dtype=np.uint32.
You will typically only want to call this method at the very end of your encoding task, i.e., once you've encoded the entire message. There is usually no need to call this method after encoding each symbol or other portion of your message. The encoders in constriction accumulate compressed data in an internal buffer, and encoding (semantically) appends to this buffer.
That said, calling get_compressed has no side effects, so you can call get_compressed, then continue to encode more symbols, and then call get_compressed again. The first call of get_compressed will have no effect on the return value of the second call of get_compressed.
The return value is a rank-1 numpy array of dtype=np.uint32. You can write it to a file by calling tofile on it, but we recommend converting it into an architecture-independent byte order first:
import sys
encoder = constriction.stream.queue.RangeEncoder()
# ... encode some message (skipped here) ...
compressed = encoder.get_compressed() # returns a numpy array.
if sys.byteorder != 'little':
    # Let's save data in little-endian byte order by convention.
    compressed.byteswap(inplace=True)
compressed.tofile('compressed-file.bin')
# At a later point, you might want to read and decode the file:
compressed = np.fromfile('compressed-file.bin', dtype=np.uint32)
if sys.byteorder != 'little':
    # Restore native byte order before passing it to `constriction`.
    compressed.byteswap(inplace=True)
decoder = constriction.stream.queue.RangeDecoder(compressed)
# ... decode the message (skipped here) ...
def get_decoder(self, /)
-
Returns a RangeDecoder that is initialized with a copy of the compressed data currently on this RangeEncoder.
If encoder is a RangeEncoder, then decoder = encoder.get_decoder() is equivalent to:
compressed = encoder.get_compressed()
decoder = constriction.stream.queue.RangeDecoder(compressed)
Calling get_decoder is more efficient since it copies the compressed data only once whereas the longhand version copies the data twice.
def is_empty(self, /)
-
Returns True iff the coder is in its default initial state.
The default initial state is the state returned by the constructor when called without arguments, or the state to which the coder is set when calling clear.
def num_bits(self, /)
-
Returns the current size of the compressed data, in bits, rounded up to full words.
This is 32 times the value that num_words would return.
def num_words(self, /)
-
Returns the current size of the encapsulated compressed data, in np.uint32 words.
Thus, the number returned by this method is the length of the array that you would get if you called get_compressed.
def pos(self, /)
-
Records a checkpoint to which you can jump during decoding using seek.
Returns a tuple (position, state) where position is an integer that specifies how many 32-bit words of compressed data have been produced so far, and state is a tuple of two integers that define the RangeEncoder's internal state (so that it can be restored upon seeking).
Note: Don't call pos if you just want to find out how much compressed data has been produced so far. Call num_words instead.
Example
See seek.