From 6987297e78f623fd8f5b37d22f759bc7af21832a Mon Sep 17 00:00:00 2001 From: RolfHeG <15918921@qq.com> Date: Wed, 14 Jul 2021 22:04:14 +0800 Subject: [PATCH] =?UTF-8?q?#755=20ExtensionDefault=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E8=B0=83=E6=95=B4=E3=80=81MsgHolder=E5=A2=9E=E5=8A=A0partition?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../SaturnExecutorExtensionDefault.java | 6 ++--- .../com/vip/saturn/job/msg/MsgHolder.java | 22 +++++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/saturn-core/src/main/java/com/vip/saturn/job/executor/SaturnExecutorExtensionDefault.java b/saturn-core/src/main/java/com/vip/saturn/job/executor/SaturnExecutorExtensionDefault.java index 4035ad4c8..f4124479d 100644 --- a/saturn-core/src/main/java/com/vip/saturn/job/executor/SaturnExecutorExtensionDefault.java +++ b/saturn-core/src/main/java/com/vip/saturn/job/executor/SaturnExecutorExtensionDefault.java @@ -38,7 +38,7 @@ public class SaturnExecutorExtensionDefault extends SaturnExecutorExtension { private static Logger log; - private static final String NAME_VIP_SATURN_LOG_DIR = "VIP_SATURN_LOG_DIR"; + protected static final String NAME_VIP_SATURN_LOG_DIR = "VIP_SATURN_LOG_DIR"; public SaturnExecutorExtensionDefault(String executorName, String namespace, ClassLoader executorClassLoader, ClassLoader jobClassLoader) { @@ -57,7 +57,7 @@ public void initLogDirEnv() { System.setProperty("saturn.log.dir", saturnLogDir); // for logback.xml } - private static String getEnv(String key, String defaultValue) { + protected static String getEnv(String key, String defaultValue) { String v = System.getenv(key); if (v == null || v.isEmpty()) { return defaultValue; @@ -65,7 +65,7 @@ private static String getEnv(String key, String defaultValue) { return v; } - private static String getDefaultLogDir(String executorName) { + protected String getDefaultLogDir(String executorName) { return "/apps/logs/saturn/" + System.getProperty("namespace") + "/" + executorName + "-" + LocalHostService.cachedIpAddress; } diff --git a/saturn-job-api/src/main/java/com/vip/saturn/job/msg/MsgHolder.java b/saturn-job-api/src/main/java/com/vip/saturn/job/msg/MsgHolder.java index 51290442c..11799414c 100644 --- a/saturn-job-api/src/main/java/com/vip/saturn/job/msg/MsgHolder.java +++ b/saturn-job-api/src/main/java/com/vip/saturn/job/msg/MsgHolder.java @@ -41,6 +41,9 @@ public class MsgHolder implements Serializable { /** Kafka offset */ private long offset; + /** Kafka partition */ + private int partition; + /** * @deprecated because the String type of payload maybe is not right */ @@ -58,6 +61,14 @@ public MsgHolder(byte[] payloadBytes, Set> prop, String me this.offset = offset; } + public MsgHolder(byte[] payloadBytes, Set> prop, String messageId, long offset, int partition) {// NOSONAR + this.payloadBytes = payloadBytes; + this.prop = prop; + this.messageId = messageId; + this.offset = offset; + this.partition = partition; + } + public MsgHolder(byte[] payloadBytes, Set> prop, String messageId) {// NOSONAR this.payloadBytes = payloadBytes; this.prop = prop; @@ -112,6 +123,13 @@ public void copyFrom(Object source) { this.offset = (long) res; } + field = clazz.getDeclaredField("partition"); + field.setAccessible(true); + res = field.get(source); + if (res != null) { + this.partition = (int) res; + } + } catch (Exception e) { throw new RuntimeException(e); } @@ -157,4 +175,8 @@ public String getMessageId() { public long getOffset() { return offset; } + + public int getPartition() { + return partition; + } }