Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V018 info #477

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions discovery/src/dns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,6 @@ impl DnsCache {
// 每16个tick执行一次empty,避免某一次刷新未解释成功导致需要等待下一个周期。
// 其他情况下,每个tick只会刷新部分chunk数据。
fn iter(&mut self) -> HostRecordIter<'_> {
const PERIOD: usize = 128;
let ith = self.cycle % PERIOD;
// 如果当前是走空搜索,下一次扫描的时候会搜索2个chunk.
self.cycle += 1;
// 16是一个经验值。
if self.hosts.len() > self.last_len || (self.cycle % 16 == 0 && self.hosts.has_empty()) {
self.last_len = self.hosts.len();
Expand All @@ -180,10 +176,22 @@ impl DnsCache {
// 刷新从上个idx开始的一个chunk长度的数据
assert!(self.last_len == self.hosts.len());
let len = self.last_len;
const PERIOD: usize = 128;
let ith = self.cycle % PERIOD;
self.cycle += 1;
let chunk = (len + (PERIOD - 1)) / PERIOD;
// 因为chunk是动态变化的,所以不能用last_idx + chunk
let end = ((ith + 1) * chunk).min(len);
assert!(self.last_idx <= end);
assert!(
self.last_idx <= end,
"addr:{:p} last_idx:{}, end:{}, len:{}, ith:{} chunk:{}",
&self,
self.last_idx,
end,
len,
ith,
chunk,
);
let iter = self.hosts.hosts[self.last_idx..end].iter_mut();
self.last_idx = end;
if self.last_idx == len {
Expand Down
2 changes: 1 addition & 1 deletion endpoint/src/redisservice/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl RedisNamespace {
pub(super) fn timeout_slave(&self) -> Timeout {
let mut to = TO_REDIS_S;
if self.basic.timeout_ms_slave > 0 {
to.adjust(self.basic.timeout_ms_master);
to.adjust(self.basic.timeout_ms_slave);
}
to
}
Expand Down
2 changes: 1 addition & 1 deletion endpoint/src/vector/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl VectorNamespace {
pub(crate) fn timeout_slave(&self) -> Timeout {
let mut to = TO_VECTOR_S;
if self.basic.timeout_ms_slave > 0 {
to.adjust(self.basic.timeout_ms_master);
to.adjust(self.basic.timeout_ms_slave);
}
to
}
Expand Down
2 changes: 1 addition & 1 deletion stream/src/checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl<P, Req> BackendChecker<P, Req> {
let handler = Handler::from(rx, stream, p, path_addr.clone());
let handler = Entry::timeout(handler, Timeout::from(self.timeout.ms()));
let ret = handler.await;
println!(
log::info!(
"backend error {:?} => {:?} finish: {}",
path_addr,
ret,
Expand Down
9 changes: 8 additions & 1 deletion stream/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct Handler<'r, Req, P, S> {

// 连续多少个cycle检查到当前没有请求发送,则发送一个ping
ping_cycle: u16,
name: Path,
}
impl<'r, Req, P, S> Future for Handler<'r, Req, P, S>
where
Expand Down Expand Up @@ -60,6 +61,7 @@ where
{
pub(crate) fn from(data: &'r mut Receiver<Req>, s: S, parser: P, path: Path) -> Self {
data.enable();
let name = path.clone();
let rtt = path.rtt("req");
Self {
data,
Expand All @@ -71,6 +73,7 @@ where
num: Number::default(),
req_buf: Vec::with_capacity(4),
ping_cycle: 0,
name,
}
}
// 检查连接是否存在
Expand Down Expand Up @@ -133,6 +136,9 @@ where
while self.s.len() > 0 {
let l = self.s.len();
if let Some(cmd) = self.parser.parse_response(&mut self.s)? {
if self.pending.len() == 0 {
panic!("unexpect response handler:{:?}", &self);
}
let (req, start) = self.pending.pop_front().expect("take response");
self.num.rx();
// 统计请求耗时。
Expand Down Expand Up @@ -237,8 +243,9 @@ impl<'r, Req, P, S: Debug> Debug for Handler<'r, Req, P, S> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"handler num:{:?} p_req:{} {} {} buf:{:?} data:{:?}",
"handler num:{:?} resource:{:?} p_req:{} {} {} buf:{:?} data:{:?}",
self.num,
self.name,
self.pending.len(),
self.rtt,
self.host_metric,
Expand Down
Loading