Browse Source

Added Sync from Gogs; Added Crons using it; Crons can delete old repositories archives

tags/v0.3
Dashie der otter 3 years ago
parent
commit
29080af0b0
Signed by: dashie <rhaamo@leloop.org> GPG Key ID: C2D57B325840B755
9 changed files with 332 additions and 4 deletions
  1. +2
    -2
      bindata/bindata.go
  2. +2
    -1
      cmd/web.go
  3. +13
    -0
      conf/app.ini
  4. +70
    -0
      models/gitxt.go
  5. +19
    -1
      setting/setting.go
  6. +37
    -0
      stuff/cron/cron.go
  7. +70
    -0
      stuff/sync/exclusive_pool.go
  8. +49
    -0
      stuff/sync/status_pool.go
  9. +70
    -0
      stuff/sync/unique_queue.go

+ 2
- 2
bindata/bindata.go
File diff suppressed because it is too large
View File


+ 2
- 1
cmd/web.go View File

@@ -24,9 +24,9 @@ import (
"os"
"dev.sigpipe.me/dashie/git.txt/models"
"dev.sigpipe.me/dashie/git.txt/routers/user"
//"dev.sigpipe.me/dashie/git.txt/routers/gitxt"
"dev.sigpipe.me/dashie/git.txt/routers/gitxt"
"dev.sigpipe.me/dashie/git.txt/routers/repo"
"dev.sigpipe.me/dashie/git.txt/stuff/cron"
)

