From f1cf730fb697530f53a8f4ce575695a17ccb4b0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mindaugas=20=C5=BDvirblis?= Date: Fri, 21 Feb 2020 12:36:15 +0200 Subject: [PATCH] go version --- go/.gitignore | 2 + go/database.go | 125 ++++++++++++++++++++++++++++++++++++ go/fifocache.go | 32 ++++++++++ go/go.mod | 10 +++ go/go.sum | 8 +++ go/main.go | 166 ++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 343 insertions(+) create mode 100644 go/.gitignore create mode 100644 go/database.go create mode 100644 go/fifocache.go create mode 100644 go/go.mod create mode 100644 go/go.sum create mode 100644 go/main.go diff --git a/go/.gitignore b/go/.gitignore new file mode 100644 index 0000000..cc1b2e7 --- /dev/null +++ b/go/.gitignore @@ -0,0 +1,2 @@ +files +db.db diff --git a/go/database.go b/go/database.go new file mode 100644 index 0000000..80effef --- /dev/null +++ b/go/database.go @@ -0,0 +1,125 @@ +package main + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "fmt" + "log" + "os" + "strconv" + "time" + + "github.com/etcd-io/bbolt" +) + +type DatabaseItem struct { + Name string + Len float64 + T int64 +} + +var ( + database *bbolt.DB + dbbucket = []byte("main") +) + +func database_init() { + var err error + database, err = bbolt.Open("./db.db", 0755, nil) + if err != nil { + log.Fatalf("Open database error: %v", err) + } + database.Update(func(tx *bbolt.Tx) error { + tx.CreateBucketIfNotExists(dbbucket) + return nil + }) +} + +func database_store(item *DatabaseItem) { + encoded, _ := json.Marshal(item) + database.Update(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(dbbucket) + bucket.Put(itob(int(item.T)), encoded) + return nil + + }) +} + +func database_get(s, l string) []*DatabaseItem { + + si, err := strconv.Atoi(s) + if err != nil { + log.Printf("database_get error: %v", err) + return nil + } + li, err := strconv.Atoi(l) + if err != nil { + log.Printf("database_get error: %v", err) + return nil + } + + var rt []*DatabaseItem + + database.View(func(tx *bbolt.Tx) error { + start := itob(si * 1000000000) + end := itob(si + li*60*1000000000) + + bucket := tx.Bucket(dbbucket) + cursor := bucket.Cursor() + + for k, v := cursor.Seek(start); k != nil; k, v = cursor.Next() { + if bytes.Compare(k, end) < 0 { + return nil + } + var item *DatabaseItem + err := json.Unmarshal(v, &item) + if err != nil { + log.Printf("error %v", err) + continue + } + rt = append(rt, item) + + } + return nil + }) + + return rt +} + +func database_worker() { + for { + current := time.Now().UnixNano() - int64(*flagTail*60*60*int(time.Nanosecond)) + err := database.Update(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(dbbucket) + cursor := bucket.Cursor() + + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + _ = v + if bytes.Compare(k, itob(int(current))) < 0 { + return nil + } + var item DatabaseItem + err := json.Unmarshal(v, &item) + if err != nil { + return err + } + log.Printf("removing ./files/%v", item.Name) + os.RemoveAll(fmt.Sprintf("./files/%v", item.Name)) + bucket.Delete(k) + + } + return nil + }) + if err != nil { + log.Printf("error %v", err) + } + time.Sleep(time.Second) + } +} + +func itob(v int) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(v)) + return b +} diff --git a/go/fifocache.go b/go/fifocache.go new file mode 100644 index 0000000..ab8375e --- /dev/null +++ b/go/fifocache.go @@ -0,0 +1,32 @@ +package main + +import ( + "container/list" + "sync" +) + +var ( + fifomap = make(map[string]struct{}) + fifolist = list.New() + fifomu = &sync.RWMutex{} +) + +func cache_set(url string) bool { + fifomu.Lock() + defer fifomu.Unlock() + + _, ok := fifomap[url] + if ok { + return false + } + + fifomap[url] = struct{}{} + fifolist.PushFront(url) + + for fifolist.Len() > 10 { + item := fifolist.Back() + delete(fifomap, item.Value.(string)) + fifolist.Remove(item) + } + return true +} diff --git a/go/go.mod b/go/go.mod new file mode 100644 index 0000000..7957971 --- /dev/null +++ b/go/go.mod @@ -0,0 +1,10 @@ +module vsr + +go 1.13 + +require ( + github.com/etcd-io/bbolt v1.3.3 + github.com/gorilla/mux v1.7.4 + github.com/grafov/m3u8 v0.11.1 + go.etcd.io/bbolt v1.3.3 // indirect +) diff --git a/go/go.sum b/go/go.sum new file mode 100644 index 0000000..ce8d019 --- /dev/null +++ b/go/go.sum @@ -0,0 +1,8 @@ +github.com/etcd-io/bbolt v1.3.3 h1:gSJmxrs37LgTqR/oyJBWok6k6SvXEUerFTbltIhXkBM= +github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= +github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc= +github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/grafov/m3u8 v0.11.1 h1:igZ7EBIB2IAsPPazKwRKdbhxcoBKO3lO1UY57PZDeNA= +github.com/grafov/m3u8 v0.11.1/go.mod h1:nqzOkfBiZJENr52zTVd/Dcl03yzphIMbJqkXGu+u080= +go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= +go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= diff --git a/go/main.go b/go/main.go new file mode 100644 index 0000000..1ef4ad0 --- /dev/null +++ b/go/main.go @@ -0,0 +1,166 @@ +package main + +import ( + "bytes" + "flag" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/url" + "os" + "time" + + "github.com/gorilla/mux" + + "github.com/grafov/m3u8" +) + +var ( + flagURL = flag.String("url", "", "url to fetch") + flagTail = flag.Int("tail", 24, "how much hours to keep") + flagHost = flag.String("host", "http://localhos:8080", "add host to m3u8") + flagBindTo = flag.String("bind-to", ":8080", "bind to ip:port") + + flagDebug = flag.Bool("debug", false, "") +) + +func main() { + flag.Parse() + + if *flagURL == "" { + log.Printf("Set url to fetch: ./app --url [url-here]") + return + } + + err := os.MkdirAll("./files", 0755) + if err != nil { + log.Fatalf("mkdir fail: %v", err) + } + + database_init() + + if !*flagDebug { + go fetcher() + } + + go database_worker() + + router := mux.NewRouter() + + router.HandleFunc("/start/{start}/{limit}/stream.m3u8", func(w http.ResponseWriter, r *http.Request) { + + varz := mux.Vars(r) + + out := "#EXTM3U\n" + out += "#EXT-X-PLAYLIST-TYPE:VOD\n" + out += "#EXT-X-TARGETDURATION:20\n" + out += "#EXT-X-VERSION:4\n" + out += "#EXT-X-MEDIA-SEQUENCE:0\n" + + items := database_get(varz["start"], varz["limit"]) + + for _, item := range items { + out += fmt.Sprintf("#EXTINF:%f\n", item.Len) + out += fmt.Sprintf("%s\n", item.Name) + } + + out += "#EXT-X-ENDLIST\n" + + w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") + w.Header().Set("Content-Lenght", fmt.Sprintf("%d", len(out))) + w.Write([]byte(out)) + }) + + router.HandleFunc("/start/{start}/{limit}/{ts:.+}", func(w http.ResponseWriter, r *http.Request) { + varz := mux.Vars(r) + w.Header().Set("Content-Type", "text/vnd.trolltech.linguist") + b, err := ioutil.ReadFile(fmt.Sprintf("./files/%s", varz["ts"])) + if err != nil { + log.Printf("error %v", err) + return + } + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(b))) + w.Write(b) + + }) + + log.Printf("Starting server on %v", *flagBindTo) + log.Fatal(http.ListenAndServe(*flagBindTo, router)) +} + +func fetcher() { + mainurl, _ := url.Parse(*flagURL) + for { + start_at: + b := fetch(mainurl.String()) + buf := bytes.NewBuffer(b) + pl, pt, err := m3u8.Decode(*buf, true) + if err != nil { + log.Printf("fetcher error: %v %v", mainurl.String(), err) + time.Sleep(1 * time.Second) + continue + } + if pt == m3u8.MASTER { + masterpl := pl.(*m3u8.MasterPlaylist) + for _, variant := range masterpl.Variants { + mainurl, _ = mainurl.Parse(variant.URI) + log.Printf("%v", mainurl.String()) + goto start_at + } + + } else if pt == m3u8.MEDIA { + mediapl := pl.(*m3u8.MediaPlaylist) + for _, segment := range mediapl.Segments { + if segment == nil { + continue + } + fetchurl, _ := mainurl.Parse(segment.URI) + fetchurl.RawQuery = mainurl.RawQuery + if cache_set(fetchurl.String()) { + log.Printf("%v", fetchurl.String()) + currenttime := time.Now().UnixNano() + item := &DatabaseItem{ + Name: fmt.Sprintf("%v.ts", currenttime), + Len: segment.Duration, + T: currenttime, + } + database_store(item) + + b := fetch(fetchurl.String()) + if b != nil { + err := ioutil.WriteFile("./files/"+item.Name, b, 0755) + if err != nil { + log.Printf("error on write file to fs %v", err) + continue + } + } + } + } + } + time.Sleep(3 * time.Second) + } +} + +func fetch(url string) []byte { + hc := http.Client{Timeout: 10 * time.Second} + + request, _ := http.NewRequest("GET", url, nil) + request.Header.Set("User-Agent", "iptv/1.0") + + response, err := hc.Do(request) + if err != nil { + log.Printf("fetch error %v %v", url, err) + return nil + } + defer response.Body.Close() + if response.StatusCode/100 != 2 { + log.Printf("Invalid response code %v %v", url, response.StatusCode) + return nil + } + b, err := ioutil.ReadAll(response.Body) + if err != nil { + return nil + } + return b +}