Skip to content

Commit

Permalink
[INLONG-11675][SDK] Optimize IpUtils class related implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
gosonzhang committed Jan 15, 2025
1 parent 4d45c7b commit eb6da07
Show file tree
Hide file tree
Showing 14 changed files with 102 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
import org.apache.inlong.sdk.dataproxy.network.IpUtils;

import lombok.Data;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -98,11 +97,8 @@ public class ProxyClientConfig {
private int senderMaxAttempt = ConfigConstants.DEFAULT_SENDER_MAX_ATTEMPT;

/* pay attention to the last url parameter ip */
public ProxyClientConfig(String localHost, boolean visitManagerByHttp, String managerIp,
public ProxyClientConfig(boolean visitManagerByHttp, String managerIp,
int managerPort, String inlongGroupId, String authSecretId, String authSecretKey) throws ProxySdkException {
if (StringUtils.isBlank(localHost)) {
throw new ProxySdkException("localHost is blank!");
}
if (StringUtils.isBlank(managerIp)) {
throw new ProxySdkException("managerIp is Blank!");
}
Expand All @@ -116,7 +112,6 @@ public ProxyClientConfig(String localHost, boolean visitManagerByHttp, String ma
this.visitManagerByHttp = visitManagerByHttp;
this.managerPort = managerPort;
this.managerIP = managerIp;
IpUtils.validLocalIp(localHost);
this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE;
this.asyncCallbackSize = ConfigConstants.ASYNC_CALLBACK_SIZE;
this.proxyHttpUpdateIntervalMinutes = ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.EncryptInfo;
import org.apache.inlong.sdk.dataproxy.network.IpUtils;
import org.apache.inlong.sdk.dataproxy.utils.AuthzUtils;
import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
Expand Down Expand Up @@ -84,8 +85,8 @@ private ByteBuf writeToBuf8(EncodeObject object) {
}
long timestamp = System.currentTimeMillis();
int nonce = new SecureRandom(String.valueOf(timestamp).getBytes()).nextInt(Integer.MAX_VALUE);
endAttr = endAttr + "_userName=" + object.getUserName() + "&_clientIP=" + IpUtils.getLocalIp()
+ "&_signature=" + IpUtils.generateSignature(object.getUserName(),
endAttr = endAttr + "_userName=" + object.getUserName() + "&_clientIP=" + ProxyUtils.getLocalIp()
+ "&_signature=" + AuthzUtils.generateSignature(object.getUserName(),
timestamp, nonce, object.getSecretKey())
+ "&_timeStamp=" + timestamp + "&_nonce=" + nonce;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.inlong.sdk.dataproxy.ConfigConstants;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
import org.apache.inlong.sdk.dataproxy.network.IpUtils;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.apache.inlong.sdk.dataproxy.utils.Tuple2;

import com.google.gson.Gson;
Expand Down Expand Up @@ -850,7 +850,7 @@ private void addAuthorizationInfo(HttpPost httpPost) {

private List<BasicNameValuePair> buildProxyNodeQueryParams() {
ArrayList<BasicNameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair("ip", IpUtils.getLocalIp()));
params.add(new BasicNameValuePair("ip", ProxyUtils.getLocalIp()));
params.add(new BasicNameValuePair("protocolType", clientConfig.getProtocolType()));
return params;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,23 @@ public static void main(String[] args) {
String configBasePath = "";
String inLongManagerAddr = "127.0.0.1";
String inLongManagerPort = "8083";
String localIP = "127.0.0.1";
String messageBody = "inlong message body!";

HttpProxySender sender = getMessageSender(localIP, inLongManagerAddr,
HttpProxySender sender = getMessageSender(inLongManagerAddr,
inLongManagerPort, inlongGroupId, true, false,
configBasePath);
sendHttpMessage(sender, inlongGroupId, inlongStreamId, messageBody);
sender.close(); // close the sender
}

public static HttpProxySender getMessageSender(String localIP, String inLongManagerAddr,
public static HttpProxySender getMessageSender(String inLongManagerAddr,
String inLongManagerPort, String inlongGroupId,
boolean requestByHttp, boolean isReadProxyIPFromLocal,
String configBasePath) {
ProxyClientConfig proxyConfig = null;
HttpProxySender sender = null;
try {
proxyConfig = new ProxyClientConfig(localIP, requestByHttp, inLongManagerAddr,
proxyConfig = new ProxyClientConfig(requestByHttp, inLongManagerAddr,
Integer.valueOf(inLongManagerPort),
inlongGroupId, "admin", "inlong");// user and password of manager
proxyConfig.setConfigStoreBasePath(configBasePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public class TcpClientExample {

private static final Logger logger = LoggerFactory.getLogger(TcpClientExample.class);

public static String localIP = "127.0.0.1";

/**
* Example of client tcp.
*/
Expand All @@ -54,20 +52,20 @@ public static void main(String[] args) throws InterruptedException {

TcpClientExample tcpClientExample = new TcpClientExample();
DefaultMessageSender sender = tcpClientExample
.getMessageSender(localIP, inLongManagerAddr, inLongManagerPort,
.getMessageSender(inLongManagerAddr, inLongManagerPort,
inlongGroupId, true, false, configBasePath, msgType);
tcpClientExample.sendTcpMessage(sender, inlongGroupId, inlongStreamId,
messageBody, System.currentTimeMillis());
sender.close(); // close the sender
}

public DefaultMessageSender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort,
public DefaultMessageSender getMessageSender(String inLongManagerAddr, String inLongManagerPort,
String inlongGroupId, boolean requestByHttp, boolean isReadProxyIPFromLocal,
String configBasePath, int msgType) {
ProxyClientConfig dataProxyConfig = null;
DefaultMessageSender messageSender = null;
try {
dataProxyConfig = new ProxyClientConfig(localIP, requestByHttp, inLongManagerAddr,
dataProxyConfig = new ProxyClientConfig(requestByHttp, inLongManagerAddr,
Integer.valueOf(inLongManagerPort), inlongGroupId, "admin", "inlong");
if (StringUtils.isNotEmpty(configBasePath)) {
dataProxyConfig.setConfigStoreBasePath(configBasePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.utils.EventLoopUtil;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.apache.inlong.sdk.dataproxy.utils.Tuple2;

import io.netty.bootstrap.Bootstrap;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class ClientMgr {
private static final LogCounter logCounter = new LogCounter(10, 100000, 60 * 1000L);
private static final LogCounter updConExptCnt = new LogCounter(10, 100000, 60 * 1000L);
private static final LogCounter exptCounter = new LogCounter(10, 100000, 60 * 1000L);
private static final byte[] hbMsgBody = IpUtils.getLocalIp().getBytes(StandardCharsets.UTF_8);
private static final byte[] hbMsgBody = ProxyUtils.getLocalIp().getBytes(StandardCharsets.UTF_8);

private final Sender sender;
private final ProxyClientConfig configure;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread;
import org.apache.inlong.sdk.dataproxy.threads.TimeoutScanThread;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.apache.inlong.sdk.dataproxy.utils.Tuple2;

import io.netty.channel.Channel;
Expand Down Expand Up @@ -167,7 +168,7 @@ public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID) {
}
if (configure.isEnableMetric()) {
metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(),
encodeObject.getStreamId(), IpUtils.getLocalIp(), encodeObject.getDt(),
encodeObject.getStreamId(), ProxyUtils.getLocalIp(), encodeObject.getDt(),
encodeObject.getPackageTime(), encodeObject.getRealCnt());
}
SendResult message;
Expand Down Expand Up @@ -320,7 +321,7 @@ public void asyncSendMessage(EncodeObject encodeObject,
}
if (configure.isEnableMetric()) {
metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(),
encodeObject.getStreamId(), IpUtils.getLocalIp(), encodeObject.getPackageTime(),
encodeObject.getStreamId(), ProxyUtils.getLocalIp(), encodeObject.getPackageTime(),
encodeObject.getDt(), encodeObject.getRealCnt());
}
// send message package time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@

package org.apache.inlong.sdk.dataproxy.network;

import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;

import java.security.SecureRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class SequentialID {

private static final SecureRandom sRandom = new SecureRandom(
Long.toString(System.nanoTime()).getBytes());
private final String ip = IpUtils.getLocalIp();
private final String ip = ProxyUtils.getLocalIp();
private final AtomicInteger id = new AtomicInteger(sRandom.nextInt());

public SequentialID() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import org.apache.inlong.sdk.dataproxy.metric.MessageRecord;
import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
import org.apache.inlong.sdk.dataproxy.metric.MetricTimeNumSummary;
import org.apache.inlong.sdk.dataproxy.network.IpUtils;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -207,7 +207,7 @@ private void sendSingleLine(String line, String streamId, long dtTime) {
EncodeObject encodeObject = new EncodeObject(Collections.singletonList(line.getBytes()), 7,
false, false, false,
dtTime, idGenerator.getNextInt(),
metricConfig.getMetricGroupId(), streamId, "", "", IpUtils.getLocalIp());
metricConfig.getMetricGroupId(), streamId, "", "", ProxyUtils.getLocalIp());
MetricSendCallBack callBack = new MetricSendCallBack(encodeObject);
tryToSendMetricToManager(encodeObject, callBack);
}
Expand Down
Loading

0 comments on commit eb6da07

Please sign in to comment.