-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpipeServer.js
92 lines (92 loc) · 3.21 KB
/
pipeServer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.PipeType = void 0;
exports.createPipeServer = createPipeServer;
const net_1 = require("net");
const mtcp_1 = require("./mtcp");
var PipeType;
(function (PipeType)
{
PipeType[PipeType["tcp2mtcp"] = 0] = "tcp2mtcp";
PipeType[PipeType["mtcp2tcp"] = 1] = "mtcp2tcp";
})(PipeType || (exports.PipeType = PipeType = {}));
function createPipeServer(upstream_port, upstream_host, pipe_type = PipeType.tcp2mtcp, lisent_port, poolCount = 2, //每个连接的子流个数
preLinkCount = 0)
{
let createServerFn = pipe_type == PipeType.tcp2mtcp ? net_1.createServer : mtcp_1.createMTcpServer;
let connectFn = pipe_type == PipeType.tcp2mtcp ? mtcp_1.connectMTcp : net_1.connect;
let preConnPool = []; //预连接池
//预连接
const PreConnect = () =>
{
let conn = connectFn({ port: upstream_port, host: upstream_host }, function ()
{
preConnPool.push(conn);
const remove = () =>
{
let index = preConnPool.indexOf(conn);
if (index !== -1) {
preConnPool.splice(index, 1);
PreConnect();
}
};
conn.on("close", remove);
conn.on('error', () =>
{
remove();
conn.destroy();
});
});
};
if (pipe_type === PipeType.mtcp2tcp && preLinkCount > 0) {
for (let i = 0; i < preLinkCount; i++)
setTimeout(PreConnect, i * 500);
//每3s新建连接
setInterval(() =>
{
if (preConnPool.length === preLinkCount) {
preConnPool.shift();
PreConnect();
}
}, 3000);
}
let server = createServerFn(function (conn)
{
if (pipe_type === PipeType.tcp2mtcp)
mtcp_1.MSocket.PoolCount = poolCount;
let up_socket;
if (preConnPool.length) {
up_socket = preConnPool.shift();
up_socket.removeAllListeners("close");
up_socket.removeAllListeners("error");
up_socket.pipe(conn, { end: true });
conn.pipe(up_socket, { end: true });
}
else
up_socket = connectFn({ port: upstream_port, host: upstream_host }, function ()
{
up_socket.pipe(conn, { end: true });
conn.pipe(up_socket, { end: true });
});
const destroy = () =>
{
up_socket.destroySoon();
conn.destroySoon();
};
conn.on('error', destroy);
up_socket.on('error', destroy);
conn.on('close', destroy);
up_socket.on('close', destroy);
});
if (lisent_port) {
server.listen(lisent_port, () =>
{
console.log(`启动成功:listen ${pipe_type === PipeType.mtcp2tcp ? "mtcp" : "tcp"}:${lisent_port} -> ${pipe_type === PipeType.mtcp2tcp ? "tcp" : "mtcp"}:${upstream_host}:${upstream_port}`);
});
server.on("error", (err) =>
{
console.log(`启动失败:listen:${lisent_port} -> ${upstream_host}:${upstream_port} err:${err.message}`);
});
}
return server;
}