Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug fix & performance improvement #4

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
031cb77
go fmt & tiny fix
ngaut Jun 7, 2013
be60ae3
go fmt
ngaut Jun 7, 2013
2d85301
adjust sleeping time to million seconds when there is no job
ngaut Jun 8, 2013
6622f28
bugfix:when got a job which not belong to us
ngaut Jun 17, 2013
0433844
add ql:log to event listener
ngaut Jun 24, 2013
206584a
fix arguments order in pop function
ngaut Jun 25, 2013
4e1329b
make heartbeat tick smaller
ngaut Jun 25, 2013
a5798b2
"add warning when handler is slow"
ngaut Jun 25, 2013
1f06da3
show job's queue name when send heartbeat
ngaut Jun 26, 2013
2429e12
add support for version before go1.1
ngaut Jun 26, 2013
571652b
one job each time
ngaut Jun 27, 2013
313e22a
fix memory leak
ngaut Jun 28, 2013
4f68890
better performance
ngaut Jul 4, 2013
3276e1b
using absolute qless-core path & change generate generateJid function
ngaut Jul 18, 2013
a4c67a6
remove print info
ngaut Jul 18, 2013
db863a6
make heartbeat tick to 30
ngaut Aug 22, 2013
b001637
log when job complete or failed
ngaut Aug 23, 2013
a689cc0
log more information when handle job
ngaut Aug 23, 2013
b57b684
only log jid
ngaut Aug 23, 2013
bea0fd5
log jid instead of job details
ngaut Aug 23, 2013
9393edf
add job.CompleteWithNoData to save memory
ngaut Aug 26, 2013
de92d90
reduce memory allocation
ngaut Aug 27, 2013
501013a
retry complete the job when complete failed
ngaut Aug 28, 2013
c004795
using rename success to status
ngaut Aug 28, 2013
bb4fa07
prepare for new version of qless-core
ngaut Sep 4, 2013
58c7945
clean up & tiny fix
ngaut Sep 5, 2013
666902c
change the way to generate job id, from sha1 to md5, which is faster …
ngaut Sep 5, 2013
8916934
rename qless-core-master -> qless-core
ngaut Sep 5, 2013
a11aeaa
fix typo
ngaut Sep 9, 2013
d756162
clean up
ngaut Sep 9, 2013
bdc3da1
remove qless-core
ngaut Sep 9, 2013
a4f235f
Update README.md
ngaut Sep 9, 2013
111843f
add requried redis version
ngaut Sep 9, 2013
f91f4f9
pretty print
ngaut Sep 9, 2013
2f3e378
using bitbucket version of osext
ngaut Nov 20, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
Go (golang) version of https://github.com/seomoz/qless. A redis job queue.


depends:
https://github.com/seomoz/qless-core
redis 2.6.16


Goqless now works but not every feature is implemented or tested.
Any reported issues or pull requests are welcome.

250 changes: 121 additions & 129 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,193 +1,187 @@
package goqless

import (
"encoding/json"
"fmt"
"github.com/garyburd/redigo/redis"
"errors"
"strconv"
"encoding/json"
"errors"
"fmt"
"github.com/garyburd/redigo/redis"
"strconv"
)

type TaggedReply struct {
Total int
Jobs StringSlice
Total int
Jobs StringSlice
}

type Client struct {
conn redis.Conn
host string
port string
conn redis.Conn
host string
port string

events *Events
lua *Lua
events *Events
lua *Lua
}

func NewClient(host, port string) *Client {
return &Client{host: host, port: port}
return &Client{host: host, port: port}
}

func Dial(host, port string) (*Client, error) {
c := NewClient(host, port)
c := NewClient(host, port)

conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%s", host, port))
if err != nil {
return nil, err
}
conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%s", host, port))
if err != nil {
return nil, err
}

c.lua = NewLua(conn)
// _, filename, _, _ := runtime.Caller(0)
err = c.lua.LoadScripts("qless-core") // make get from lib path
if err != nil {
conn.Close()
return nil, err
}
c.lua = NewLua(conn)
dir, err := GetCurrentDir()
if err != nil {
println(err.Error())
conn.Close()
return nil, err
}
//println(dir + "/qless-core")
err = c.lua.LoadScripts(dir + "/qless-core") // make get from lib path
if err != nil {
println(err.Error())
conn.Close()
return nil, err
}

c.conn = conn
return c, nil
c.conn = conn
return c, nil
}

func (c *Client) Close() {
c.conn.Close()
c.conn.Close()
}

func (c *Client) Events() *Events {
if c.events != nil {
return c.events
}
c.events = NewEvents(c.host, c.port)
return c.events
if c.events != nil {
return c.events
}
c.events = NewEvents(c.host, c.port)
return c.events
}

func (c *Client) Do(name string, keysAndArgs ...interface{}) (interface{}, error) {
return c.lua.Do(name, keysAndArgs...)
return c.lua.Do(name, keysAndArgs...)
}

func (c *Client) Queue(name string) *Queue {
q := NewQueue(c)
q.Name = name
return q
q := NewQueue(c)
q.Name = name
return q
}

