赞
踩
本文整理汇总了Python中flask.g方法的典型用法代码示例。如果您正苦于以下问题:Python flask.g方法的具体用法?Python flask.g怎么用?Python flask.g使用的例子?那么恭喜您, 这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在模块flask的用法示例。
在下文中一共展示了flask.g方法的28个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: is_admin
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def is_admin():
""" Return whether the user is admin for this application or not. """
if not authenticated():
return False
user = flask.g.fas_user
auth_method = pagure_config.get("PAGURE_AUTH", None)
if auth_method == "fas":
if not user.cla_done:
return False
admin_users = pagure_config.get("PAGURE_ADMIN_USERS", [])
if not isinstance(admin_users, list):
admin_users = [admin_users]
if user.username in admin_users:
return True
admins = pagure_config["ADMIN_GROUP"]
if not isinstance(admins, list):
admins = [admins]
admins = set(admins or [])
groups = set(flask.g.fas_user.groups)
return not groups.isdisjoint(admins)
开发者ID:Pagure,项目名称:pagure,代码行数:27,
示例2: is_repo_admin
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def is_repo_admin(repo_obj, username=None):
""" Return whether the user is an admin of the provided repo. """
if not authenticated():
return False
if username:
user = username
else:
user = flask.g.fas_user.username
if is_admin():
return True
usergrps = [usr.user for grp in repo_obj.admin_groups for usr in grp.users]
return (
user == repo_obj.user.user
or (user in [usr.user for usr in repo_obj.admins])
or (user in usergrps)
)
开发者ID:Pagure,项目名称:pagure,代码行数:22,
示例3: admin_session_timedout
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def admin_session_timedout():
""" Check if the current user has been authenticated for more than what
is allowed (defaults to 15 minutes).
If it is the case, the user is logged out and the method returns True,
otherwise it returns False.
"""
timedout = False
if not pagure.utils.authenticated():
return True
login_time = flask.g.fas_user.login_time
# This is because flask_fas_openid will store this as a posix timestamp
if not isinstance(login_time, datetime.datetime):
login_time = datetime.datetime.utcfromtimestamp(login_time)
if (datetime.datetime.utcnow() - login_time) > pagure_config.get(
"ADMIN_SESSION_LIFETIME", datetime.timedelta(minutes=15)
):
timedout = True
logout()
return timedout
开发者ID:Pagure,项目名称:pagure,代码行数:21,
示例4: logout
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def logout():
""" Log out the user currently logged in in the application
"""
auth = pagure_config.get("PAGURE_AUTH", None)
if auth in ["fas", "openid"]:
if hasattr(flask.g, "fas_user") and flask.g.fas_user is not None:
from pagure.ui.fas_login import FAS
FAS.logout()
elif auth == "oidc":
from pagure.ui.oidc_login import oidc_logout
oidc_logout()
elif auth == "local":
import pagure.ui.login as login
login.logout()
开发者ID:Pagure,项目名称:pagure,代码行数:19,
示例5: after_request
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def after_request(response):
""" After request callback, adjust the headers returned """
csp_headers = pagure_config["CSP_HEADERS"]
try:
style_csp = "nonce-" + flask.g.nonce
script_csp = (
"unsafe-inline"
if "unsafe_javascript" in flask.g and flask.g.unsafe_javascript
else "nonce-" + flask.g.nonce
)
csp_headers = csp_headers.format(
nonce_script=script_csp, nonce_style=style_csp
)
except (KeyError, IndexError):
pass
response.headers.set(str("Content-Security-Policy"), csp_headers)
return response
开发者ID:Pagure,项目名称:pagure,代码行数:19,
示例6: confirm_user
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def confirm_user(token):
""" Confirm a user account.
"""
user_obj = pagure.lib.query.search_user(flask.g.session, token=token)
if not user_obj:
flask.flash("No user associated with this token.", "error")
else:
user_obj.token = None
flask.g.session.add(user_obj)
try:
flask.g.session.commit()
flask.flash("Email confirmed, account activated")
return flask.redirect(flask.url_for("auth_login"))
except SQLAlchemyError as err: # pragma: no cover
flask.flash(
"Could not set the account as active in the db, "
"please report this error to an admin",
"error",
)
_log.exception(err)
return flask.redirect(flask.url_for("ui_ns.index"))
开发者ID:Pagure,项目名称:pagure,代码行数:25,
示例7: get_locale
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def get_locale():
# if a user is logged in, use the locale from the user settings
user = getattr(g, 'user', None)
# user = None
if user is not None and hasattr(user, "locale"):
if user.nickname != 'Guest': # if the account is the guest account bypass the config lang settings
return user.locale
preferred = list()
if request.accept_languages:
for x in request.accept_languages.values():
try:
preferred.append(str(LC.parse(x.replace('-', '_'))))
except (UnknownLocaleError, ValueError) as e:
log.debug('Could not parse locale "%s": %s', x, e)
return negotiate_locale(preferred or ['en'], _BABEL_TRANSLATIONS)
开发者ID:janeczku,项目名称:calibre-web,代码行数:19,
示例8: send_onboarding_sms_messages
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def send_onboarding_sms_messages(user):
# First send the intro message
organisation = getattr(g, 'active_organisation', None) or user.default_organisation
intro_message = i18n_for(
user,
"general_sms.welcome.{}".format(organisation.custom_welcome_message_key or 'generic'),
first_name=user.first_name,
balance=rounded_dollars(user.transfer_account.balance),
token=user.transfer_account.token.name
)
send_message(user.phone, intro_message)
send_terms_message_if_required(user)
开发者ID:teamsempo,项目名称:SempoBlockchain,代码行数:18,
示例9: _make_initial_disbursement
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def _make_initial_disbursement(self, initial_disbursement, auto_resolve=False):
from server.utils.credit_transfer import make_payment_transfer
active_org = getattr(g, 'active_organisation', Organisation.master_organisation())
initial_disbursement = initial_disbursement or active_org.default_disbursement
if not initial_disbursement:
return None
user_id = get_authorising_user_id()
if user_id is not None:
sender = User.query.execution_options(show_all=True).get(user_id)
else:
sender = self.primary_user
disbursement = make_payment_transfer(
initial_disbursement, token=self.token, send_user=sender, receive_user=self.primary_user,
transfer_subtype=TransferSubTypeEnum.DISBURSEMENT, transfer_mode=TransferModeEnum.WEB,
is_ghost_transfer=False, require_sender_approved=False,
require_recipient_approved=False, automatically_resolve_complete=auto_resolve)
return disbursement
开发者ID:teamsempo,项目名称:SempoBlockchain,代码行数:23,
示例10: set_request_decorators
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def set_request_decorators(self) -> None:
# pylint: disable=inconsistent-return-statements
@self._app.before_request
def _check_for_ott_or_cookie(): # type: ignore[no-untyped-def]
if not self._ott_validated and self._ott == flask.request.args.get("ott"):
self._ott_validated = True
flask.g.set_cookie_token = True
return flask.redirect(flask.request.base_url)
if self._cookie_token == flask.request.cookies.get(
f"cookie_token_{self._port}"
):
self._ott_validated = True
else:
flask.abort(401)
@self._app.after_request
def _set_cookie_token_in_response(
response: flask.wrappers.Response,
) -> flask.wrappers.Response:
if "set_cookie_token" in flask.g and flask.g.set_cookie_token:
response.set_cookie(
key=f"cookie_token_{self._port}", value=self._cookie_token
)
return response
开发者ID:equinor,项目名称:webviz-config,代码行数:27,
示例11: per_request_callbacks
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def per_request_callbacks(response):
if response.headers["Content-Type"] != "application/json":
return response
try:
data = json.loads(response.data.decode("utf8"))
validate(data, schema)
data["meta"] = data.get("meta", {})
data["meta"]["validation"] = "ok"
response.data = json.dumps(data, indent=4)
except Exception as exc:
print(exc)
response.data = b'{"result" : "validation failed"}'
for func in getattr(g, "call_after_request", ()):
response = func(response)
return response
开发者ID:thomaxxl,项目名称:safrs,代码行数:18,
示例12: open_changeset_error
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def open_changeset_error(place, changeset, r):
url = place.candidates_url(_external=True)
username = g.user.username
body = f'''
user: {username}
name: {place.display_name}
page: {url}
message user: https://www.openstreetmap.org/message/new/{username}
sent:
{changeset}
reply:
{r.text}
'''
send_mail('error creating changeset:' + place.name, body)
开发者ID:EdwardBetts,项目名称:osm-wikidata,代码行数:23,
示例13: update_tag
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def update_tag(tag):
res = None
if request.method in ('PUT', 'POST'):
new_tags = request.form.getlist('tags')
result_flag = getattr(flask.g, 'bukudb', get_bukudb()).replace_tag(tag, new_tags)
op_text = 'replace tag [{}] with [{}]'.format(tag, ', '.join(new_tags))
if request.method == 'PUT' and result_flag and request.path.startswith('/api/'):
res = (jsonify(response.response_template['success']),
status.HTTP_200_OK,
{'ContentType': 'application/json'})
elif request.method == 'PUT' and request.path.startswith('/api/'):
res = (jsonify(response.response_template['failure']),
status.HTTP_400_BAD_REQUEST,
{'ContentType': 'application/json'})
elif request.method == 'POST' and result_flag:
flash(Markup('Success {}'.format(op_text)), 'success')
res = redirect(url_for('get_tags-html'))
elif request.method == 'POST':
flash(Markup('Failed {}'.format(op_text)), 'danger')
res = redirect(url_for('get_tags-html'))
else:
abort(400, description="Unknown Condition")
return res
开发者ID:jarun,项目名称:buku,代码行数:25,
示例14: put
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def put(self, rec_id: int):
bukudb = getattr(flask.g, 'bukudb', get_bukudb())
result_flag = bukudb.update_rec(
rec_id,
request.form.get('url'),
request.form.get('title'),
request.form.get('tags'),
request.form.get('description'))
if result_flag:
res = (jsonify(response.response_template['success']),
status.HTTP_200_OK,
{'ContentType': 'application/json'})
else:
res = (jsonify(response.response_template['failure']),
status.HTTP_400_BAD_REQUEST,
{'ContentType': 'application/json'})
return res
开发者ID:jarun,项目名称:buku,代码行数:19,
示例15: delete
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def delete(self, rec_id: Union[int, None]):
if rec_id is None:
bukudb = getattr(flask.g, 'bukudb', get_bukudb())
with mock.patch('buku.read_in', return_value='y'):
result_flag = bukudb.cleardb()
if result_flag:
res = jsonify(response.response_template['success'])
else:
res = jsonify(response.response_template['failure'])
res.status_code = status.HTTP_400_BAD_REQUEST
else:
bukudb = getattr(flask.g, 'bukudb', get_bukudb())
result_flag = bukudb.delete_rec(rec_id)
if result_flag:
res = (jsonify(response.response_template['success']),
status.HTTP_200_OK,
{'ContentType': 'application/json'})
else:
res = (jsonify(response.response_template['failure']),
status.HTTP_400_BAD_REQUEST,
{'ContentType': 'application/json'})
return res
开发者ID:jarun,项目名称:buku,代码行数:24,
示例16: get
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def get(self, starting_id: int, ending_id: int):
bukudb = getattr(flask.g, 'bukudb', get_bukudb())
max_id = bukudb.get_max_id()
if starting_id > max_id or ending_id > max_id:
return jsonify(response.response_template['failure']), status.HTTP_400_BAD_REQUEST, \
{'ContentType': 'application/json'}
result = {'bookmarks': {}} # type: ignore
for i in range(starting_id, ending_id + 1, 1):
bookmark = bukudb.get_rec_by_id(i)
result['bookmarks'][i] = {
'url': bookmark[1],
'title': bookmark[2],
'tags': list([_f for _f in bookmark[3].split(',') if _f]),
'description': bookmark[4]
}
res = jsonify(result)
return res
开发者ID:jarun,项目名称:buku,代码行数:19,
示例17: after_request_with_instana
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def after_request_with_instana(response):
scope = None
try:
# If we're not tracing, just return
if not hasattr(flask.g, 'scope'):
return response
scope = flask.g.scope
if scope is not None:
span = scope.span
if 500 <= response.status_code <= 511:
span.mark_as_errored()
span.set_tag(ext.HTTP_STATUS_CODE, int(response.status_code))
tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, response.headers)
response.headers.add('Server-Timing', "intid;desc=%s" % scope.span.context.trace_id)
except:
logger.debug("Flask after_request", exc_info=True)
finally:
if scope is not None:
scope.close()
flask.g.scope = None
return response
开发者ID:instana,项目名称:python-sensor,代码行数:26,
示例18: render_with_instana
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def render_with_instana(wrapped, instance, argv, kwargs):
ctx = argv[1]
# If we're not tracing, just return
if not hasattr(ctx['g'], 'scope'):
return wrapped(*argv, **kwargs)
with tracer.start_active_span("render", child_of=ctx['g'].scope.span) as rscope:
try:
template = argv[0]
rscope.span.set_tag("type", "template")
if template.name is None:
rscope.span.set_tag("name", '(from string)')
else:
rscope.span.set_tag("name", template.name)
return wrapped(*argv, **kwargs)
except Exception as e:
rscope.span.log_exception(e)
raise
开发者ID:instana,项目名称:python-sensor,代码行数:23,
示例19: request_finished_with_instana
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def request_finished_with_instana(sender, response, **extra):
scope = None
try:
if not hasattr(flask.g, 'scope'):
return
scope = flask.g.scope
if scope is not None:
span = scope.span
if 500 <= response.status_code <= 511:
span.mark_as_errored()
span.set_tag(ext.HTTP_STATUS_CODE, int(response.status_code))
tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, response.headers)
response.headers.add('Server-Timing', "intid;desc=%s" % scope.span.context.trace_id)
except:
logger.debug("Flask after_request", exc_info=True)
finally:
if scope is not None:
scope.close()
开发者ID:instana,项目名称:python-sensor,代码行数:23,
示例20: download_certificate
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def download_certificate(certificate):
if not flask.g.user.application:
flask.g.user.application = Application()
current_session.merge(flask.g.user)
cert = (
current_session.query(Certificate)
.filter(Certificate.name == certificate)
.filter(Certificate.application_id == flask.g.user.application.id)
.first()
)
if cert:
resp = flask.make_response(cert.data)
resp.headers["Content-Type"] = "application/octet-stream"
resp.headers["Content-Disposition"] = "attachment; filename={}.{}".format(
cert.name, cert.extension
)
return resp
else:
raise NotFound("No certificate with name {} found".format(certificate))
开发者ID:uc-cdis,项目名称:fence,代码行数:21,
示例21: load_globals
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def load_globals():
"""Prepopulate flask.g.* with properties."""
try:
del flask.g.user
except AttributeError:
pass
try:
del flask.g.team
except AttributeError:
pass
if load_apikey():
return
if (app.config.get('SESSION_EXPIRATION_SECONDS') and
flask.session.get('expires') and
flask.session.get('expires') < time.time()):
flask.session.clear()
flask.g.uid = flask.session.get('user')
flask.g.tid = flask.session.get('team')
flask.g.admin = flask.session.get('admin') or False
开发者ID:google,项目名称:ctfscoreboard,代码行数:21,
示例22: load_apikey
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def load_apikey():
"""Load flask.g.user, flask.g.uid from an API key."""
try:
key = flask.request.headers.get('X-SCOREBOARD-API-KEY')
if not key or len(key) != 32:
return
user = models.User.get_by_api_key(key)
if not user:
return
flask.g.user = user
flask.g.uid = user.uid
flask.g.admin = user.admin
flask.g.tid = None
return True
except Exception:
# Don't want any API key problems to block requests
pass
# Add headers to responses
开发者ID:google,项目名称:ctfscoreboard,代码行数:22,
示例23: catch
点赞 6
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def catch(self, rule=None, **options):
""" Catch request params
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# ignore with empty rule
if not rule and not options:
return func(*args, **kwargs)
# parse input params
try:
fmt_rst = self.parse(rule, **options)
except ParamsValueError as e:
return self.fmt_resp(e)
# assignment params to func args
from flask import g
setattr(g, self.store_key, fmt_rst)
if self.store_key in getfullargspec(func).args:
kwargs[self.store_key] = fmt_rst
return func(*args, **kwargs)
return wrapper
return decorator
开发者ID:Eastwu5788,项目名称:pre-request,代码行数:27,
示例24: request_payload
点赞 5
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def request_payload(self) -> Optional[BaseModel]:
return self.ContextLocalData.request_payload.get(g)
开发者ID:JoMingyu,项目名称:Flask-Large-Application-Example,代码行数:4,
示例25: authenticated
点赞 5
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def authenticated():
""" Utility function checking if the current user is logged in or not.
"""
fas_user = None
try:
fas_user = flask.g.fas_user
except (RuntimeError, AttributeError):
pass
return fas_user is not None
开发者ID:Pagure,项目名称:pagure,代码行数:12,
示例26: api_authenticated
点赞 5
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def api_authenticated():
""" Utility function checking if the current user is logged in or not
in the API.
"""
return (
hasattr(flask.g, "fas_user")
and flask.g.fas_user is not None
and hasattr(flask.g, "token")
and flask.g.token is not None
)
开发者ID:Pagure,项目名称:pagure,代码行数:12,
示例27: login_required
点赞 5
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def login_required(function):
""" Flask decorator to retrict access to logged in user.
If the auth system is ``fas`` it will also require that the user sign
the FPCA.
"""
@wraps(function)
def decorated_function(*args, **kwargs):
""" Decorated function, actually does the work. """
auth_method = pagure_config.get("PAGURE_AUTH", None)
if flask.session.get("_justloggedout", False):
return flask.redirect(flask.url_for("ui_ns.index"))
elif not authenticated():
return flask.redirect(
flask.url_for("auth_login", next=flask.request.url)
)
elif auth_method == "fas" and not flask.g.fas_user.cla_done:
flask.session["_requires_fpca"] = True
flask.flash(
flask.Markup(
'You must sign the FPCA (Fedora Project '
"Contributor Agreement) to use pagure"
),
"errors",
)
return flask.redirect(flask.url_for("ui_ns.index"))
return function(*args, **kwargs)
return decorated_function
开发者ID:Pagure,项目名称:pagure,代码行数:32,
示例28: generate_user_key_files
点赞 5
# 需要导入模块: import flask [as 别名]
# 或者: from flask import g [as 别名]
def generate_user_key_files():
""" Regenerate the key files used by gitolite.
"""
gitolite_home = pagure_config.get("GITOLITE_HOME", None)
if gitolite_home:
users = pagure.lib.query.search_user(flask.g.session)
for user in users:
pagure.lib.query.update_user_ssh(
flask.g.session,
user,
None,
pagure_config.get("GITOLITE_KEYDIR", None),
update_only=True,
)
pagure.lib.git.generate_gitolite_acls(project=None)
开发者ID:Pagure,项目名称:pagure,代码行数:17,
注:本文中的flask.g方法示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。