"""Clone (or update) every project of one or more GitLab groups.

For each configured target group the script walks the subgroup tree through
the GitLab REST API, then runs ``git clone`` for projects that are not yet
present locally and ``git pull`` for those that are.  Repositories are placed
under the current working directory at their ``path_with_namespace``.
"""
from urllib.parse import quote
from urllib.request import urlopen
import json
import subprocess, shlex
import time
import os

gitlabToken = 'token'
gitlabAddr = 'gitlab.com'  # GitLab host
targets = ['intel']  # Groups or subgroups to sync, by name or numeric id (leave empty to clone everything - use with caution)
withShared = 'false'  # Whether to include shared projects, default false (true also pulls projects owned by other groups)
max_projects = '100'  # The script only reads one page of results; projects beyond this count are not downloaded
# -------------------------
counter = 0  # number of git processes started so far (clones and pulls)
procs = []   # git child processes that have not been waited for yet


def _fetch_json(url):
    """Return the decoded JSON body of *url*, closing the HTTP response.

    Errors raised by ``urlopen`` (network failure, HTTP error status) and by
    ``json.loads`` are not caught here and propagate to the caller.
    """
    with urlopen(url) as response:
        return json.loads(response.read().decode())


def _wait_for_procs():
    """Wait for every pending git process and report non-zero exit codes."""
    print("=========== 等待子线程执行结束 ===========")
    for proc in procs:
        exit_code = proc.wait()
        if exit_code != 0:
            failed_command = ' '.join(shlex.quote(arg) for arg in proc.args)
            print("Error on %s: git exited with code %s" % (failed_command, exit_code))
    # Forget the finished processes so later batches do not wait on them again.
    del procs[:]
    print("=========== 子线程执行结束 ===========")


def _sync_projects(projects, label, show_paths=False):
    """Start ``git clone``/``git pull`` for each project, then wait for all.

    projects   -- list of project dicts as returned by the GitLab API.
    label      -- text printed in the "start cloning" banner (the group id,
                  or a fixed word when no group is involved).
    show_paths -- when true, print "<url> <path>" for every project first.
    """
    global counter
    for project in projects:
        try:
            repo_url = project['http_url_to_repo']
            repo_path = project['path_with_namespace']
        except (KeyError, TypeError) as e:
            print("Error on %s: %s" % (project, e))
            continue
        if show_paths:
            print(repo_url + ' ' + repo_path)
        # Argument lists (instead of a formatted string fed to shlex.split)
        # keep URLs and paths containing spaces or quotes intact.
        if os.path.exists(repo_path):
            command = ['git', '-C', repo_path, 'pull']
        else:
            print("=========== 开始克隆 %s %s ===========" % (label, project.get('name', repo_path)))
            print('执行:git clone %s %s' % (repo_url, repo_path))
            command = ['git', 'clone', repo_url, repo_path]
        try:
            procs.append(subprocess.Popen(command))
        except OSError as e:
            # Raised, for example, when the git executable cannot be started.
            print("Error on %s: %s" % (repo_url, e))
            continue
        counter += 1
        time.sleep(1)  # stagger process start-up by one second
    _wait_for_procs()


def get_next(group_id):
    """Clone or pull every project that belongs directly to *group_id*."""
    print('get_next group_id:', group_id)
    projects = _fetch_json(gen_next_url(group_id))
    if not projects:
        return
    _sync_projects(projects, group_id)


def have_next_projects(group_id):
    """Return True when the project listing for *group_id* is non-empty."""
    return bool(_fetch_json(gen_next_url(group_id)))


def get_sub_groups(parent_id):
    """Return the ids of the subgroups listed for *parent_id*.

    NOTE(review): the subgroups URL carries no ``per_page`` parameter, so
    only the API's default first page is read - confirm this is enough.
    """
    sub_ids = []
    for group in _fetch_json(gen_subgroups_url(parent_id)):
        try:
            sub_ids.append(group['id'])
        except (KeyError, TypeError) as e:
            print("Error on %s: %s" % (group, e))
    return sub_ids