// Queues(0, now, [queue])
func (c *Client) Queues(name string) ([]*Queue, error) {
args := []interface{}{0, timestamp()}
if name != "" {
args = append(args, name)
}

byts, err := redis.Bytes(c.Do("queues", args...))
if err != nil {
return nil, err
}

qr := []*Queue{NewQueue(c)}
if name == "" {
err = json.Unmarshal(byts, &qr)
for _, q := range qr {
q.cli = c
}
} else {
err = json.Unmarshal(byts, &qr[0])
}

if err != nil {
return nil, err
}

return qr, err
}

// Track(0, 'track', jid, now, tag, ...)
args := []interface{}{0, "queues", timestamp()}
if name != "" {
args = append(args, name)
}

byts, err := redis.Bytes(c.Do("qless", args...))
if err != nil {
return nil, err
}

qr := []*Queue{NewQueue(c)}
if name == "" {
err = json.Unmarshal(byts, &qr)
for _, q := range qr {
q.cli = c
}
} else {
err = json.Unmarshal(byts, &qr[0])
}

if err != nil {
return nil, err
}

return qr, err
}

// Track the jid
func (c *Client) Track(jid string) (bool, error) {
return Bool(c.Do("track", 0, "track", jid, timestamp(), ""))
return Bool(c.Do("qless", 0, "track", timestamp(), "track", jid, ""))
}

// Track(0, 'untrack', jid, now)
// Untrack the jid
func (c *Client) Untrack(jid string) (bool, error) {
return Bool(c.Do("track", 0, "untrack", jid, timestamp()))
return Bool(c.Do("qless", 0, "track", timestamp(), 0, "untrack", jid))
}

// Track(0)
// Returns all the tracked jobs
func (c *Client) Tracked() (string, error) {
return redis.String(c.Do("track", 0))
return redis.String(c.Do("qless", 0, "track", timestamp()))
}

func (c *Client) Get(jid string) (interface{}, error) {
job, err := c.GetJob(jid)
if err == redis.ErrNil {
rjob, err := c.GetRecurringJob(jid)
return rjob, err
}
return job, err
job, err := c.GetJob(jid)
if err == redis.ErrNil {
rjob, err := c.GetRecurringJob(jid)
return rjob, err
}
return job, err
}

func (c *Client) GetJob(jid string) (*Job, error) {
byts, err := redis.Bytes(c.Do("get", 0, jid))
if err != nil {
return nil, err
}
byts, err := redis.Bytes(c.Do("qless", 0, "get", timestamp(), jid))
if err != nil {
return nil, err
}

job := NewJob(c)
err = json.Unmarshal(byts, job)
if err != nil {
return nil, err
}
return job, err
job := NewJob(c)
err = json.Unmarshal(byts, job)
if err != nil {
return nil, err
}
return job, err
}

func (c *Client) GetRecurringJob(jid string) (*RecurringJob, error) {
byts, err := redis.Bytes(c.Do("recur", 0, "get", jid))
if err != nil {
return nil, err
}
byts, err := redis.Bytes(c.Do("qless", 0, "recur", timestamp(), "get", jid))
if err != nil {
return nil, err
}

job := NewRecurringJob(c)
err = json.Unmarshal(byts, job)
if err != nil {
return nil, err
}
return job, err
job := NewRecurringJob(c)
err = json.Unmarshal(byts, job)
if err != nil {
return nil, err
}
return job, err
}

func (c *Client) Completed(start, count int) ([]string, error) {
reply, err := redis.Values(c.Do("jobs", 0, "complete"))
if err != nil {
return nil, err
}

// fmt.Println(reply)
reply, err := redis.Values(c.Do("qless", 0, "jobs", timestamp(), "complete"))
if err != nil {
return nil, err
}

ret := []string{}
for _, val := range reply {
s, _ := redis.String(val, err)
ret = append(ret, s)
}
return ret, err
ret := []string{}
for _, val := range reply {
s, _ := redis.String(val, err)
ret = append(ret, s)
}
return ret, err
}

func (c *Client) Tagged(tag string, start, count int) (*TaggedReply, error) {
byts, err := redis.Bytes(c.Do("tag", 0, "get", tag, start, count))
if err != nil {
return nil, err
}
byts, err := redis.Bytes(c.Do("qless", 0, "tag", timestamp(), "get", tag, start, count))
if err != nil {
return nil, err
}

t := &TaggedReply{}
err = json.Unmarshal(byts, t)
return t, err
t := &TaggedReply{}
err = json.Unmarshal(byts, t)
return t, err
}

// // returns all the failed jobs
// func (c *Client) Failed(group string, start, limit int) ([]*Job, error) {
// c.Do("failed", 0,
// }

// config(0, 'get', [option])
func (c *Client) GetConfig(option string) (string, error) {

interf, err := c.Do("config", 0, "get", option)
interf, err := c.Do("qless", 0, "config.get", timestamp(), option)
if err != nil {
return "", err
}
Expand All @@ -212,15 +206,13 @@ func (c *Client) GetConfig(option string) (string, error) {
return contentStr, err
}

// config(0, 'set', option, value)
func (c *Client) SetConfig(option string, value interface{}) {
intf, err := c.Do("config", 0, "set", option, value)
if err != nil {
intf, err := c.Do("qless", 0, "config.set", timestamp(), option, value)
if err != nil {
fmt.Println("setconfig, c.Do fail. interface:", intf, " err:", err)
}
}

// config(0, 'unset', [option])
func (c *Client) UnsetConfig(option string) {
c.Do("unset", option)
c.Do("qless", 0, "config.unset", timestamp(), option)
}
Loading