Skip to content

Commit

Permalink
first
Browse files Browse the repository at this point in the history
  • Loading branch information
wding authored and wding committed Sep 11, 2019
0 parents commit 90c1385
Show file tree
Hide file tree
Showing 4,166 changed files with 835,312 additions and 0 deletions.
The diff you're trying to view is too large. We only load the first 3000 changed files.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# collector
31 changes: 31 additions & 0 deletions conf/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package conf

import (
"log"
"gopkg.in/yaml.v2"
"io/ioutil"
)

type Config struct {
Input InputConf
Filter FilterConf
Output []OutPutConf
}

func ParseConf(path string) *Config {

yamlFile, err := ioutil.ReadFile(path)
if err != nil {
log.Printf("ReadFile fail:%v",err)
return nil
}

c := &Config{}

err = yaml.Unmarshal(yamlFile, c)
if err != nil {
log.Printf("ReadFile fail:%v",err)
return nil
}
return c
}
17 changes: 17 additions & 0 deletions conf/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package conf

type FilterConf struct {
Plugin string
Table []string
Worker int
Redis string
TagList map[string]interface{} `yaml:"taglist,omitempty"`
FieldList map[string]interface{}
Time []string
TimeShift float64
TimeUnit int
RedisHost string
RedisPass string
RedisTTl int64
Mongo string
}
7 changes: 7 additions & 0 deletions conf/input.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package conf

type InputConf struct {
BrokerList []string
Topic []string
Name string
}
10 changes: 10 additions & 0 deletions conf/output.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package conf

type OutPutConf struct {
OutType string
Addr string
Worker int
Dbname string
Rp string
Name string
}
35 changes: 35 additions & 0 deletions config/membership_coupon.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
output:
- outtype: influxdb
addr: :9000
worker: 3
dbname: xx
name: xx

input:
name: xx
brokerlist:
- xx:9092
topic:
- xx



filter:
worker: 5
table:
- xx1
- xx2
taglist:
xx: t
xx: $oid
fieldlist:
xx: t
time:
- utime
timeshift: 0
timeunit: 1 #1 秒; 0 毫秒
redishost: xx:6379
redispass: xx
redisttl: 1209600
mongo: xx:xx@xx

118 changes: 118 additions & 0 deletions filter/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package filter


import (
"context"
"fmt"
"log"
"time"
"github.com/collector/stat"
"github.com/collector/conf"
"github.com/collector/sender"
"github.com/collector/model"
"github.com/collector/model/oplogManagerPB"
"github.com/golang/protobuf/proto"
"github.com/garyburd/redigo/redis"
)

var (
_ = fmt.Sprintf("")
)

type Filter struct {
senderList []sender.Sender
conf conf.FilterConf
input *stat.Static
data chan []byte
stop chan bool
ctx context.Context
cancel context.CancelFunc
model model.General
redisIns *redis.Pool
}

func NewRedisPoos(host string,pass string) *redis.Pool {

RedisClient := &redis.Pool{
MaxIdle: 1,
MaxActive: 60,
IdleTimeout: 180 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", host)
if err != nil {
return nil, err
}
if _, err := c.Do("AUTH", pass); err != nil {
c.Close()
return nil, err
}

c.Do("SELECT", 0)
return c, nil
},
}
return RedisClient
}

func NewFilter(c *conf.Config,senderList []sender.Sender) *Filter {
f := &Filter{}
f.senderList = senderList
f.conf = c.Filter
f.data = make(chan []byte,100000)
f.stop = make(chan bool)
//f.input = stat.NewStatic("input:filter")
f.ctx, f.cancel = context.WithCancel(context.Background())

f.redisIns = NewRedisPoos(f.conf.RedisHost, f.conf.RedisPass)

f.model = model.FetchInstance(&f.conf,f.redisIns)

for k:=0;k<f.conf.Worker;k++ {
go f.Run(k)
}
log.Printf( "new filter:%+v,model:%T",f.conf,f.model )

return f
}


func (f *Filter) Input(data []byte) {
f.data <- data
}

func (f *Filter) Run(num int) {
for {
select {
case msg := <- f.data:

o := &oplogManagerPB.Oplog{}
err := proto.Unmarshal(msg[4:], o)
if err != nil {
log.Printf("proto decode fail:%v",err)
continue
}

for index,_ := range f.senderList {
tt := f.senderList[index].Type()
if tt == "inf" {
data := f.model.ConverToInfP(o)
f.senderList[index].Add(data)
} else {
ret := f.model.Filter(o, f.senderList[index].Type() )
if len(ret) == 0 {
continue
}
f.senderList[index].Convert(ret)
}
}
//f.input.Add(1)
case <- f.ctx.Done():
log.Printf("close filter worker:%v",num)
return
}
}
}

func (f *Filter) Close() {
f.cancel()
}
Binary file added main
Binary file not shown.
44 changes: 44 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package main

import (
"flag"
"os"
"os/signal"
"github.com/collector/receiver"
"github.com/collector/sender"
"github.com/collector/filter"
"github.com/collector/conf"
)



func main() {

confPath := flag.String("conf","","path")

flag.Parse()

c := conf.ParseConf(*confPath)

senderList := sender.FetchSenders(c)

filterT := filter.NewFilter(c,senderList)

kafka := receiver.NewKafkaIns(c,filterT)

signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

for {
select {
case <-signals:
//以后改成使用context关闭
kafka.Close()
filterT.Close()
for k,_ := range senderList {
senderList[k].Close()
}
return
}
}
}
Loading

0 comments on commit 90c1385

Please sign in to comment.