1つで表す必要のあるヘッダーとデータがありByte Array
ます。また、ヘッダーをにパックするための特定の形式とByte Array
、データをにパックするための別の形式がありますByte Array
。これら2つを入手したらByte Array
、それから1つのファイナルを作成する必要があります。
以下はで定義されているレイアウトでありC++
、それに応じてで行う必要がありJava
ます。
// below is my header offsets layout
// addressedCenter must be the first byte
static constexpr uint32_t addressedCenter = 0;
static constexpr uint32_t version = addressedCenter + 1;
static constexpr uint32_t numberOfRecords = version + 1;
static constexpr uint32_t bufferUsed = numberOfRecords + sizeof(uint32_t);
static constexpr uint32_t location = bufferUsed + sizeof(uint32_t);
static constexpr uint32_t locationFrom = location + sizeof(CustomerAddress);
static constexpr uint32_t locationOrigin = locationFrom + sizeof(CustomerAddress);
static constexpr uint32_t partition = locationOrigin + sizeof(CustomerAddress);
static constexpr uint32_t copy = partition + 1;
// this is the full size of the header
static constexpr uint32_t headerOffset = copy + 1;
そして、CustomerAddress
はtypedefでuint64_t
あり、次のように構成されています-
typedef uint64_t CustomerAddress;
void client_data(uint8_t datacenter,
uint16_t clientId,
uint8_t dataId,
uint32_t dataCounter,
CustomerAddress& customerAddress)
{
customerAddress = (uint64_t(datacenter) << 56)
+ (uint64_t(clientId) << 40)
+ (uint64_t(dataId) << 32)
+ dataCounter;
}
そして以下は私のデータレイアウトです-
// below is my data layout -
//
// key type - 1 byte
// key len - 1 byte
// key (variable size = key_len)
// timestamp (sizeof uint64_t)
// data size (sizeof uint16_t)
// data (variable size = data size)
問題文:-
プロジェクトの一部として、Javaの特定のクラスで全体的なものを表現しようとしています。これにより、必要なフィールドを渡すだけでByte Array
、最初にヘッダー、次にデータを持つ最終的なものを作成できます。
以下は私のDataFrame
クラスです:
public final class DataFrame {
private final byte addressedCenter;
private final byte version;
private final Map<byte[], byte[]> keyDataHolder;
private final long location;
private final long locationFrom;
private final long locationOrigin;
private final byte partition;
private final byte copy;
public DataFrame(byte addressedCenter, byte version,
Map<byte[], byte[]> keyDataHolder, long location, long locationFrom,
long locationOrigin, byte partition, byte copy) {
this.addressedCenter = addressedCenter;
this.version = version;
this.keyDataHolder = keyDataHolder;
this.location = location;
this.locationFrom = locationFrom;
this.locationOrigin = locationOrigin;
this.partition = partition;
this.copy = copy;
}
public byte[] serialize() {
// All of the data is embedded in a binary array with fixed maximum size 70000
ByteBuffer byteBuffer = ByteBuffer.allocate(70000);
byteBuffer.order(ByteOrder.BIG_ENDIAN);
int numOfRecords = keyDataHolder.size();
int bufferUsed = getBufferUsed(keyDataHolder); // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;
// header layout
byteBuffer.put(addressedCenter); // byte
byteBuffer.put(version); // byte
byteBuffer.putInt(numOfRecords); // int
byteBuffer.putInt(bufferUsed); // int
byteBuffer.putLong(location); // long
byteBuffer.putLong(locationFrom); // long
byteBuffer.putLong(locationOrigin); // long
byteBuffer.put(partition); // byte
byteBuffer.put(copy); // byte
// now the data layout
for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) {
byte keyType = 0;
byte keyLength = (byte) entry.getKey().length;
byte[] key = entry.getKey();
byte[] data = entry.getValue();
short dataSize = (short) data.length;
ByteBuffer dataBuffer = ByteBuffer.wrap(data);
long timestamp = 0;
if (dataSize > 10) {
timestamp = dataBuffer.getLong(2);
}
byteBuffer.put(keyType);
byteBuffer.put(keyLength);
byteBuffer.put(key);
byteBuffer.putLong(timestamp);
byteBuffer.putShort(dataSize);
byteBuffer.put(data);
}
return byteBuffer.array();
}
private int getBufferUsed(final Map<byte[], byte[]> keyDataHolder) {
int size = 36;
for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) {
size += 1 + 1 + 8 + 2;
size += entry.getKey().length;
size += entry.getValue().length;
}
return size;
}
}
そして、以下は私が上記のDataFrame
クラスをどのように使用しているかです:
public static void main(String[] args) throws IOException {
// header layout
byte addressedCenter = 0;
byte version = 1;
long location = packCustomerAddress((byte) 12, (short) 13, (byte) 32, (int) 120);
long locationFrom = packCustomerAddress((byte) 21, (short) 23, (byte) 41, (int) 130);
long locationOrigin = packCustomerAddress((byte) 21, (short) 24, (byte) 41, (int) 140);
byte partition = 3;
byte copy = 0;
// this map will have key as the actual key and value as the actual data, both in byte array
// for now I am storing only two entries in this map
Map<byte[], byte[]> keyDataHolder = new HashMap<byte[], byte[]>();
for (int i = 1; i <= 2; i++) {
keyDataHolder.put(generateKey(), getMyData());
}
DataFrame records =
new DataFrame(addressedCenter, version, keyDataHolder, location, locationFrom,
locationOrigin, partition, copy);
// this will give me final packed byte array
// which will have header and data in it.
byte[] packedArray = records.serialize();
}
private static long packCustomerAddress(byte datacenter, short clientId, byte dataId,
int dataCounter) {
return ((long) (datacenter) << 56) | ((long) clientId << 40) | ((long) dataId << 32)
| ((long) dataCounter);
}
私のDataFrame
クラスでわかるように、ByteBuffer
事前定義されたサイズの70000
。を割り当てています。ByteBuffer
ハードコードされたものを使用する代わりに、作成中に使用しているサイズを割り当てることができるより良い方法はあり70000
ますか?
また、ヘッダーとデータを1バイト配列にパックする私が行っていることと比較してより良い方法はありますか?また、複数のスレッドから呼び出すことができるため、スレッドセーフであることを確認する必要があります。
ByteBuffer
ハードコードされたものを使用する代わりに、作成中に使用しているサイズを割り当てることができるより良い方法はあり70000
ますか?
少なくとも2つの重複しないアプローチがあります。両方を使用できます。
1つはバッファプーリングです。ピーク時に必要なバッファーの数を確認し、それを超える最大値を使用する必要があります。たとえば、最大+最大/ 2、最大+平均、最大+モード、2 *最大。
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import java.util.function.Function;
public class ByteBufferPool {
private final int bufferCapacity;
private final LinkedBlockingDeque<ByteBuffer> queue;
public ByteBufferPool(int limit, int bufferCapacity) {
if (limit < 0) throw new IllegalArgumentException("limit must not be negative.");
if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative.");
this.bufferCapacity = bufferCapacity;
this.queue = (limit == 0) ? null : new LinkedBlockingDeque<>(limit);
}
public ByteBuffer acquire() {
ByteBuffer buffer = (queue == null) ? null : queue.pollFirst();
if (buffer == null) {
buffer = ByteBuffer.allocate(bufferCapacity);
}
else {
buffer.clear();
buffer.order(ByteOrder.BIG_ENDIAN);
}
return buffer;
}
public boolean release(ByteBuffer buffer) {
if (buffer == null) throw new IllegalArgumentException("buffer must not be null.");
if (buffer.capacity() != bufferCapacity) throw new IllegalArgumentException("buffer has unsupported capacity.");
if (buffer.isDirect()) throw new IllegalArgumentException("buffer must not be direct.");
if (buffer.isReadOnly()) throw new IllegalArgumentException("buffer must not be read-only.");
return (queue == null) ? false : queue.offerFirst(buffer);
}
public void withBuffer(Consumer<ByteBuffer> action) {
if (action == null) throw new IllegalArgumentException("action must not be null.");
ByteBuffer buffer = acquire();
try {
action.accept(buffer);
}
finally {
release(buffer);
}
}
public <T> T withBuffer(Function<ByteBuffer, T> function) {
if (function == null) throw new IllegalArgumentException("function must not be null.");
ByteBuffer buffer = acquire();
try {
return function.apply(buffer);
}
finally {
release(buffer);
}
}
public <T> CompletionStage<T> withBufferAsync(Function<ByteBuffer, CompletionStage<T>> asyncFunction) {
if (asyncFunction == null) throw new IllegalArgumentException("asyncFunction must not be null.");
ByteBuffer buffer = acquire();
CompletionStage<T> future = null;
try {
future = asyncFunction.apply(buffer);
}
finally {
if (future == null) {
release(buffer);
}
else {
future = future.whenComplete((result, throwable) -> release(buffer));
}
}
return future;
}
}
このwithBuffer
方法では、プールを簡単に使用でき、acquire
とrelease
は取得ポイントと解放ポイントを分離できます。
もう一つは、直列化インタフェース、例えば分離されput
、putInt
そしてputLong
あなたは、その後、バイトカウントクラスと実際のバイトのバッファリングクラスを実装することができ、。不要なバイト生成を回避するために、シリアライザーがバイトをカウントしているかバッファリングしているかを知るためのメソッドと、実際にシリアル化せずに一部のエンコーディングで文字列のサイズを計算するときに役立つ、バイト使用量を直接インクリメントする別のメソッドをこのようなインターフェイスに追加する必要があります。
public interface ByteSerializer {
ByteSerializer put(byte value);
ByteSerializer putInt(int value);
ByteSerializer putLong(long value);
boolean isSerializing();
ByteSerializer add(int bytes);
int position();
}
public class ByteCountSerializer implements ByteSerializer {
private int count = 0;
@Override
public ByteSerializer put(byte value) {
count += 1;
return this;
}
@Override
public ByteSerializer putInt(int value) {
count += 4;
return this;
}
@Override
public ByteSerializer putLong(long value) {
count += 8;
return this;
}
@Override
public boolean isSerializing() {
return false;
}
@Override
public ByteSerializer add(int bytes) {
if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative.");
count += bytes;
return this;
}
@Override
public int position() {
return count;
}
}
import java.nio.ByteBuffer;
public class ByteBufferSerializer implements ByteSerializer {
private final ByteBuffer buffer;
public ByteBufferSerializer(int bufferCapacity) {
if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative.");
this.buffer = ByteBuffer.allocate(bufferCapacity);
}
@Override
public ByteSerializer put(byte value) {
buffer.put(value);
return this;
}
@Override
public ByteSerializer putInt(int value) {
buffer.putInt(value);
return this;
}
@Override
public ByteSerializer putLong(long value) {
buffer.putLong(value);
return this;
}
@Override
public boolean isSerializing() {
return true;
}
@Override
public ByteSerializer add(int bytes) {
if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative.");
for (int b = 0; b < bytes; b++) {
buffer.put((byte)0);
}
return this;
// or throw new UnsupportedOperationException();
}
@Override
public int position() {
return buffer.position();
}
public ByteBuffer buffer() {
return buffer;
}
}
あなたのコードでは、これらの線に沿って何かをします(テストされていません):
ByteCountSerializer counter = new ByteCountSerializer();
dataFrame.serialize(counter);
ByteBufferSerializer serializer = new ByteByfferSerializer(counter.position());
dataFrame.serialize(serializer);
ByteBuffer buffer = serializer.buffer();
// ... write buffer, ?, profit ...
あなたのDataFrame.serialize
方法は受け入れるようにリファクタリングする必要がありByteSerializer
、それがデータを生成する場合には、それがチェックする必要がありisSerializing
、それが唯一の大きさや、実際に書き込みバイトを計算する必要がありますかどうかを知るために。
両方のアプローチを組み合わせることは、主にあなたがそれをどのように行うかによって大きく異なるため、演習として残しておきます。
たとえばByteBufferSerializer
、プールを直接使用して任意の容量(70000など)を維持したり、容量ごとにプールしたりできますByteBuffer
(ただし、必要な容量の代わりに、必要な容量より2大きい最小電力を使用して、acquire
)から戻る前のバッファの制限、またはメソッドByteBufferSerializer
を追加する限り、を直接プールすることができますreset()
。
また、ヘッダーとデータを1バイト配列にパックする私が行っていることと比較してより良い方法はありますか?
はい。特定のメソッドがバイト配列を返す代わりに、バイトバッファリングインスタンスを渡します。バイト配列は、長さがチェックされた後、または内容がコピーされた直後に破棄されます。
また、複数のスレッドから呼び出すことができるため、スレッドセーフであることを確認する必要があります。
各バッファが1つのスレッドのみで使用され、適切に同期されている限り、心配する必要はありません。
適切な同期とは、プールマネージャーのメソッドで取得および解放のセマンティクスがあり、バッファーをプールからフェッチしてからプールに戻すまでの間に複数のスレッドでバッファーが使用されている場合、バッファーの使用を停止する解放のセマンティクスをスレッドに追加し、バッファの使用を開始するスレッドに取得セマンティクスを追加します。たとえば、バッファをCompletableFuture
sに渡す場合、これについて心配する必要はありません。Exchanger
または、またはの適切な実装を使用してスレッド間で明示的に通信している場合ですBlockingQueue
。
以下からjava.util.concurrent
のパッケージの説明:
内のすべてのクラス
java.util.concurrent
とそのサブパッケージのメソッドは、これらの保証をより高いレベルの同期に拡張します。特に:
オブジェクトを並行コレクションに配置する前のスレッド内のアクションは、別のスレッドのコレクションからその要素にアクセスまたは削除した後のアクションの前に発生します。
前の提出にスレッド内のアクション
Runnable
にExecutor
起こる-前にその実行が開始されます。同様ににCallables
提出するためExecutorService
。別のスレッドを介して結果を取得した後の
Future
発生前アクションによって表される非同期計算によって実行されるアクションFuture.get()
。以前のような「解放する」シンクロナイザ法にアクション
Lock.unlock
、Semaphore.release
およびCountDownLatch.countDown
起こり、前のような方法を「取得」の成功への後続のアクションLock.lock
、Semaphore.acquire
、Condition.await
、とCountDownLatch.await
別のスレッドで同じシンクロナイザーオブジェクト上。を介してオブジェクトを正常に交換するスレッドの各ペアについて、各スレッドの
Exchanger
前のアクションが発生します-別のスレッドの対応する後のアクションの前に。exchange()
exchange()
呼び出し前のアクション
CyclicBarrier.await
およびPhaser.awaitAdvance
(およびそのバリアント)が発生する-バリアアクションによって実行されるアクションの前、およびバリアアクションによって実行されるアクションが発生する-await
他のスレッドの対応するものから正常に戻った後のアクション。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加