Module constriction.stream.queue

Range Coding: a stream code with queue semantics (i.e., "first in first out") [1, 2].

The Range Coding algorithm is a variation on Arithmetic Coding [1, 3] that runs more efficiently on standard computing hardware.
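
The interval-narrowing idea that Arithmetic Coding and Range Coding share can be illustrated with a toy coder. The sketch below is not constriction's implementation: it uses exact fractions instead of fixed-width integer state, it never emits compressed words, and the helper names `toy_encode` and `toy_decode` are made up for this illustration. It only shows why symbols come out in the order in which they went in ("first in first out").

```python
from fractions import Fraction

def toy_encode(symbols, probabilities):
    """Narrows the interval [low, low + width) once per symbol."""
    low, width = Fraction(0), Fraction(1)
    for symbol in symbols:
        # Left edge of the symbol's slice within the current interval.
        cdf_left = sum(probabilities[:symbol], Fraction(0))
        low += width * cdf_left
        width *= probabilities[symbol]
    # Any number in [low, low + width) identifies the whole message.
    return low, width

def toy_decode(point, probabilities, amt):
    """Recovers `amt` symbols by replaying the same interval narrowing."""
    low, width = Fraction(0), Fraction(1)
    symbols = []
    for _ in range(amt):
        cdf_left = Fraction(0)
        for symbol, probability in enumerate(probabilities):
            # Stop at the first slice whose right edge lies beyond `point`.
            if point < low + width * (cdf_left + probability):
                break
            cdf_left += probability
        symbols.append(symbol)
        low += width * cdf_left
        width *= probabilities[symbol]
    return symbols

probabilities = [Fraction(2, 10), Fraction(4, 10), Fraction(1, 10), Fraction(3, 10)]
message = [1, 2, 0, 3, 2, 3, 0]

low, width = toy_encode(message, probabilities)
decoded = toy_decode(low, probabilities, len(message))
assert decoded == message
```

The final `width` equals the product of the probabilities of all encoded symbols, so likely messages end up in wide intervals that can be identified with few bits. A practical range coder obtains the same effect with finite-precision integers and emits words as soon as they are settled.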

Example

The following example shows a full round trip that encodes a message, prints its compressed representation, and then decodes the message again. The message is a sequence of 11 integers (referred to as "symbols") and consists of two parts. The first 7 symbols are encoded with an i.i.d. entropy model, i.e., with the same distribution for each symbol, which happens to be a Categorical distribution. The remaining 4 symbols are each encoded with a different entropy model; all 4 of these models come from the same family of QuantizedGaussians but have different model parameters (means and standard deviations).

import constriction
import numpy as np

# Define the two parts of the message and their respective entropy models:
message_part1       = np.array([1, 2, 0, 3, 2, 3, 0], dtype=np.int32)
probabilities_part1 = np.array([0.2, 0.4, 0.1, 0.3], dtype=np.float32)
model_part1       = constriction.stream.model.Categorical(probabilities_part1, perfect=False)
# `model_part1` is a categorical distribution over the (implied) alphabet
# {0,1,2,3} with P(X=0) = 0.2, P(X=1) = 0.4, P(X=2) = 0.1, and P(X=3) = 0.3;
# we will use it below to encode each of the 7 symbols in `message_part1`.

message_part2       = np.array([6,   10,   -4,    2  ], dtype=np.int32)
means_part2         = np.array([2.5, 13.1, -1.1, -3.0], dtype=np.float32)
stds_part2          = np.array([4.1,  8.7,  6.2,  5.4], dtype=np.float32)
model_family_part2  = constriction.stream.model.QuantizedGaussian(-100, 100)
# `model_family_part2` is a *family* of Gaussian distributions, quantized to
# bins of width 1 centered at the integers -100, -99, ..., 100. We could
# have provided a fixed mean and standard deviation to the constructor of
# `QuantizedGaussian` but we'll instead provide individual means and standard
# deviations for each symbol when we encode and decode `message_part2` below.

print(f"Original message: {np.concatenate([message_part1, message_part2])}")

# Encode both parts of the message in sequence:
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(message_part1, model_part1)
encoder.encode(message_part2, model_family_part2, means_part2, stds_part2)

# Get and print the compressed representation:
compressed = encoder.get_compressed()
print(f"compressed representation: {compressed}")
print(f"(in binary: {[bin(word) for word in compressed]})")

# You could save `compressed` to a file using `compressed.tofile("filename")`
# and read it back in: `compressed = np.fromfile("filename", dtype=np.uint32)`.

# Decode the message:
decoder = constriction.stream.queue.RangeDecoder(compressed)
decoded_part1 = decoder.decode(model_part1, 7) # (decodes 7 symbols)
decoded_part2 = decoder.decode(model_family_part2, means_part2, stds_part2)
print(f"Decoded message: {np.concatenate([decoded_part1, decoded_part2])}")
assert np.all(decoded_part1 == message_part1)
assert np.all(decoded_part2 == message_part2)

References

[1] Pasco, Richard Clark. Source coding algorithms for fast data compression. Diss. Stanford University, 1976.

[2] Martin, G. Nigel N. "Range encoding: an algorithm for removing redundancy from a digitised message." Proc. Institution of Electronic and Radio Engineers International Conference on Video and Data Recording. 1979.

[3] Rissanen, Jorma, and Glen G. Langdon. "Arithmetic coding." IBM Journal of research and development 23.2 (1979): 149-162.

Classes

class RangeDecoder (compressed)

A decoder of data that was previously encoded with a RangeEncoder.

The constructor expects a single argument compressed, which has to be a rank-1 numpy array with dtype=np.uint32 that contains the compressed data (as returned by the method get_compressed of a RangeEncoder). The provided compressed data gets copied into an internal buffer of the RangeDecoder.

To decode data with a RangeDecoder, call its method decode one or more times. Each decoding operation consumes some portion of the compressed data from the RangeDecoder's internal buffer.

Example

See module level example.

Methods

def clone(self, /)

Creates a deep copy of the coder and returns it.

The returned copy will initially encapsulate the identical compressed data as the original coder, but the two coders can be used independently without influencing each other.

def decode(self, /, model, *optional_amt_or_model_params)

Decodes one or more symbols, consuming them from the encapsulated compressed data.

This method can be called in 3 different ways:

Option 1: decode(model)

Decodes a single symbol with a concrete (i.e., fully parameterized) entropy model and returns the decoded symbol. For optimal computational efficiency, don't use this option in a loop if you can instead use one of the two alternative options below.

For example:

# Define a concrete categorical entropy model over the (implied)
# alphabet {0, 1, 2}:
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float32)
model = constriction.stream.model.Categorical(probabilities, perfect=False)

# Decode a single symbol from some example compressed data:
compressed = np.array([3089773345, 1894195597], dtype=np.uint32)
decoder = constriction.stream.queue.RangeDecoder(compressed)
symbol = decoder.decode(model)
print(symbol) # (prints: 2)
# ... then decode some more symbols ...

Option 2: decode(model, amt) [where amt is an integer]

Decodes amt i.i.d. symbols using the same concrete (i.e., fully parameterized) entropy model for each symbol, and returns the decoded symbols as a rank-1 numpy array with dtype=np.int32 and length amt.

For example:

# Use the same concrete entropy model as in the previous example:
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float32)
model = constriction.stream.model.Categorical(probabilities, perfect=False)

# Decode 9 symbols from some example compressed data, using the
# same (fixed) entropy model defined above for all symbols:
compressed = np.array([369323598], dtype=np.uint32)
decoder = constriction.stream.queue.RangeDecoder(compressed)
symbols = decoder.decode(model, 9)
print(symbols) # (prints: [0, 2, 1, 2, 0, 2, 0, 2, 1])

Option 3: decode(model_family, params1, params2, …)

Decodes multiple symbols, using the same family of entropy models (e.g., categorical or quantized Gaussian) for all symbols, but with different model parameters for each symbol, and returns the decoded symbols as a rank-1 numpy array with dtype=np.int32; here, all paramsX arguments are arrays of equal length (the number of symbols to be decoded). The number of required paramsX arguments and their shapes and dtypes depend on the model family.

For example, the QuantizedGaussian model family expects two rank-1 model parameters with a float dtype, which specify the mean and standard deviation for each entropy model:

# Define a generic quantized Gaussian distribution for all integers
# in the range from -100 to 100 (both ends inclusive):
model_family = constriction.stream.model.QuantizedGaussian(-100, 100)

# Specify the model parameters for each symbol:
means = np.array([10.3, -4.7, 20.5], dtype=np.float32)
stds  = np.array([ 5.2, 24.2,  3.1], dtype=np.float32)

# Decode a message from some example compressed data:
compressed = np.array([2655472005], dtype=np.uint32)
decoder = constriction.stream.queue.RangeDecoder(compressed)
symbols = decoder.decode(model_family, means, stds)
print(symbols) # (prints: [12, -13, 25])

By contrast, the Categorical model family expects a single rank-2 model parameter where the i'th row lists the probabilities for each possible value of the i'th symbol:

# Define 2 categorical models over the alphabet {0, 1, 2, 3, 4}:
probabilities = np.array(
    [[0.1, 0.2, 0.3, 0.1, 0.3],  # (for first decoded symbol)
     [0.3, 0.2, 0.2, 0.2, 0.1]], # (for second decoded symbol)
    dtype=np.float32)
model_family = constriction.stream.model.Categorical(perfect=False)

# Decode 2 symbols:
compressed = np.array([2705829535], dtype=np.uint32)
decoder = constriction.stream.queue.RangeDecoder(compressed)
symbols = decoder.decode(model_family, probabilities)
print(symbols) # (prints: [3, 1])

def maybe_exhausted(self, /)

Returns True if all compressed data may have already been decoded and False if there is definitely still some more data available to decode.

A return value of True does not necessarily mean that there is no data left on the decoder because constriction's range coding implementation cannot, by design, detect end-of-stream in all cases. If you need to be able to decode variable-length messages, you can introduce an "end of stream" sentinel symbol that you append to every message before encoding it.

def seek(self, /, position, state)

Jumps to a checkpoint recorded with method pos during encoding.

This allows random-access decoding. The arguments position and state are the two values returned by the RangeEncoder's method pos.

Example

probabilities = np.array([0.2, 0.4, 0.1, 0.3], dtype=np.float32)
model         = constriction.stream.model.Categorical(probabilities, perfect=False)
message_part1 = np.array([1, 2, 0, 3, 2, 3, 0], dtype=np.int32)
message_part2 = np.array([2, 2, 0, 1, 3], dtype=np.int32)

# Encode both parts of the message and record a checkpoint in-between:
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(message_part1, model)
(position, state) = encoder.pos() # Records a checkpoint.
encoder.encode(message_part2, model)

compressed = encoder.get_compressed()
decoder = constriction.stream.queue.RangeDecoder(compressed)

# Decode first symbol:
print(decoder.decode(model)) # (prints: 1)

# Jump to part 2 and decode it:
decoder.seek(position, state)
decoded_part2 = decoder.decode(model, 5)
assert np.all(decoded_part2 == message_part2)

class RangeEncoder

An encoder that uses the range coding algorithm.

To encode data with a RangeEncoder, call its method encode one or more times. A RangeEncoder has an internal buffer of compressed data, and each encode operation appends to this internal buffer. You can copy out the contents of the internal buffer by calling the method get_compressed. This will return a rank-1 numpy array with dtype=np.uint32 that you can pass to the constructor of a RangeDecoder or write to a file for decoding at some later time (see example in the documentation of the method get_compressed).

Example

See module level example.

Methods

def clear(self, /)

Resets the encoder to an empty state.

This removes any existing compressed data on the coder. It is equivalent to replacing the coder with a new one but slightly more efficient.

def clone(self, /)

Creates a deep copy of the coder and returns it.

The returned copy will initially encapsulate the identical compressed data as the original coder, but the two coders can be used independently without influencing each other.

def encode(self, /, symbols, model, *optional_model_params)

Encodes one or more symbols, appending them to the encapsulated compressed data.

This method can be called in 3 different ways:

Option 1: encode(symbol, model)

Encodes a single symbol with a concrete (i.e., fully parameterized) entropy model. For optimal computational efficiency, don't use this option in a loop if you can instead use one of the two alternative options below.

For example:

# Define a concrete categorical entropy model over the (implied)
# alphabet {0, 1, 2}:
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float32)
model = constriction.stream.model.Categorical(probabilities, perfect=False)

# Encode a single symbol with this entropy model:
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(2, model) # Encodes the symbol `2`.
# ... then encode some more symbols ...

Option 2: encode(symbols, model)

Encodes multiple i.i.d. symbols, i.e., all symbols in the rank-1 array symbols will be encoded with the same concrete (i.e., fully parameterized) entropy model.

For example:

# Use the same concrete entropy model as in the previous example:
probabilities = np.array([0.1, 0.6, 0.3], dtype=np.float32)
model = constriction.stream.model.Categorical(probabilities, perfect=False)

# Encode an example message using the above `model` for all symbols:
symbols = np.array([0, 2, 1, 2, 0, 2, 0, 2, 1], dtype=np.int32)
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(symbols, model)
print(encoder.get_compressed()) # (prints: [369323576])

Option 3: encode(symbols, model_family, params1, params2, …)

Encodes multiple symbols, using the same family of entropy models (e.g., categorical or quantized Gaussian) for all symbols, but with different model parameters for each symbol; here, each paramsX argument is an array of the same length as symbols. The number of required paramsX arguments and their shapes and dtypes depend on the model family.

For example, the QuantizedGaussian model family expects two rank-1 model parameters with float dtype, which specify the mean and standard deviation for each entropy model:

# Define a generic quantized Gaussian distribution for all integers
# in the range from -100 to 100 (both ends inclusive):
model_family = constriction.stream.model.QuantizedGaussian(-100, 100)

# Specify the model parameters for each symbol:
means = np.array([10.3, -4.7, 20.5], dtype=np.float32)
stds  = np.array([ 5.2, 24.2,  3.1], dtype=np.float32)

# Encode an example message:
# (needs `len(symbols) == len(means) == len(stds)`)
symbols = np.array([12, -13, 25], dtype=np.int32)
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(symbols, model_family, means, stds)
print(encoder.get_compressed()) # (prints: [2655472005])

By contrast, the Categorical model family expects a single rank-2 model parameter where the i'th row lists the probabilities for each possible value of the i'th symbol:

# Define 2 categorical models over the alphabet {0, 1, 2, 3, 4}:
probabilities = np.array(
    [[0.1, 0.2, 0.3, 0.1, 0.3],  # (for first encoded symbol)
     [0.3, 0.2, 0.2, 0.2, 0.1]], # (for second encoded symbol)
    dtype=np.float32)
model_family = constriction.stream.model.Categorical(perfect=False)

# Encode 2 symbols (needs `len(symbols) == probabilities.shape[0]`):
symbols = np.array([3, 1], dtype=np.int32)
encoder = constriction.stream.queue.RangeEncoder()
encoder.encode(symbols, model_family, probabilities)
print(encoder.get_compressed()) # (prints: [2705829510])

def get_compressed(self, /)

Returns a copy of the compressed data accumulated so far, as a rank-1 numpy array of dtype=np.uint32.

You will typically only want to call this method at the very end of your encoding task, i.e., once you've encoded the entire message. There is usually no need to call this method after encoding each symbol or other portion of your message. The encoders in constriction accumulate compressed data in an internal buffer, and encoding (semantically) appends to this buffer.

That said, calling get_compressed has no side effects, so you can call get_compressed, then continue to encode more symbols, and then call get_compressed again. The first call of get_compressed will have no effect on the return value of the second call of get_compressed.

The return value is a rank-1 numpy array of dtype=np.uint32. You can write it to a file by calling tofile on it, but we recommend converting it into an architecture-independent byte order first:

import sys

encoder = constriction.stream.queue.RangeEncoder()
# ... encode some message (skipped here) ...
compressed = encoder.get_compressed() # returns a numpy array.
if sys.byteorder != 'little':
    # Let's save data in little-endian byte order by convention.
    compressed.byteswap(inplace=True)
compressed.tofile('compressed-file.bin')

# At a later point, you might want to read and decode the file:
compressed = np.fromfile('compressed-file.bin', dtype=np.uint32)
if sys.byteorder != 'little':
    # Restore native byte order before passing it to `constriction`.
    compressed.byteswap(inplace=True)
decoder = constriction.stream.queue.RangeDecoder(compressed)
# ... decode the message (skipped here) ...
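
A possible alternative to the explicit `byteswap` calls is to let numpy handle the conversion through a little-endian dtype. The sketch below uses only numpy and the standard library; the array `compressed` is a stand-in for the return value of `get_compressed`, and the temporary directory is used only to keep the example self-contained.

```python
import os
import tempfile

import numpy as np

# Stand-in for the return value of `encoder.get_compressed()`:
compressed = np.array([3089773345, 1894195597], dtype=np.uint32)

with tempfile.TemporaryDirectory() as directory:
    path = os.path.join(directory, "compressed-file.bin")

    # "<u4" is numpy's dtype string for little-endian unsigned 32-bit
    # integers, so the bytes on disk are the same on every architecture.
    compressed.astype("<u4").tofile(path)

    with open(path, "rb") as file:
        raw_bytes = file.read()

    # Reading with the same explicit dtype recovers the values; the
    # final `astype` converts them to the machine's native byte order.
    restored = np.fromfile(path, dtype="<u4").astype(np.uint32)

assert np.array_equal(restored, compressed)
```

Both approaches write the same bytes to disk; the explicit dtype avoids the in-place modification of `compressed` that `byteswap(inplace=True)` performs on big-endian machines.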

def get_decoder(self, /)

Returns a RangeDecoder that is initialized with a copy of the compressed data currently on this RangeEncoder.

If encoder is a RangeEncoder, then

decoder = encoder.get_decoder()

is equivalent to:

compressed = encoder.get_compressed()
decoder = constriction.stream.queue.RangeDecoder(compressed)

Calling get_decoder is more efficient since it copies the compressed data only once whereas the longhand version copies the data twice.

def is_empty(self, /)

Returns True iff the coder is in its default initial state.

The default initial state is the state returned by the constructor when called without arguments, or the state to which the coder is set when calling clear.

def num_bits(self, /)

Returns the current size of the compressed data, in bits, rounded up to full words.

This is 32 times the result of what num_words would return.

def num_words(self, /)

Returns the current size of the encapsulated compressed data, in np.uint32 words.

Thus, the number returned by this method is the length of the array that you would get if you called get_compressed.

def pos(self, /)

Records a checkpoint to which you can jump during decoding using seek.

Returns a tuple (position, state) where position is an integer that specifies how many 32-bit words of compressed data have been produced so far, and state is a tuple of two integers that define the RangeEncoder's internal state (so that it can be restored upon seeking).

Note: Don't call pos if you just want to find out how much compressed data has been produced so far. Call num_words instead.

Example

See seek.