def cal_next_sub_groupids(parent_id, list_subgroups=None, has_projects=None):
    """Return the ids of all groups under *parent_id* that own projects.

    The result contains *parent_id* itself when it has projects, followed by
    the matching ids of every subgroup, depth first.  Each id appears once.

    list_subgroups -- callable(group_id) -> list of subgroup ids;
                      defaults to get_sub_groups.
    has_projects   -- callable(group_id) -> bool;
                      defaults to have_next_projects.
    """
    if list_subgroups is None:
        list_subgroups = get_sub_groups
    if has_projects is None:
        has_projects = have_next_projects

    sub_ids = list_subgroups(parent_id)
    print('cal_next_sub_groupids sub_ids and parent_id:', sub_ids, parent_id)
    ok = has_projects(parent_id)
    print('have_next_projects result:', ok)

    group_ids = [parent_id] if ok else []
    # Descend into every subgroup, including when the parent has no projects.
    for sub_id in sub_ids:
        group_ids.extend(cal_next_sub_groupids(sub_id, list_subgroups, has_projects))
    return group_ids


def download_code(parent_id):
    """Clone or pull the projects of *parent_id* and of all its subgroups."""
    group_ids = cal_next_sub_groupids(parent_id)
    print('download_code result: ', group_ids)
    for group_id in group_ids:
        get_next(group_id)


def gen_next_url(target_id):
    """Build the URL that lists the projects of group *target_id*."""
    return "https://%s/api/v4/groups/%s/projects?per_page=%s&private_token=%s&with_shared=%s&order_by=updated_at" % (
        gitlabAddr, target_id, max_projects, gitlabToken, withShared)


def gen_subgroups_url(target_id):
    """Build the URL that lists the subgroups of group *target_id*."""
    return "https://%s/api/v4/groups/%s/subgroups?private_token=%s" % (gitlabAddr, target_id, gitlabToken)


def gen_global_url():
    """Build the URL that lists projects without restricting to a group.

    Uses https like the other URL builders, so the token is not sent in
    clear text, and applies max_projects as the page size.
    """
    return "https://%s/api/v4/projects?per_page=%s&private_token=%s" % (gitlabAddr, max_projects, gitlabToken)


def download_global_code():
    """Clone or pull every project returned by the global project listing."""
    projects = _fetch_json(gen_global_url())
    if not projects:
        return
    # No group is involved here, so the banner uses a fixed label.
    _sync_projects(projects, 'global', show_paths=True)


def _resolve_group_id(target):
    """Return the group id for *target*, or None when it cannot be resolved.

    An int or an all-digit string is taken to be a group id already.
    Anything else is searched for, and only a group whose ``name`` equals
    *target* exactly is accepted.
    """
    if isinstance(target, int) or str(target).isdigit():
        return target
    url = "https://%s/api/v4/groups?private_token=%s&search=%s" % (
        gitlabAddr, gitlabToken, quote(str(target), safe=''))
    for group in _fetch_json(url):
        try:
            if group['name'] == target:
                return group['id']
        except (KeyError, TypeError) as e:
            print("Error on %s: %s" % (group, e))
    return None


def download_targets_code():
    """Clone or pull the projects of every group named in ``targets``.

    A target that cannot be resolved is reported and skipped; the remaining
    targets are still processed.
    """
    for target in targets:
        target_id = _resolve_group_id(target)
        if target_id is None:
            print("Error on %s: %s" % (target, "no group with exactly this name was found"))
            continue
        download_code(target_id)


def main():
    """Sync the configured targets, or everything when none are configured."""
    if not targets:
        download_global_code()
    else:
        download_targets_code()
    print("=========== 执行结束,克隆项目数: %s ===========" % (counter))


if __name__ == "__main__":
    main()
# Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.