var Web = cli.Command{
@@ -121,6 +121,7 @@ func runWeb(ctx *cli.Context) error {

setting.InitConfig()
models.InitDb()
cron.NewContext()

m := newMacaron()



+ 13
- 0
conf/app.ini View File

@@ -121,3 +121,16 @@ LOGIN_STATUS_COOKIE_NAME = login_status
[i18n]
LANGS = en-US,fr-FR
NAMES = English,Français

[cron]
; Enable running cron tasks periodically.
ENABLED = true
; Run cron tasks when Gitxt starts.
RUN_AT_START = false

; Cleanup repository archives
[cron.repo_archive_cleanup]
RUN_AT_START = true
SCHEDULE = @every 24h
; Time duration to check if archive should be cleaned
OLDER_THAN = 24h

+ 70
- 0
models/gitxt.go View File

@@ -5,6 +5,13 @@ import (
"dev.sigpipe.me/dashie/git.txt/models/errors"
"github.com/go-xorm/xorm"
"fmt"
"os"
"dev.sigpipe.me/dashie/git.txt/stuff/repository"
"dev.sigpipe.me/dashie/git.txt/stuff/sync"
"dev.sigpipe.me/dashie/git.txt/setting"
log "gopkg.in/clog.v1"
"path/filepath"
"github.com/Unknwon/com"
)

type Gitxt struct {
@@ -31,6 +38,13 @@ type GitxtWithUser struct {
Gitxt `xorm:"extends"`
}

// Preventing duplicate running tasks
var taskStatusTable = sync.NewStatusTable()

const (
_CLEAN_OLD_ARCHIVES = "clean_old_archives"
)

// IsHashUsed checks if given hash exist,
func IsHashUsed(uid int64, hash string) (bool, error) {
if len(hash) == 0 {
@@ -130,3 +144,59 @@ func updateGitxt(e Engine, u *Gitxt) error {
func UpdateGitxt(u *Gitxt) error {
return updateGitxt(x, u)
}


// Archive deletion
func DeleteOldRepositoryArchives() {
if taskStatusTable.IsRunning(_CLEAN_OLD_ARCHIVES) {
return
}
taskStatusTable.Start(_CLEAN_OLD_ARCHIVES)
defer taskStatusTable.Stop(_CLEAN_OLD_ARCHIVES)

log.Trace("Doing: DeleteOldRepositoryArchives")

formats := []string{"zip", "targz"}
oldestTime := time.Now().Add(-setting.Cron.RepoArchiveCleanup.OlderThan)

if err := x.Where("gitxt.id > 0").Table(&Gitxt{}).Join("LEFT", "user", "gitxt.user_id = user.id").Iterate(new(GitxtWithUser),
func(idx int, bean interface{}) error {
repo := bean.(*GitxtWithUser)
basePath := filepath.Join(repository.RepoPath(repo.User.UserName, repo.Gitxt.Hash), "archives")
for _, format := range formats {
dirPath := filepath.Join(basePath, format)
if !com.IsDir(dirPath) {
continue
}

dir, err := os.Open(dirPath)
if err != nil {
log.Error(3, "Fail to open directory '%s': %v", dirPath, err)
continue
}

fis, err := dir.Readdir(0)
dir.Close()
if err != nil {
log.Error(3, "Fail to read directory '%s': %v", dirPath, err)
continue
}

for _, fi := range fis {
if fi.IsDir() || fi.ModTime().After(oldestTime) {
continue
}

archivePath := filepath.Join(dirPath, fi.Name())
if err = os.Remove(archivePath); err != nil {
desc := fmt.Sprintf("Fail to health delete archive '%s': %v", archivePath, err)
log.Warn(desc)
}
}
}

return nil
}); err != nil {
log.Error(2, "DeleteOldRepositoryArchives: %v", err)
}
}

+ 19
- 1
setting/setting.go View File

@@ -13,6 +13,7 @@ import (
"github.com/Unknwon/com"
"net/url"
"github.com/go-macaron/session"
"time"
)

type Scheme string
@@ -39,6 +40,16 @@ var (
CanRegister bool
AnonymousCreate bool

// Cron tasks
Cron struct {
RepoArchiveCleanup struct {
Enabled bool
RunAtStart bool
Schedule string
OlderThan time.Duration
} `ini:"cron.repo_archive_cleanup"`
}

// Server settings
Protocol Scheme
UnixSocketPermission uint32
@@ -98,6 +109,7 @@ var (
Langs []string
Names []string
dateLangs map[string]string

)

// DateLang transforms standard language locale name to corresponding value in datetime plugin.
@@ -166,6 +178,7 @@ func InitConfig() {
if err != nil {
log.Fatal(2, "Fail to parse '%s': %v", CustomConf, err)
}
Cfg.NameMapper = ini.AllCapsUnderscore

homeDir, err := com.HomeDir()
if err != nil {
@@ -246,8 +259,13 @@ func InitConfig() {
EnableLoginStatusCookie = sec.Key("ENABLE_LOGIN_STATUS_COOKIE").MustBool(false)
LoginStatusCookieName = sec.Key("LOGIN_STATUS_COOKIE_NAME").MustString("login_status")


initLogging()

err = Cfg.Section("cron").MapTo(&Cron)
if err != nil {
log.Fatal(2, "Fail to map Cron settings: %v", err)
}

initSession()
initCache()
}


+ 37
- 0
stuff/cron/cron.go View File

@@ -0,0 +1,37 @@
package cron

import (
log "gopkg.in/clog.v1"
"github.com/gogits/cron"
"dev.sigpipe.me/dashie/git.txt/setting"
"dev.sigpipe.me/dashie/git.txt/models"
"time"
)

var c = cron.New()

func NewContext() {
var (
entry *cron.Entry
err error
)

if setting.Cron.RepoArchiveCleanup.Enabled {
log.Trace("Enabling RepoArchiveCleanup")
entry, err = c.AddFunc("Repository archive cleanup", setting.Cron.RepoArchiveCleanup.Schedule, models.DeleteOldRepositoryArchives)
if err != nil {
log.Fatal(2, "Cron.(repository archive cleanup): %v", err)
}
if setting.Cron.RepoArchiveCleanup.RunAtStart {
entry.Prev = time.Now()
entry.ExecTimes++
go models.DeleteOldRepositoryArchives()
}
}
c.Start()
}

// ListTasks returns all running cron tasks.
func ListTasks() []*cron.Entry {
return c.Entries()
}

+ 70
- 0
stuff/sync/exclusive_pool.go View File

@@ -0,0 +1,70 @@
// Copyright 2016 The Gogs Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package sync

import (
"sync"
)

// ExclusivePool is a pool of non-identical instances
// that only one instance with same identity is in the pool at a time.
// In other words, only instances with different identities can be in
// the pool the same time. If another instance with same identity tries
// to get into the pool, it hangs until previous instance left the pool.
//
// This pool is particularly useful for performing tasks on same resource
// on the file system in different goroutines.
type ExclusivePool struct {
lock sync.Mutex

// pool maintains locks for each instance in the pool.
pool map[string]*sync.Mutex

// count maintains the number of times an instance with same identity checks in
// to the pool, and should be reduced to 0 (removed from map) by checking out
// with same number of times.
// The purpose of count is to delete lock when count down to 0 and recycle memory
// from map object.
count map[string]int
}

// NewExclusivePool initializes and returns a new ExclusivePool object.
func NewExclusivePool() *ExclusivePool {
return &ExclusivePool{
pool: make(map[string]*sync.Mutex),
count: make(map[string]int),
}
}

// CheckIn checks in an instance to the pool and hangs while instance
// with same indentity is using the lock.
func (p *ExclusivePool) CheckIn(identity string) {
p.lock.Lock()

lock, has := p.pool[identity]
if !has {
lock = &sync.Mutex{}
p.pool[identity] = lock
}
p.count[identity]++

p.lock.Unlock()
lock.Lock()
}

// CheckOut checks out an instance from the pool and releases the lock
// to let other instances with same identity to grab the lock.
func (p *ExclusivePool) CheckOut(identity string) {
p.lock.Lock()
defer p.lock.Unlock()

p.pool[identity].Unlock()
if p.count[identity] == 1 {
delete(p.pool, identity)
delete(p.count, identity)
} else {
p.count[identity]--
}
}

+ 49
- 0
stuff/sync/status_pool.go View File

@@ -0,0 +1,49 @@
// Copyright 2016 The Gogs Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package sync

import (
"sync"
)

// StatusTable is a table maintains true/false values.
//
// This table is particularly useful for un/marking and checking values
// in different goroutines.
type StatusTable struct {
sync.RWMutex
pool map[string]bool
}

// NewStatusTable initializes and returns a new StatusTable object.
func NewStatusTable() *StatusTable {
return &StatusTable{
pool: make(map[string]bool),
}
}

// Start sets value of given name to true in the pool.
func (p *StatusTable) Start(name string) {
p.Lock()
defer p.Unlock()

p.pool[name] = true
}

// Stop sets value of given name to false in the pool.
func (p *StatusTable) Stop(name string) {
p.Lock()
defer p.Unlock()

p.pool[name] = false
}

// IsRunning checks if value of given name is set to true in the pool.
func (p *StatusTable) IsRunning(name string) bool {
p.RLock()
defer p.RUnlock()

return p.pool[name]
}

+ 70
- 0
stuff/sync/unique_queue.go View File

@@ -0,0 +1,70 @@
// Copyright 2016 The Gogs Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package sync

import (
"github.com/Unknwon/com"
)

// UniqueQueue is a queue which guarantees only one instance of same
// identity is in the line. Instances with same identity will be
// discarded if there is already one in the line.
//
// This queue is particularly useful for preventing duplicated task
// of same purpose.
type UniqueQueue struct {
table *StatusTable
queue chan string
}

// NewUniqueQueue initializes and returns a new UniqueQueue object.
func NewUniqueQueue(queueLength int) *UniqueQueue {
if queueLength <= 0 {
queueLength = 100
}

return &UniqueQueue{
table: NewStatusTable(),
queue: make(chan string, queueLength),
}
}

// Queue returns channel of queue for retrieving instances.
func (q *UniqueQueue) Queue() <-chan string {
return q.queue
}

// Exist returns true if there is an instance with given indentity
// exists in the queue.
func (q *UniqueQueue) Exist(id interface{}) bool {
return q.table.IsRunning(com.ToStr(id))
}

// AddFunc adds new instance to the queue with a custom runnable function,
// the queue is blocked until the function exits.
func (q *UniqueQueue) AddFunc(id interface{}, fn func()) {
if q.Exist(id) {
return
}

idStr := com.ToStr(id)
q.table.Lock()
q.table.pool[idStr] = true
if fn != nil {
fn()
}
q.table.Unlock()
q.queue <- idStr
}

// Add adds new instance to the queue.
func (q *UniqueQueue) Add(id interface{}) {
q.AddFunc(id, nil)
}

// Remove removes instance from the queue.
func (q *UniqueQueue) Remove(id interface{}) {
q.table.Stop(com.ToStr(id))
}

Loading…
Cancel
Save