Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Simplify Barrage Viewport Table Updates #6347

Merged
merged 38 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
1852b0a
Restore initial commit from grpc-java, plus a few local changes
niloc132 Oct 21, 2024
68b925a
Guard writing payload as hex if FINEST is enabled
niloc132 Oct 21, 2024
e88c47e
Apply upstream "Fix AsyncServletOutputStreamWriterConcurrencyTest
niloc132 Oct 21, 2024
f9a19fc
Apply upstream "Avoid flushing headers when the server returns a single
niloc132 Oct 21, 2024
4733524
Apply upstream "servlet: introduce ServletServerBuilder.buildServlet()"
niloc132 Oct 21, 2024
06e63ec
Bump grpc vers, add inprocess dep for tests
niloc132 Oct 21, 2024
09ade64
Merge branch 'main' into grpc-history-replay
niloc132 Oct 28, 2024
c8af47c
Apply https://github.com/deephaven/deephaven-core/pull/6301
niloc132 Oct 28, 2024
57c8008
Bump to 1.65.1 to better match arrow 18
niloc132 Nov 1, 2024
cbf8ab2
Merge remote-tracking branch 'colin/grpc-history-replay' into vp_simp…
nbauernfeind Nov 6, 2024
85f604f
Version Upgrades; MavenLocal
nbauernfeind Nov 6, 2024
70a0207
Implement Simplified Viewport Table Updates in BMP/BT
nbauernfeind Nov 8, 2024
0089d62
Ryan's Synchronous Review
nbauernfeind Nov 9, 2024
485746d
Merge remote-tracking branch 'upstream/main' into vp_simplification
nbauernfeind Nov 9, 2024
ad8de73
Remove SNAPSHOT version and mavenLocal references
nbauernfeind Nov 11, 2024
02ce2ad
Fixes removed/added rows in most VP cases
nbauernfeind Nov 12, 2024
da23e2b
Bug fixes around viewport snapshot rowsRemoved and rowsAdded
nbauernfeind Nov 12, 2024
299f56e
Bugfix for correct growing VP logic
nbauernfeind Nov 12, 2024
9d6f389
remaining java side fixes
nbauernfeind Nov 13, 2024
fd5aced
Ryan's feedback on javaserver/client impls
nbauernfeind Nov 14, 2024
53b1eed
Inline Feedback from VC w/Ryan
nbauernfeind Nov 14, 2024
6e7fe94
Do not propagate modifies for any repainted rows
nbauernfeind Nov 14, 2024
d568eb7
Minor cleanup from personal review
nbauernfeind Nov 14, 2024
6653ca6
Ryan's feedback latest round.
nbauernfeind Nov 14, 2024
44cdf93
jsAPI mostly complete; looking for tree table issue
nbauernfeind Nov 15, 2024
d549d79
Fixes for jsapi and HierarchicalTable
nbauernfeind Nov 15, 2024
b4d5b69
Lazily compute rowset encoding
nbauernfeind Nov 15, 2024
6c12314
Fixup jsapi tests
nbauernfeind Nov 15, 2024
f9be6e5
Quick round feedback
nbauernfeind Nov 15, 2024
4252622
spotless
nbauernfeind Nov 15, 2024
2767def
Double checked locking fixes
nbauernfeind Nov 15, 2024
78c4cb7
Ryan's final review
nbauernfeind Nov 15, 2024
ea6f898
Clarify strategy on who owns RowSets passed into getSubView
nbauernfeind Nov 15, 2024
3eeb628
npe fix
nbauernfeind Nov 15, 2024
84a6100
Bugfix if HT is empty or viewport past end of table
nbauernfeind Nov 16, 2024
476ae65
Colin's feedback
nbauernfeind Nov 16, 2024
738cb11
Limit jsapi data change event to prev and curr table sizes
nbauernfeind Nov 16, 2024
44fadff
Colin's Final Feedback
nbauernfeind Nov 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions buildSrc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ java {
}

