Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions parquet-hadoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
<version>2.0.2</version>
<artifactId>aircompressor-v3</artifactId>
<version>3.5</version>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,53 @@
*/
package org.apache.parquet.hadoop.codec;

import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.compress.v3.lz4.Lz4Compressor;
import java.io.IOException;
import java.nio.ByteBuffer;

public class Lz4RawCompressor extends NonBlockedCompressor {

  // aircompressor-v3 exposes a static factory instead of a public constructor.
  private final Lz4Compressor compressor = Lz4Compressor.create();

  /** Reused to stage direct buffers on the heap; lazily allocated and grown when needed. */
  private byte[] inputBuf;
  /** Reused to stage direct buffers on the heap; lazily allocated and grown when needed. */
  private byte[] outputBuf;

  /**
   * Returns the worst-case compressed size for {@code byteSize} input bytes,
   * as reported by the underlying LZ4 implementation.
   */
  @Override
  protected int maxCompressedLength(int byteSize) {
    return compressor.maxCompressedLength(byteSize);
  }

  /**
   * Compresses the remaining bytes of {@code uncompressed} into {@code compressed}.
   *
   * <p>On return, {@code uncompressed} is fully consumed and {@code compressed} has its
   * position at the start of the compressed data and its limit at its end.
   *
   * @param uncompressed input buffer (heap or direct)
   * @param compressed output buffer, sized for at least {@link #maxCompressedLength(int)}
   * @return the number of compressed bytes written
   * @throws IOException declared for the {@code NonBlockedCompressor} contract
   */
  @Override
  protected int compress(ByteBuffer uncompressed, ByteBuffer compressed) throws IOException {
    int startPos = compressed.position();
    int inputLen = uncompressed.remaining();
    int maxOut = compressed.remaining();

    final int compressedSize;
    if (uncompressed.hasArray() && compressed.hasArray()) {
      // Heap buffers: compress directly between the backing arrays, no staging copy.
      int inputOffset = uncompressed.arrayOffset() + uncompressed.position();
      int outputOffset = compressed.arrayOffset() + compressed.position();
      compressedSize = compressor.compress(
          uncompressed.array(), inputOffset, inputLen,
          compressed.array(), outputOffset, maxOut);
      // Advance the input position to match the staging path (where get() does this).
      uncompressed.position(uncompressed.position() + inputLen);
    } else {
      // Direct (or mixed) buffers: stage through reusable heap arrays.
      if (inputBuf == null || inputBuf.length < inputLen) {
        inputBuf = new byte[inputLen];
      }
      if (outputBuf == null || outputBuf.length < maxOut) {
        outputBuf = new byte[maxOut];
      }
      uncompressed.get(inputBuf, 0, inputLen);
      compressedSize = compressor.compress(inputBuf, 0, inputLen, outputBuf, 0, maxOut);
      compressed.put(outputBuf, 0, compressedSize);
    }

    // Expose exactly the compressed bytes: [startPos, startPos + compressedSize).
    compressed.limit(startPos + compressedSize);
    compressed.position(startPos);
    return compressedSize;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
*/
package org.apache.parquet.hadoop.codec;

import io.airlift.compress.lz4.Lz4Decompressor;
import io.airlift.compress.v3.lz4.Lz4Decompressor;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.compress.DirectDecompressor;

public class Lz4RawDecompressor extends NonBlockedDecompressor implements DirectDecompressor {

  // aircompressor-v3 exposes a static factory instead of a public constructor.
  private final Lz4Decompressor decompressor = Lz4Decompressor.create();

  /** Reused to stage direct buffers on the heap; lazily allocated and grown when needed. */
  private byte[] inputBuf;
  /** Reused to stage direct buffers on the heap; lazily allocated and grown when needed. */
  private byte[] outputBuf;

@Override
protected int maxUncompressedLength(ByteBuffer compressed, int maxUncompressedLength) throws IOException {
Expand All @@ -36,10 +41,34 @@ protected int maxUncompressedLength(ByteBuffer compressed, int maxUncompressedLe

@Override
  protected int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException {
    // Decompresses the remaining bytes of `compressed` into `uncompressed`.
    // On return, `compressed` is fully consumed and `uncompressed` has its position at the
    // start of the decompressed data and its limit at its end; returns the byte count.
    int startPos = uncompressed.position();
    int compressedLen = compressed.remaining();
    int maxOut = uncompressed.remaining();

    final int uncompressedSize;
    if (compressed.hasArray() && uncompressed.hasArray()) {
      // Heap buffers: decompress directly between the backing arrays, no staging copy.
      int inputOffset = compressed.arrayOffset() + compressed.position();
      int outputOffset = uncompressed.arrayOffset() + uncompressed.position();
      uncompressedSize = decompressor.decompress(
          compressed.array(), inputOffset, compressedLen,
          uncompressed.array(), outputOffset, maxOut);
      // Advance the input position to match the staging path (where get() does this).
      compressed.position(compressed.position() + compressedLen);
    } else {
      // Direct (or mixed) buffers: stage through reusable heap arrays.
      if (inputBuf == null || inputBuf.length < compressedLen) {
        inputBuf = new byte[compressedLen];
      }
      if (outputBuf == null || outputBuf.length < maxOut) {
        outputBuf = new byte[maxOut];
      }
      compressed.get(inputBuf, 0, compressedLen);
      uncompressedSize = decompressor.decompress(
          inputBuf, 0, compressedLen, outputBuf, 0, maxOut);
      uncompressed.put(outputBuf, 0, uncompressedSize);
    }

    // Expose exactly the decompressed bytes: [startPos, startPos + uncompressedSize).
    uncompressed.limit(startPos + uncompressedSize);
    uncompressed.position(startPos);
    return uncompressedSize;
  }

Expand Down