class Gem::GemcutterUtilities::WebauthnListener

属性

host[R]

公共类方法

listener_thread(host, server) 点击以切换源代码
# File rubygems/gemcutter_utilities/webauthn_listener.rb, line 35
# Starts a background thread that waits on +server+ for the OTP code.
#
# The thread stores its outcome on itself: the value returned by
# #wait_for_otp_code under +:otp+, or a rescued
# Gem::WebauthnVerificationError under +:error+. +server+ is closed when the
# thread body finishes, whichever way it finishes.
#
# Returns the Thread.
def self.listener_thread(host, server)
  Thread.new do
    begin
      worker = Thread.current
      # Any exception other than the one rescued below aborts the process
      # instead of being printed by the thread itself.
      worker.report_on_exception = false
      worker.abort_on_exception = true
      worker[:otp] = new(host).wait_for_otp_code(server)
    rescue Gem::WebauthnVerificationError => verification_error
      worker[:error] = verification_error
    ensure
      server.close
    end
  end
end
new(host) 点击以切换源代码
# File rubygems/gemcutter_utilities/webauthn_listener.rb, line 31
# Creates a listener for +host+.
#
# +host+ is stored as given and exposed through the +host+ reader; the
# visible code passes it to every response builder and interpolates it into
# the "Did not receive OTP" error message.
def initialize(host)
  @host = host
end

公共实例方法

wait_for_otp_code(server) 点击以切换源代码
# File rubygems/gemcutter_utilities/webauthn_listener.rb, line 48
# Accepts connections on +server+ until one of them settles the verification.
#
# Each accepted connection is expected to carry an HTTP request line
# ("METHOD URI PROTOCOL"):
#
# * OPTIONS on "/" is answered with NoContentResponse and the loop keeps
#   waiting (the follow-up request is expected to be the GET).
# * GET on "/" with a +code+ query parameter is answered with OkResponse and
#   the code is returned.
# * Everything else is answered with the matching error response and ends
#   the wait by raising.
#
# Returns the OTP code extracted by #parse_otp_from_uri.
#
# Raises Gem::WebauthnVerificationError when the peer disconnects without
# sending a request, the request line is malformed, the path is not "/",
# the method is not OPTIONS/GET, or the GET carries no code.
def wait_for_otp_code(server)
  loop do
    socket = server.accept
    request_line = socket.gets

    # +gets+ returns nil when the peer closes the connection without sending
    # anything. Nobody is left to read a response, so only release the socket
    # and report the failure through the error type callers handle.
    if request_line.nil?
      socket.close
      raise Gem::WebauthnVerificationError, "Connection closed before a request was received."
    end

    responder = SocketResponder.new(socket)

    method, raw_uri, _protocol = request_line.split(" ")

    # A blank or one-token request line leaves +raw_uri+ nil, and an invalid
    # request target makes the parser raise; both are treated as a bad request
    # rather than escaping as NoMethodError/InvalidURIError.
    req_uri = begin
      Gem::URI.parse(raw_uri) if raw_uri
    rescue Gem::URI::InvalidURIError
      nil
    end

    if req_uri.nil?
      responder.send(BadRequestResponse.for(host))
      raise Gem::WebauthnVerificationError, "Malformed request line received."
    end

    unless root_path?(req_uri)
      responder.send(NotFoundResponse.for(host))
      raise Gem::WebauthnVerificationError, "Page at #{req_uri.path} not found."
    end

    case method.upcase
    when "OPTIONS"
      responder.send(NoContentResponse.for(host))
      next # will be GET
    when "GET"
      otp = parse_otp_from_uri(req_uri)
      if otp
        responder.send(OkResponse.for(host))
        return otp
      end
      responder.send(BadRequestResponse.for(host))
      raise Gem::WebauthnVerificationError, "Did not receive OTP from #{host}."
    else
      responder.send(MethodNotAllowedResponse.for(host))
      raise Gem::WebauthnVerificationError, "Invalid HTTP method #{method.upcase} received."
    end
  end
end

私有实例方法

parse_otp_from_uri(uri) 点击以切换源代码
# File rubygems/gemcutter_utilities/webauthn_listener.rb, line 87
def parse_otp_from_uri(uri)
  require "cgi"

  return if uri.query.nil?
  CGI.parse(uri.query).dig("code", 0)
end
root_path?(uri) 点击以切换源代码
# File rubygems/gemcutter_utilities/webauthn_listener.rb, line 83
# Returns true when the path component of +uri+ is exactly "/".
def root_path?(uri)
  requested_path = uri.path
  requested_path == "/"
end