Skip to content

Commit

Permalink
go version
Browse files Browse the repository at this point in the history
  • Loading branch information
blinkinglight committed Feb 21, 2020
1 parent 9f103b9 commit f1cf730
Show file tree
Hide file tree
Showing 6 changed files with 343 additions and 0 deletions.
2 changes: 2 additions & 0 deletions go/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
files
db.db
125 changes: 125 additions & 0 deletions go/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package main

import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"time"

"github.com/etcd-io/bbolt"
)

type DatabaseItem struct {
Name string
Len float64
T int64
}

var (
database *bbolt.DB
dbbucket = []byte("main")
)

func database_init() {
var err error
database, err = bbolt.Open("./db.db", 0755, nil)
if err != nil {
log.Fatalf("Open database error: %v", err)
}
database.Update(func(tx *bbolt.Tx) error {
tx.CreateBucketIfNotExists(dbbucket)
return nil
})
}

func database_store(item *DatabaseItem) {
encoded, _ := json.Marshal(item)
database.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(dbbucket)
bucket.Put(itob(int(item.T)), encoded)
return nil

})
}

func database_get(s, l string) []*DatabaseItem {

si, err := strconv.Atoi(s)
if err != nil {
log.Printf("database_get error: %v", err)
return nil
}
li, err := strconv.Atoi(l)
if err != nil {
log.Printf("database_get error: %v", err)
return nil
}

var rt []*DatabaseItem

database.View(func(tx *bbolt.Tx) error {
start := itob(si * 1000000000)
end := itob(si + li*60*1000000000)

bucket := tx.Bucket(dbbucket)
cursor := bucket.Cursor()

for k, v := cursor.Seek(start); k != nil; k, v = cursor.Next() {
if bytes.Compare(k, end) < 0 {
return nil
}
var item *DatabaseItem
err := json.Unmarshal(v, &item)
if err != nil {
log.Printf("error %v", err)
continue
}
rt = append(rt, item)

}
return nil
})

return rt
}

func database_worker() {
for {
current := time.Now().UnixNano() - int64(*flagTail*60*60*int(time.Nanosecond))
err := database.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(dbbucket)
cursor := bucket.Cursor()

for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
_ = v
if bytes.Compare(k, itob(int(current))) < 0 {
return nil
}
var item DatabaseItem
err := json.Unmarshal(v, &item)
if err != nil {
return err
}
log.Printf("removing ./files/%v", item.Name)
os.RemoveAll(fmt.Sprintf("./files/%v", item.Name))
bucket.Delete(k)

}
return nil
})
if err != nil {
log.Printf("error %v", err)
}
time.Sleep(time.Second)
}
}

func itob(v int) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(v))
return b
}
32 changes: 32 additions & 0 deletions go/fifocache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package main

import (
"container/list"
"sync"
)

var (
fifomap = make(map[string]struct{})
fifolist = list.New()
fifomu = &sync.RWMutex{}
)

func cache_set(url string) bool {
fifomu.Lock()
defer fifomu.Unlock()

_, ok := fifomap[url]
if ok {
return false
}

fifomap[url] = struct{}{}
fifolist.PushFront(url)

for fifolist.Len() > 10 {
item := fifolist.Back()
delete(fifomap, item.Value.(string))
fifolist.Remove(item)
}
return true
}
10 changes: 10 additions & 0 deletions go/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module vsr

go 1.13

require (
github.com/etcd-io/bbolt v1.3.3
github.com/gorilla/mux v1.7.4
github.com/grafov/m3u8 v0.11.1
go.etcd.io/bbolt v1.3.3 // indirect
)
8 changes: 8 additions & 0 deletions go/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
github.com/etcd-io/bbolt v1.3.3 h1:gSJmxrs37LgTqR/oyJBWok6k6SvXEUerFTbltIhXkBM=
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/grafov/m3u8 v0.11.1 h1:igZ7EBIB2IAsPPazKwRKdbhxcoBKO3lO1UY57PZDeNA=
github.com/grafov/m3u8 v0.11.1/go.mod h1:nqzOkfBiZJENr52zTVd/Dcl03yzphIMbJqkXGu+u080=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
166 changes: 166 additions & 0 deletions go/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package main

import (
"bytes"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"time"

"github.com/gorilla/mux"

"github.com/grafov/m3u8"
)

