j7s_branch_trigger/j7s_branch_trigger/j7s_branch_trigger.py

112 lines
4.4 KiB
Python

#!/usr/bin/env python
#
# Copyright 2023 James Pace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import subprocess
import re
import json
import os
import requests
import time
import schedule
import logging
def main():
logging.basicConfig(level=logging.DEBUG)
logging.debug('Starting up....')
(repo, url, data_file, pattern, loop_time, repo_name, org_name, job_name) = get_config()
def job():
loop(repo, pattern, url, data_file, repo_name, org_name, job_name)
schedule.every(loop_time).minutes.do(job)
logging.debug('Doing initial run.')
schedule.run_all()
logging.debug('Starting loop.')
while True:
schedule.run_pending()
time.sleep(1)
def loop(repo, pattern, request_url, data_file, repo_name, org_name, job_name):
logging.debug('Looking at {} for {}.'.format(repo, pattern))
all_branches = git_ls(repo)
logging.debug('All branches: {}'.format(all_branches))
concerned_branches = find_concerned_branches(all_branches, pattern)
logging.debug('Concerned branches for {}: {}'.format(pattern, concerned_branches))
last_time = load_last_time(data_file)
compare_branches(last_time, concerned_branches, request_url, repo_name, org_name, job_name)
save_this_time(data_file, concerned_branches)
def git_ls(repo):
cmd = 'git ls-remote {repo}'.format(repo=repo)
output = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE)
if output.returncode != 0:
return None
branch_to_commit = {}
for line in output.stdout.splitlines():
(commit, branch) = line.decode().split('\t')
branch_to_commit[branch] = commit
return branch_to_commit
def find_concerned_branches(branch_to_commit, pattern):
github_pull_pattern = '^refs/pull/.*'
onedev_update_pattern = '^refs/update/.*'
relevant_branches = {}
for (branch, commit) in branch_to_commit.items():
is_relevant_branch = re.fullmatch(pattern, branch)
is_github_pull_request = re.fullmatch(github_pull_pattern, branch)
is_onedev_pull_update = re.fullmatch(onedev_update_pattern, branch)
if is_relevant_branch and not is_github_pull_request and not is_onedev_pull_update:
relevant_branches[branch] = commit
return relevant_branches
def load_last_time(file_name):
from_last_time = {}
try:
with open(file_name, 'r') as f:
from_last_time = json.load(f)
except IOError:
logging.debug("Can't find state last time.")
return from_last_time
def save_this_time(file_name, this_time):
with open(file_name, 'w') as f:
json.dump(this_time, f)
def compare_branches(last_time, this_time, url, repo_name, org_name, job_name):
for branch in this_time.keys():
my_hash = this_time[branch]
last_hash = last_time.get(branch, None)
if my_hash != last_hash:
logging.debug('Branch {} has new hash.'.format(branch))
r = requests.post(url, json={"sha": my_hash, "repo_name": repo_name, "org_name": org_name, "job_name": job_name})
if r.status_code == 200 or r.status_code == 202:
logging.debug("Request ok!")
else:
logging.warning("Something went wrong on request.")
def get_config():
repo = os.environ.get('J7S_REPO', None)
url = os.environ.get('J7S_URL', None)
if not url or not repo:
raise Exception('Missing J7S_REPO OR J7S_URL.')
# the part right before .git in gitea clone url
repo_name = os.environ.get('J7S_REPO_NAME', "")
# the gitea org the repo lives in.
org_name = os.environ.get('J7S_ORG_NAME', "")
job_name = os.environ.get('J7S_JOB_NAME', "")
data_file = os.environ.get('J7S_HISTORY_FILE', '/tmp/branch-history.json')
pattern = os.environ.get('J7S_PATTERN', '^HEAD$')
loop_time = float(os.environ.get('J7S_LOOPTIME', 1.0))
return (repo, url, data_file, pattern, loop_time, repo_name, org_name, job_name)