Skip to content

Commit

Permalink
fix: Add feature to control systemic property of executeCode (#6519)
Browse files Browse the repository at this point in the history
Add markThreadNotSystemic() to support Core+, systemic object tracking
in PQs

---------

Co-authored-by: Charles P. Wright <[email protected]>
  • Loading branch information
abaranec and cpwright authored Jan 8, 2025
1 parent 1e4bb0f commit dd72175
Show file tree
Hide file tree
Showing 12 changed files with 1,662 additions and 1,366 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,23 @@ public static boolean isSystemicThread() {
}

/**
* Marks the current thread as systemically important, this is a permanent change.
* Marks the current thread as systemically important. This can be changed with {@link #markThreadNotSystemic()}
*/
public static void markThreadSystemic() {
if (SYSTEMIC_OBJECT_MARKING_ENABLED) {
SYSTEMIC_CREATION_THREAD.set(true);
}
}

/**
* Marks the current thread as systemically important. This can be changed with {@link #markThreadSystemic()} ()}
*/
public static void markThreadNotSystemic() {
if (SYSTEMIC_OBJECT_MARKING_ENABLED) {
SYSTEMIC_CREATION_THREAD.set(false);
}
}

/**
* Execute the supplier with the thread's systemic importance set to the value of systemicThread.
*
Expand Down
2,510 changes: 1,293 additions & 1,217 deletions go/internal/proto/console/console.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -40,34 +40,87 @@ public interface ConsoleSession extends Closeable {
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeCode(String code) throws InterruptedException, ExecutionException, TimeoutException;
default Changes executeCode(String code) throws InterruptedException, ExecutionException, TimeoutException {
return executeCode(code, ExecuteCodeOptions.DEFAULT);
}

/**
* Execute the given {@code code} against the script session.
*
* @param code the code
* @param options an {@link ExecuteCodeOptions} to control behavior
* @return the changes
* @throws InterruptedException if the current thread is interrupted
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeCode(String code, ExecuteCodeOptions options)
throws InterruptedException, ExecutionException, TimeoutException;

/**
* Execute the given {@code path path's} code against the script session.
*
* @param path the path to the code
* @return the changes
* @throws InterruptedException if the current thread is interrupted
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
default Changes executeScript(Path path)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
return executeScript(path, ExecuteCodeOptions.DEFAULT);
}

/**
* Execute the given {@code path path's} code against the script session.
*
* @param path the path to the code
* @param options an {@link ExecuteCodeOptions} to control behavior
* @return the changes
* @throws InterruptedException if the current thread is interrupted
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeScript(Path path) throws IOException, InterruptedException, ExecutionException, TimeoutException;
Changes executeScript(Path path, ExecuteCodeOptions options)
throws IOException, InterruptedException, ExecutionException, TimeoutException;

/**
* Execute the given {@code code} against the script session.
*
* @param code the code
* @return the changes future
*/
CompletableFuture<Changes> executeCodeFuture(String code);
default CompletableFuture<Changes> executeCodeFuture(String code) {
return executeCodeFuture(code, ExecuteCodeOptions.DEFAULT);
}

/**
* Execute the given {@code code} against the script session.
*
* @param code the code
* @param options an {@link ExecuteCodeOptions} to control behavior
* @return the changes future
*/
CompletableFuture<Changes> executeCodeFuture(String code, ExecuteCodeOptions options);

/**
* Execute the given {@code path path's} code against the script session.
*
* @param path the path to the code
* @return the changes future
*/
default CompletableFuture<Changes> executeScriptFuture(Path path) throws IOException {
return executeScriptFuture(path, ExecuteCodeOptions.DEFAULT);
}

/**
* Execute the given {@code path path's} code against the script session.
*
* @param path the path to the code
* @param options an {@link ExecuteCodeOptions} to control behavior
* @return the changes future
*/
CompletableFuture<Changes> executeScriptFuture(Path path) throws IOException;
CompletableFuture<Changes> executeScriptFuture(Path path, ExecuteCodeOptions options) throws IOException;

/**
* Closes {@code this} console session.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.client.impl;

import io.deephaven.annotations.BuildableStyle;
import org.immutables.value.Value;

/**
* An object to control the behavior of the {@link ConsoleSession#executeCode(String, ExecuteCodeOptions) executeCode}
* API
*/
@Value.Immutable
@BuildableStyle
public interface ExecuteCodeOptions {
/**
* The default options. See the method javadoc for default values.
*/
ExecuteCodeOptions DEFAULT = ExecuteCodeOptions.builder().build();

enum SystemicType {
ServerDefault, Systemic, NotSystemic
}

/**
* If the code should be executed systemically or not. When code is executed systemically, failures of the script or
* tables in the script are fatal.
*
* <p>
* The default value is {@code null} which uses the system default behavior. See the documentation for
* SystemicObjectTracker for more details.
* </p>
*
* @return if the code should be systemically executed.
*/
@Value.Default
default SystemicType executeSystemic() {
return SystemicType.ServerDefault;
}

/**
* Create a new options builder.
*
* @return a new builder
*/
static Builder builder() {
return ImmutableExecuteCodeOptions.builder();
}

interface Builder {
/**
* Set if the code should be executed systemically or not. A value of {@code null} uses default system behavior.
*
* @param systemicType if the code should be executed systemically.
* @return this {@link Builder}
*/
ExecuteCodeOptions.Builder executeSystemic(SystemicType systemicType);

/**
* Create a new {@link ExecuteCodeOptions} from the state of this builder.
*
* @return a new {@link ExecuteCodeOptions}
*/
ExecuteCodeOptions build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -418,21 +418,30 @@ public Ticket ticket() {
}

@Override
public Changes executeCode(String code) throws InterruptedException, ExecutionException, TimeoutException {
return executeCodeFuture(code).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
public Changes executeCode(String code, ExecuteCodeOptions options)
throws InterruptedException, ExecutionException, TimeoutException {
return executeCodeFuture(code, options).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
}

@Override
public Changes executeScript(Path path)
public Changes executeScript(Path path, ExecuteCodeOptions options)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
return executeScriptFuture(path).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
return executeScriptFuture(path, options).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
}

@Override
public CompletableFuture<Changes> executeCodeFuture(String code) {
final ExecuteCommandRequest request =
ExecuteCommandRequest.newBuilder().setConsoleId(ticket()).setCode(code).build();
return UnaryGrpcFuture.of(request, channel().console()::executeCommand,
public CompletableFuture<Changes> executeCodeFuture(String code, ExecuteCodeOptions options) {
final ExecuteCommandRequest.Builder requestBuilder =
ExecuteCommandRequest.newBuilder().setConsoleId(ticket()).setCode(code);

final ExecuteCodeOptions.SystemicType systemicOption = options.executeSystemic();
if (systemicOption != ExecuteCodeOptions.SystemicType.ServerDefault) {
requestBuilder.setSystemic(systemicOption == ExecuteCodeOptions.SystemicType.Systemic
? ExecuteCommandRequest.SystemicType.EXECUTE_SYSTEMIC
: ExecuteCommandRequest.SystemicType.EXECUTE_NOT_SYSTEMIC);
}

return UnaryGrpcFuture.of(requestBuilder.build(), channel().console()::executeCommand,
response -> {
Changes.Builder builder = Changes.builder().changes(new FieldChanges(response.getChanges()));
if (!response.getErrorMessage().isEmpty()) {
Expand All @@ -443,9 +452,10 @@ public CompletableFuture<Changes> executeCodeFuture(String code) {
}

@Override
public CompletableFuture<Changes> executeScriptFuture(Path path) throws IOException {
public CompletableFuture<Changes> executeScriptFuture(Path path, ExecuteCodeOptions options)
throws IOException {
final String code = String.join(System.lineSeparator(), Files.readAllLines(path, StandardCharsets.UTF_8));
return executeCodeFuture(code);
return executeCodeFuture(code, options);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,24 @@ message LogSubscriptionData {
reserved 4;//if we can scope logs to a script session
// Ticket console_id = 4;
}

message ExecuteCommandRequest {
enum SystemicType {
NOT_SET_SYSTEMIC = 0;
EXECUTE_NOT_SYSTEMIC = 1;
EXECUTE_SYSTEMIC = 2;
}

io.deephaven.proto.backplane.grpc.Ticket console_id = 1;
reserved 2;//if script sessions get a ticket, we will use this reserved tag
string code = 3;

// If set to `EXECUTE_SYSTEMIC` the command will be executed systemically. Failures in systemic code
// are treated as important failures and cause errors to be reported to the io.deephaven.engine.table.impl.util.AsyncClientErrorNotifier.
// If this is unset it is treated as `EXECUTE_NOT_SYSTEMIC`
optional SystemicType systemic = 4;
}

message ExecuteCommandResponse {
string error_message = 1;
io.deephaven.proto.backplane.grpc.FieldsChangeUpdate changes = 2;
Expand Down
2 changes: 1 addition & 1 deletion py/client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ tasks.register('updateProtobuf', Sync) {
String randomSuffix = UUID.randomUUID().toString();
deephavenDocker {
envVars.set([
'START_OPTS':'-Xmx512m -DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler'
'START_OPTS':'-Xmx512m -DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler -DSystemicObjectTracker.enabled=true'
])
containerName.set "pydeephaven-test-container-${randomSuffix}"
networkName.set "pydeephaven-network-${randomSuffix}"
Expand Down
252 changes: 127 additions & 125 deletions py/client/deephaven_core/proto/console_pb2.py

Large diffs are not rendered by default.

18 changes: 14 additions & 4 deletions py/client/pydeephaven/_console_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
from typing import Any
from typing import Any, Optional

from pydeephaven.dherror import DHError
from deephaven_core.proto import console_pb2_grpc, console_pb2
Expand Down Expand Up @@ -32,16 +32,26 @@ def start_console(self):
except Exception as e:
raise DHError("failed to start a console.") from e

def run_script(self, server_script: str) -> Any:
"""Runs a Python script in the console."""
def run_script(self, server_script: str, systemic: Optional[bool] = None) -> Any:
"""Runs a Python script in the console.
Args:
server_script (str): The script code to run
systemic (bool): Whether to treat the code as systemically important. Defaults to None which uses the
default system behavior
"""
self.start_console()

try:
systemic_opt = console_pb2.ExecuteCommandRequest.SystemicType.NOT_SET_SYSTEMIC if systemic is None else \
console_pb2.ExecuteCommandRequest.SystemicType.EXECUTE_SYSTEMIC if systemic else \
console_pb2.ExecuteCommandRequest.SystemicType.EXECUTE_NOT_SYSTEMIC

response = self.session.wrap_rpc(
self._grpc_console_stub.ExecuteCommand,
console_pb2.ExecuteCommandRequest(
console_id=self.console_id,
code=server_script))
code=server_script,
systemic=systemic_opt))
return response
except Exception as e:
raise DHError("failed to execute a command in the console.") from e
Expand Down
6 changes: 4 additions & 2 deletions py/client/pydeephaven/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,16 +494,18 @@ def release(self, ticket: ExportTicket) -> None:
self.session_service.release(ticket)

# convenience/factory methods
def run_script(self, script: str) -> None:
def run_script(self, script: str, systemic: Optional[bool] = None) -> None:
"""Runs the supplied Python script on the server.
Args:
script (str): the Python script code
systemic (bool): Whether to treat the code as systemically important. Defaults to None which uses the
default system behavior
Raises:
DHError
"""
response = self.console_service.run_script(script)
response = self.console_service.run_script(script, systemic)
if response.error_message != '':
raise DHError("could not run script: " + response.error_message)

Expand Down
33 changes: 32 additions & 1 deletion py/client/tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from pydeephaven import DHError
from pydeephaven import Session
from pydeephaven.ticket import SharedTicket
from pydeephaven.ticket import SharedTicket, ScopeTicket
from tests.testbase import BaseTestCase


Expand Down Expand Up @@ -409,6 +409,37 @@ def _interact_with_server(ti):
for t in threads:
t.join()

def test_systemic_scripts(self):
fields = [pa.field(f"S", pa.bool_())]
schema = pa.schema(fields)

with Session() as session:
# Run the setup script.
session.run_script("""
from deephaven import time_table
import jpy
j_sot = jpy.get_type("io.deephaven.engine.util.systemicmarking.SystemicObjectTracker")
""")
table_script = """
t1 = time_table("PT1S").update("A=ii")
t2 = empty_table(1).update("S = (boolean)j_sot.isSystemic(t1.j_table)")
"""
# Make sure defaults apply (expected false)
session.run_script(table_script)
t = session.fetch_table(ticket=ScopeTicket.scope_ticket("t2"))
pa_table = pa.table([ pa.array([False]) ], schema=schema)
self.assertTrue(pa_table.equals(t.to_arrow()))

session.run_script(table_script, True)
t = session.fetch_table(ticket=ScopeTicket.scope_ticket("t2"))
pa_table = pa.table([ pa.array([True]) ], schema=schema)
self.assertTrue(pa_table.equals(t.to_arrow()))

session.run_script(table_script, False)
t = session.fetch_table(ticket=ScopeTicket.scope_ticket("t2"))
pa_table = pa.table([ pa.array([False]) ], schema=schema)
self.assertTrue(pa_table.equals(t.to_arrow()))

if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit dd72175

Please sign in to comment.