var (
flagURL = flag.String("url", "", "url to fetch")
flagTail = flag.Int("tail", 24, "how much hours to keep")
flagHost = flag.String("host", "http://localhos:8080", "add host to m3u8")
flagBindTo = flag.String("bind-to", ":8080", "bind to ip:port")

flagDebug = flag.Bool("debug", false, "")
)

func main() {
flag.Parse()

if *flagURL == "" {
log.Printf("Set url to fetch: ./app --url [url-here]")
return
}

err := os.MkdirAll("./files", 0755)
if err != nil {
log.Fatalf("mkdir fail: %v", err)
}

database_init()

if !*flagDebug {
go fetcher()
}

go database_worker()

router := mux.NewRouter()

router.HandleFunc("/start/{start}/{limit}/stream.m3u8", func(w http.ResponseWriter, r *http.Request) {

varz := mux.Vars(r)

out := "#EXTM3U\n"
out += "#EXT-X-PLAYLIST-TYPE:VOD\n"
out += "#EXT-X-TARGETDURATION:20\n"
out += "#EXT-X-VERSION:4\n"
out += "#EXT-X-MEDIA-SEQUENCE:0\n"

items := database_get(varz["start"], varz["limit"])

for _, item := range items {
out += fmt.Sprintf("#EXTINF:%f\n", item.Len)
out += fmt.Sprintf("%s\n", item.Name)
}

out += "#EXT-X-ENDLIST\n"

w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
w.Header().Set("Content-Lenght", fmt.Sprintf("%d", len(out)))
w.Write([]byte(out))
})

router.HandleFunc("/start/{start}/{limit}/{ts:.+}", func(w http.ResponseWriter, r *http.Request) {
varz := mux.Vars(r)
w.Header().Set("Content-Type", "text/vnd.trolltech.linguist")
b, err := ioutil.ReadFile(fmt.Sprintf("./files/%s", varz["ts"]))
if err != nil {
log.Printf("error %v", err)
return
}
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(b)))
w.Write(b)

})

log.Printf("Starting server on %v", *flagBindTo)
log.Fatal(http.ListenAndServe(*flagBindTo, router))
}

func fetcher() {
mainurl, _ := url.Parse(*flagURL)
for {
start_at:
b := fetch(mainurl.String())
buf := bytes.NewBuffer(b)
pl, pt, err := m3u8.Decode(*buf, true)
if err != nil {
log.Printf("fetcher error: %v %v", mainurl.String(), err)
time.Sleep(1 * time.Second)
continue
}
if pt == m3u8.MASTER {
masterpl := pl.(*m3u8.MasterPlaylist)
for _, variant := range masterpl.Variants {
mainurl, _ = mainurl.Parse(variant.URI)
log.Printf("%v", mainurl.String())
goto start_at
}

} else if pt == m3u8.MEDIA {
mediapl := pl.(*m3u8.MediaPlaylist)
for _, segment := range mediapl.Segments {
if segment == nil {
continue
}
fetchurl, _ := mainurl.Parse(segment.URI)
fetchurl.RawQuery = mainurl.RawQuery
if cache_set(fetchurl.String()) {
log.Printf("%v", fetchurl.String())
currenttime := time.Now().UnixNano()
item := &DatabaseItem{
Name: fmt.Sprintf("%v.ts", currenttime),
Len: segment.Duration,
T: currenttime,
}
database_store(item)

b := fetch(fetchurl.String())
if b != nil {
err := ioutil.WriteFile("./files/"+item.Name, b, 0755)
if err != nil {
log.Printf("error on write file to fs %v", err)
continue
}
}
}
}
}
time.Sleep(3 * time.Second)
}
}

func fetch(url string) []byte {
hc := http.Client{Timeout: 10 * time.Second}

request, _ := http.NewRequest("GET", url, nil)
request.Header.Set("User-Agent", "iptv/1.0")

response, err := hc.Do(request)
if err != nil {
log.Printf("fetch error %v %v", url, err)
return nil
}
defer response.Body.Close()
if response.StatusCode/100 != 2 {
log.Printf("Invalid response code %v %v", url, response.StatusCode)
return nil
}
b, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil
}
return b
}

0 comments on commit f1cf730

Please sign in to comment.