DATAMONGO-2393 - Polishing.
Extract read requests into inner class. Original Pull Request: #799
This commit is contained in:
committed by
Christoph Strobl
parent
6cb246c18a
commit
ec3ccc004e
@@ -27,9 +27,7 @@ import reactor.util.context.Context;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.reactivestreams.Subscription;
|
||||
@@ -74,8 +72,7 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
|
||||
private volatile boolean cancelled;
|
||||
private volatile boolean allDataBuffersReceived;
|
||||
private volatile Throwable error;
|
||||
private final Queue<BiConsumer<DataBuffer, Integer>> readRequests = Queues.<BiConsumer<DataBuffer, Integer>> small()
|
||||
.get();
|
||||
private final Queue<ReadRequest> readRequests = Queues.<ReadRequest> small().get();
|
||||
|
||||
private final Queue<DataBuffer> bufferQueue = Queues.<DataBuffer> small().get();
|
||||
|
||||
@@ -94,52 +91,7 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
|
||||
|
||||
return Flux.create(sink -> {
|
||||
|
||||
AtomicLong written = new AtomicLong();
|
||||
readRequests.offer((db, bytecount) -> {
|
||||
|
||||
try {
|
||||
|
||||
if (error != null) {
|
||||
onError(sink, error);
|
||||
return;
|
||||
}
|
||||
|
||||
if (bytecount == -1) {
|
||||
|
||||
onComplete(sink, written.get() > 0 ? written.intValue() : -1);
|
||||
return;
|
||||
}
|
||||
|
||||
ByteBuffer byteBuffer = db.asByteBuffer();
|
||||
int remaining = byteBuffer.remaining();
|
||||
int writeCapacity = Math.min(dst.remaining(), remaining);
|
||||
int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity());
|
||||
int toWrite = limit - byteBuffer.position();
|
||||
|
||||
if (toWrite == 0) {
|
||||
|
||||
onComplete(sink, written.intValue());
|
||||
return;
|
||||
}
|
||||
|
||||
int oldPosition = byteBuffer.position();
|
||||
|
||||
byteBuffer.limit(toWrite);
|
||||
dst.put(byteBuffer);
|
||||
byteBuffer.limit(byteBuffer.capacity());
|
||||
byteBuffer.position(oldPosition);
|
||||
db.readPosition(db.readPosition() + toWrite);
|
||||
written.addAndGet(toWrite);
|
||||
|
||||
} catch (Exception e) {
|
||||
onError(sink, e);
|
||||
} finally {
|
||||
|
||||
if (db != null && db.readableByteCount() == 0) {
|
||||
DataBufferUtils.release(db);
|
||||
}
|
||||
}
|
||||
});
|
||||
readRequests.offer(new ReadRequest(sink, dst));
|
||||
|
||||
sink.onCancel(this::terminatePendingReads);
|
||||
sink.onDispose(this::terminatePendingReads);
|
||||
@@ -243,12 +195,12 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
|
||||
continue;
|
||||
}
|
||||
|
||||
BiConsumer<DataBuffer, Integer> consumer = AsyncInputStreamAdapter.this.readRequests.peek();
|
||||
ReadRequest consumer = AsyncInputStreamAdapter.this.readRequests.peek();
|
||||
if (consumer == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
consumer.accept(wip, wip.readableByteCount());
|
||||
consumer.transferBytes(wip, wip.readableByteCount());
|
||||
}
|
||||
|
||||
if (bufferQueue.isEmpty()) {
|
||||
@@ -269,10 +221,10 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
|
||||
*/
|
||||
void terminatePendingReads() {
|
||||
|
||||
BiConsumer<DataBuffer, Integer> readers;
|
||||
ReadRequest readers;
|
||||
|
||||
while ((readers = readRequests.poll()) != null) {
|
||||
readers.accept(null, -1);
|
||||
readers.onComplete();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -299,7 +251,7 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
|
||||
return;
|
||||
}
|
||||
|
||||
BiConsumer<DataBuffer, Integer> readRequest = AsyncInputStreamAdapter.this.readRequests.peek();
|
||||
ReadRequest readRequest = AsyncInputStreamAdapter.this.readRequests.peek();
|
||||
|
||||
if (readRequest == null) {
|
||||
|
||||
@@ -336,4 +288,76 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Request to read bytes and transfer these to the associated {@link ByteBuffer}.
|
||||
*/
|
||||
class ReadRequest {
|
||||
|
||||
private final FluxSink<Integer> sink;
|
||||
private final ByteBuffer dst;
|
||||
|
||||
private int writtenBytes;
|
||||
|
||||
ReadRequest(FluxSink<Integer> sink, ByteBuffer dst) {
|
||||
this.sink = sink;
|
||||
this.dst = dst;
|
||||
this.writtenBytes = -1;
|
||||
}
|
||||
|
||||
public void onComplete() {
|
||||
|
||||
if (error != null) {
|
||||
AsyncInputStreamAdapter.this.onError(sink, error);
|
||||
return;
|
||||
}
|
||||
|
||||
AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes);
|
||||
}
|
||||
|
||||
public void transferBytes(DataBuffer db, int bytes) {
|
||||
|
||||
try {
|
||||
|
||||
if (error != null) {
|
||||
AsyncInputStreamAdapter.this.onError(sink, error);
|
||||
return;
|
||||
}
|
||||
|
||||
ByteBuffer byteBuffer = db.asByteBuffer();
|
||||
int remaining = byteBuffer.remaining();
|
||||
int writeCapacity = Math.min(dst.remaining(), remaining);
|
||||
int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity());
|
||||
int toWrite = limit - byteBuffer.position();
|
||||
|
||||
if (toWrite == 0) {
|
||||
|
||||
AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes);
|
||||
return;
|
||||
}
|
||||
|
||||
int oldPosition = byteBuffer.position();
|
||||
|
||||
byteBuffer.limit(toWrite);
|
||||
dst.put(byteBuffer);
|
||||
byteBuffer.limit(byteBuffer.capacity());
|
||||
byteBuffer.position(oldPosition);
|
||||
db.readPosition(db.readPosition() + toWrite);
|
||||
|
||||
if (writtenBytes == -1) {
|
||||
writtenBytes = bytes;
|
||||
} else {
|
||||
writtenBytes += bytes;
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
AsyncInputStreamAdapter.this.onError(sink, e);
|
||||
} finally {
|
||||
|
||||
if (db.readableByteCount() == 0) {
|
||||
DataBufferUtils.release(db);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user