repositories {
mavenLocal()
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
mavenCentral()
maven {
url "https://plugins.gradle.org/m2/"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
repositories {
mavenCentral()
mavenLocal()
maven {
url 'https://oss.sonatype.org/content/repositories/snapshots'
content {
includeGroup 'com.vertispan.flatbuffers'
}
}
maven {
url 'https://jitpack.io'
content {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.deephaven.engine.table.impl.sources.RedirectedColumnSource;
import io.deephaven.engine.table.impl.util.*;
import org.apache.commons.lang3.mutable.MutableObject;
import org.jetbrains.annotations.NotNull;

import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -98,6 +99,34 @@ private void onUpdate(final TableUpdate upstream) {
try (final RowSet prevRowSet = rowSet.copyPrev()) {
downstream.removed = prevRowSet.invert(upstream.removed());
}

if (newSize < prevSize) {
resultTable.getRowSet().writableCast().removeRange(newSize, prevSize - 1);
} else if (newSize > prevSize) {
resultTable.getRowSet().writableCast().insertRange(prevSize, newSize - 1);
}

downstream.shifted = computeFlattenedRowSetShiftData(downstream.removed(), downstream.added(), prevSize);
prevSize = newSize;
resultTable.notifyListeners(downstream);
}

/**
* Compute the shift data for a flattened row set given which rows were removed and which were added.
*
* @param removed the rows that were removed in the flattened pre-update key-space
* @param added the rows that were added in the flattened post-update key-space
* @param prevSize the size of the table before the update
* @return the shift data
*/
public static RowSetShiftData computeFlattenedRowSetShiftData(
@NotNull final RowSet removed,
@NotNull final RowSet added,
final long prevSize) {
if (removed.isEmpty() && added.isEmpty()) {
return RowSetShiftData.EMPTY;
}

final RowSetShiftData.Builder outShifted = new RowSetShiftData.Builder();

// Helper to ensure that we can prime iterators and still detect the end.
Expand All @@ -110,8 +139,8 @@ private void onUpdate(final TableUpdate upstream) {
};

// Create our range iterators and prime them.
final MutableObject<RowSet.RangeIterator> rmIt = new MutableObject<>(downstream.removed().rangeIterator());
final MutableObject<RowSet.RangeIterator> addIt = new MutableObject<>(downstream.added().rangeIterator());
final MutableObject<RowSet.RangeIterator> rmIt = new MutableObject<>(removed.rangeIterator());
final MutableObject<RowSet.RangeIterator> addIt = new MutableObject<>(added.rangeIterator());
updateIt.accept(rmIt);
updateIt.accept(addIt);

Expand Down Expand Up @@ -163,14 +192,6 @@ private void onUpdate(final TableUpdate upstream) {
outShifted.shiftRange(currMarker, prevSize - 1, currDelta);
}

if (newSize < prevSize) {
resultTable.getRowSet().writableCast().removeRange(newSize, prevSize - 1);
} else if (newSize > prevSize) {
resultTable.getRowSet().writableCast().insertRange(prevSize, newSize - 1);
}

downstream.shifted = outShifted.build();
prevSize = newSize;
resultTable.notifyListeners(downstream);
return outShifted.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.deephaven.annotations.BuildableStyle;
import io.deephaven.barrage.flatbuf.BarrageSnapshotRequest;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.util.annotations.FinalDefault;
import org.immutables.value.Value.Default;
import org.immutables.value.Value.Immutable;

Expand All @@ -21,12 +22,11 @@ public static BarrageSnapshotOptions of(final io.deephaven.barrage.flatbuf.Barra
if (options == null) {
return builder().build();
}
final byte mode = options.columnConversionMode();
return builder()
.useDeephavenNulls(options.useDeephavenNulls())
.columnConversionMode(ColumnConversionMode.conversionModeFbToEnum(mode))
.batchSize(options.batchSize())
.maxMessageSize(options.maxMessageSize())
.previewListLengthLimit(options.previewListLengthLimit())
.build();
}

Expand Down Expand Up @@ -65,27 +65,37 @@ public int maxMessageSize() {

@Override
@Default
public ColumnConversionMode columnConversionMode() {
return ColumnConversionMode.Stringify;
public int previewListLengthLimit() {
return 0;
}

public int appendTo(FlatBufferBuilder builder) {
return io.deephaven.barrage.flatbuf.BarrageSnapshotOptions.createBarrageSnapshotOptions(
builder, ColumnConversionMode.conversionModeEnumToFb(columnConversionMode()), useDeephavenNulls(),
builder, useDeephavenNulls(),
batchSize(),
maxMessageSize());
maxMessageSize(),
previewListLengthLimit());
}

public interface Builder {

Builder useDeephavenNulls(boolean useDeephavenNulls);

Builder columnConversionMode(ColumnConversionMode columnConversionMode);
/**
* Deprecated since 0.37.0 and is marked for removal. (our GWT artifacts do not yet support the attributes)
*/
@FinalDefault
@Deprecated
default Builder columnConversionMode(ColumnConversionMode columnConversionMode) {
return this;
}

Builder batchSize(int batchSize);

Builder maxMessageSize(int messageSize);

Builder previewListLengthLimit(int previewListLengthLimit);
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved

BarrageSnapshotOptions build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ BarrageStreamGenerator newGenerator(
* Obtain a Full-Subscription View of this StreamGenerator that can be sent to a single subscriber.
*
* @param options serialization options for this specific view
* @param isInitialSnapshot indicates whether or not this is the first snapshot for the listener
* @param isInitialSnapshot indicates whether this is the first snapshot for the listener
* @return a MessageView filtered by the subscription properties that can be sent to that subscriber
*/
MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnapshot);
Expand All @@ -68,15 +68,24 @@ BarrageStreamGenerator newGenerator(
* Obtain a View of this StreamGenerator that can be sent to a single subscriber.
*
* @param options serialization options for this specific view
* @param isInitialSnapshot indicates whether or not this is the first snapshot for the listener
* @param isInitialSnapshot indicates whether this is the first snapshot for the listener
* @param isFullSubscription whether this is a full subscription (possibly a growing viewport)
* @param viewport is the position-space viewport
* @param reverseViewport is the viewport reversed (relative to end of table instead of beginning)
* @param keyspaceViewportPrev is the key-space viewport in prior to applying the update
* @param keyspaceViewport is the key-space viewport
* @param subscribedColumns are the columns subscribed for this view
* @return a MessageView filtered by the subscription properties that can be sent to that subscriber
*/
MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnapshot, @Nullable RowSet viewport,
boolean reverseViewport, @Nullable RowSet keyspaceViewport, BitSet subscribedColumns);
MessageView getSubView(
BarrageSubscriptionOptions options,
boolean isInitialSnapshot,
boolean isFullSubscription,
@Nullable RowSet viewport,
boolean reverseViewport,
@Nullable RowSet keyspaceViewportPrev,
@Nullable RowSet keyspaceViewport,
BitSet subscribedColumns);

/**
* Obtain a Full-Snapshot View of this StreamGenerator that can be sent to a single requestor.
Expand Down
Loading
Loading