Skip to content

Commit

Permalink
Add shell&curl method
Browse files Browse the repository at this point in the history
  • Loading branch information
ZYallers committed Mar 28, 2022
1 parent 7852de7 commit e667071
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 0 deletions.
13 changes: 13 additions & 0 deletions funcs/execs/shell_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// +build !darwin,!dragonfly,!freebsd,!linux,!netbsd,!openbsd,!plan9,!solaris

package tool

import (
"context"
"errors"
)

// 执行shell命令,可设置执行超时时间
func ExecShellWithContext(ctx context.Context, command string) (string, error) {
return "", errors.New("this function does not support running under the current system")
}
41 changes: 41 additions & 0 deletions funcs/execs/shell_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// +build darwin dragonfly freebsd linux netbsd openbsd plan9 solaris

package tool

import (
"context"
"errors"
"fmt"
"os/exec"
"syscall"
)

type shellResult struct {
output string
err error
}

// 执行shell命令,可设置执行超时时间
func ExecShellWithContext(ctx context.Context, command string) (string, error) {
cmd := exec.Command("/bin/bash", "-c", command)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
resultChan := make(chan shellResult)
go func() {
defer func() {
if err := recover(); err != nil {
resultChan <- shellResult{"", errors.New(fmt.Sprintf("%v", err))}
}
}()
output, err := cmd.CombinedOutput()
resultChan <- shellResult{string(output), err}
}()
select {
case <-ctx.Done():
if cmd.Process.Pid > 0 {
_ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
}
return "", errors.New("timeout killed")
case result := <-resultChan:
return result.output, result.err
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/onsi/gomega v1.18.1 // indirect
github.com/syyongx/php2go v0.9.6
github.com/techoner/gophp v0.2.0
github.com/tidwall/gjson v1.6.1
go.uber.org/zap v1.21.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gorm.io/driver/mysql v1.0.3
Expand Down
251 changes: 251 additions & 0 deletions utils/curl/go_curl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package curl

import (
"encoding/json"
"errors"
"fmt"
"github.com/tidwall/gjson"
"log"
"net/http"
"strconv"
"strings"
"time"
)

type goCurl struct {
reqCounter int
debug bool
timeout time.Duration
Runtime time.Duration
requests []interface{}
Data map[string]interface{}
reqDepend map[string]chan interface{}
reqDependTimes map[string]uint64
Result map[string]interface{}
resultChan chan interface{}
}

func NewGoCurl() *goCurl {
return &goCurl{
timeout: 10 * time.Second,
reqDependTimes: make(map[string]uint64),
resultChan: make(chan interface{}),
reqDepend: make(map[string]chan interface{}),
Result: make(map[string]interface{}),
}
}

func (gc *goCurl) Debug() *goCurl {
gc.debug = true
return gc
}

func (gc *goCurl) print(format string, v ...interface{}) *goCurl {
if gc.debug {
log.Printf(format, v...)
}
return gc
}

func (gc *goCurl) SetData(input string) *goCurl {
// 不用json.Unmarshal方法,是因为解析后存在科学计数法问题
d := json.NewDecoder(strings.NewReader(input))
d.UseNumber()
_ = d.Decode(&gc.Data)
return gc
}

func (gc *goCurl) Done() (*goCurl, error) {
var nowTime = time.Now()
if val, ok := gc.Data["timeout"].(string); ok {
if num, err := strconv.ParseInt(val, 10, 64); err == nil {
gc.timeout = time.Duration(num) * time.Second
}
}

if val, ok := gc.Data["requests"].([]interface{}); ok {
gc.requests = val
} else {
return gc, errors.New(`lack of necessary parameters "requests"`)
}
gc.print("request length: %d, timeout: %v\n", len(gc.requests), gc.timeout)

for _, req := range gc.requests {
if params, ok := req.(map[string]interface{})["params"].(map[string]interface{}); ok {
for _, param := range params {
if param, ok := param.(map[string]interface{}); ok {
dependId := param["depend_id"].(string)
gc.reqDependTimes[dependId]++
}
}
}
}
gc.print("reqDependTimes: %v\n", gc.reqDependTimes)

for _, val := range gc.requests {
if req, ok := val.(map[string]interface{}); ok {
id := req["id"].(string)
if dependTimes, ok := gc.reqDependTimes[id]; ok && dependTimes > 0 {
gc.reqDepend[id] = make(chan interface{})
}
go gc.handler(req)
}
}

LOOP:
for {
select {
case resp, ok := <-gc.resultChan:
if ok {
gc.reqCounter++
gc.print("resultChan: %v, counter: %d\n", resp, gc.reqCounter)
if ele, ok := resp.(map[string]string); ok {
gc.Result[ele["id"]] = map[string]string{"data": ele["data"], "runtime": ele["runtime"]}
}
if gc.reqCounter >= len(gc.requests) {
break LOOP
}
}
case <-time.After(gc.timeout):
gc.print("timeout: %s\n", gc.timeout)
break LOOP
}
}
gc.safeCloseChan(gc.resultChan)
gc.Runtime = time.Since(nowTime)
return gc, nil
}

func (gc *goCurl) handler(req map[string]interface{}) {
var (
nowTime = time.Now()
id string
)
if val, ok := req["id"].(string); ok {
id = val
} else {
err := `missing necessary parameters "id"`
gc.print(err)
gc.safeSendChan(gc.resultChan, map[string]string{"id": id, "data": err, "runtime": time.Since(nowTime).String()})
return
}

var url string
if val, ok := req["url"].(string); ok {
url = val
} else {
err := `missing necessary parameters "url"`
gc.print(err)
gc.safeSendChan(gc.resultChan, map[string]string{"id": id, "data": err, "runtime": time.Since(nowTime).String()})
return
}

var httpMethod = http.MethodGet
if val, ok := req["type"].(string); ok {
httpMethod = strings.ToUpper(val)
}

var timeout = gc.timeout - 1*time.Second // 比全局的timeout少1秒
if val, ok := req["timeout"].(string); ok {
if num, err := strconv.ParseInt(val, 10, 64); err == nil {
if to := time.Duration(num) * time.Second; to < timeout { // DIY的时间不能大于全局的时间
timeout = to
}
}
}

headers := make(map[string]string)
switch httpMethod {
case http.MethodPost:
headers["Content-Type"] = "application/x-www-form-urlencoded"
default:
headers["Content-Type"] = "application/json;charset=utf-8"
}
if val, ok := req["headers"].(map[string]interface{}); ok {
for k, v := range val {
headers[k] = v.(string)
}
}

queries := make(map[string]string)
postData := make(map[string]interface{})
if params, ok := req["params"].(map[string]interface{}); ok {
for key, value := range params {
var transfer interface{}
if depend, ok := value.(map[string]interface{}); ok {
dependId := depend["depend_id"].(string)
if dependChan, ok := gc.reqDepend[dependId]; ok {
LOOP:
for {
select {
case resp, ok := <-dependChan:
if ok {
gc.print("dependChan: %s, resp: %s\n", dependId, resp)
if res := gjson.Get(resp.(string), depend["depend_param"].(string)); res.Exists() {
transfer = res.Value()
}
break LOOP
}
case <-time.After(timeout):
gc.print("dependChan: %s, timeout: %s\n", dependId, timeout)
break LOOP
}
}
}
} else {
transfer = value
}
if transfer == nil {
transfer = ""
}
if httpMethod == http.MethodGet {
queries[key] = fmt.Sprintf("%v", transfer)
} else {
postData[key] = transfer
}
}
}

curl := NewRequest(url).SetMethod(httpMethod).SetTimeOut(timeout).SetHeaders(headers).SetQueries(queries).SetPostData(postData)
gc.print("------>begin id: %s, url: %s, type: %s, headers: %#v, queries: %#v, postData: %#v\n", id, url, httpMethod, headers, queries, postData)

var respBody string
if resp, err := curl.Send(); err == nil {
respBody = resp.Body
} else {
respBody = err.Error()
}
runtime := time.Since(nowTime).String()
gc.print("<------end id: %s, runtime: %s, respBody: %v.\n", id, runtime, respBody)
gc.safeSendChan(gc.resultChan, map[string]string{"id": id, "data": respBody, "runtime": runtime})

if dependChan, ok := gc.reqDepend[id]; ok {
if dependTimes, ok := gc.reqDependTimes[id]; ok && dependTimes > 0 {
var i uint64
for i = 0; i < dependTimes; i++ {
gc.safeSendChan(dependChan, respBody)
}
gc.safeCloseChan(dependChan)
}
}
}

func (gc *goCurl) safeSendChan(ch chan<- interface{}, value interface{}) (closed bool) {
defer func() {
if recover() != nil {
closed = true
}
}()
ch <- value
return false
}

func (gc *goCurl) safeCloseChan(ch chan interface{}) (closed bool) {
defer func() {
if recover() != nil {
closed = false
}
}()
close(ch)
return true
}
66 changes: 66 additions & 0 deletions utils/json/time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package json

import (
"database/sql/driver"
"fmt"
"time"
)

// Time 自定义时间
type Time time.Time

const (
format = "2006-01-02 15:04:05"
zone = "Asia/Shanghai"
)

// UnmarshalJSON implements json unmarshal interface.
func (t *Time) UnmarshalJSON(data []byte) (err error) {
now, err := time.ParseInLocation(`"`+format+`"`, string(data), time.Local)
*t = Time(now)
return
}

// MarshalJSON implements json marshal interface.
func (t Time) MarshalJSON() ([]byte, error) {
if time.Time(t).IsZero() {
return []byte{'n', 'u', 'l', 'l'}, nil
}

b := make([]byte, 0, len(format)+2)
b = append(b, '"')
b = time.Time(t).AppendFormat(b, format)
b = append(b, '"')
return b, nil
}

// String ...
func (t Time) String() string {
return time.Time(t).Format(format)
}

// local ...
func (t Time) local() time.Time {
loc, _ := time.LoadLocation(zone)
return time.Time(t).In(loc)
}

// Value ...
func (t Time) Value() (driver.Value, error) {
var zeroTime time.Time
var ti = time.Time(t)
if ti.UnixNano() == zeroTime.UnixNano() {
return nil, nil
}
return ti, nil
}

// Scan value of time.Time
func (t *Time) Scan(v interface{}) error {
value, ok := v.(time.Time)
if ok {
*t = Time(value)
return nil
}
return fmt.Errorf("can not convert %v to timestamp", v)
}

0 comments on commit e667071

Please sign in to comment.