# Copyright (c) 2009 Google Inc. All rights reserved. # Copyright (c) 2009 Apple Inc. All rights reserved. # Copyright (c) 2010 Research In Motion Limited. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above # copyright notice, this list of conditions and the following disclaimer # in the documentation and/or other materials provided with the # distribution. # * Neither the name of Google Inc. nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # # WebKit's Python module for interacting with Bugzilla import mimetypes import os.path import re import StringIO import urllib from datetime import datetime # used in timestamp() from .attachment import Attachment from .bug import Bug from webkitpy.common.system.deprecated_logging import log from webkitpy.common.config import committers from webkitpy.common.net.credentials import Credentials from webkitpy.common.system.user import User from webkitpy.thirdparty.autoinstalled.mechanize import Browser from webkitpy.thirdparty.BeautifulSoup import BeautifulSoup, SoupStrainer # FIXME: parse_bug_id should not be a free function. def parse_bug_id(message): if not message: return None match = re.search("http\://webkit\.org/b/(?P\d+)", message) if match: return int(match.group('bug_id')) match = re.search( Bugzilla.bug_server_regex + "show_bug\.cgi\?id=(?P\d+)", message) if match: return int(match.group('bug_id')) return None def timestamp(): return datetime.now().strftime("%Y%m%d%H%M%S") # A container for all of the logic for making and parsing buzilla queries. class BugzillaQueries(object): def __init__(self, bugzilla): self._bugzilla = bugzilla def _is_xml_bugs_form(self, form): # ClientForm.HTMLForm.find_control throws if the control is not found, # so we do a manual search instead: return "xml" in [control.id for control in form.controls] # This is kinda a hack. There is probably a better way to get this information from bugzilla. def _parse_result_count(self, results_page): result_count_text = BeautifulSoup(results_page).find(attrs={'class': 'bz_result_count'}).string result_count_parts = result_count_text.strip().split(" ") if result_count_parts[0] == "Zarro": return 0 if result_count_parts[0] == "One": return 1 return int(result_count_parts[0]) # Note: _load_query, _fetch_bug and _fetch_bugs_from_advanced_query # are the only methods which access self._bugzilla. def _load_query(self, query): self._bugzilla.authenticate() full_url = "%s%s" % (self._bugzilla.bug_server_url, query) return self._bugzilla.browser.open(full_url) def _fetch_bugs_from_advanced_query(self, query): results_page = self._load_query(query) if not self._parse_result_count(results_page): return [] # Bugzilla results pages have an "XML" submit button at the bottom # which can be used to get an XML page containing all of the elements. # This is slighty lame that this assumes that _load_query used # self._bugzilla.browser and that it's in an acceptable state. self._bugzilla.browser.select_form(predicate=self._is_xml_bugs_form) bugs_xml = self._bugzilla.browser.submit() return self._bugzilla._parse_bugs_from_xml(bugs_xml) def _fetch_bug(self, bug_id): return self._bugzilla.fetch_bug(bug_id) def _fetch_bug_ids_advanced_query(self, query): soup = BeautifulSoup(self._load_query(query)) # The contents of the inside the cells in the first column happen # to be the bug id. return [int(bug_link_cell.find("a").string) for bug_link_cell in soup('td', "first-child")] def _parse_attachment_ids_request_query(self, page): digits = re.compile("\d+") attachment_href = re.compile("attachment.cgi\?id=\d+&action=review") attachment_links = SoupStrainer("a", href=attachment_href) return [int(digits.search(tag["href"]).group(0)) for tag in BeautifulSoup(page, parseOnlyThese=attachment_links)] def _fetch_attachment_ids_request_query(self, query): return self._parse_attachment_ids_request_query(self._load_query(query)) def _parse_quips(self, page): soup = BeautifulSoup(page, convertEntities=BeautifulSoup.HTML_ENTITIES) quips = soup.find(text=re.compile(r"Existing quips:")).findNext("ul").findAll("li") return [unicode(quip_entry.string) for quip_entry in quips] def fetch_quips(self): return self._parse_quips(self._load_query("/quips.cgi?action=show")) # List of all r+'d bugs. def fetch_bug_ids_from_pending_commit_list(self): needs_commit_query_url = "buglist.cgi?query_format=advanced&bug_status=UNCONFIRMED&bug_status=NEW&bug_status=ASSIGNED&bug_status=REOPENED&field0-0-0=flagtypes.name&type0-0-0=equals&value0-0-0=review%2B" return self._fetch_bug_ids_advanced_query(needs_commit_query_url) def fetch_bugs_matching_quicksearch(self, search_string): # We may want to use a more explicit query than "quicksearch". # If quicksearch changes we should probably change to use # a normal buglist.cgi?query_format=advanced query. quicksearch_url = "buglist.cgi?quicksearch=%s" % urllib.quote(search_string) return self._fetch_bugs_from_advanced_query(quicksearch_url) # Currently this returns all bugs across all components. # In the future we may wish to extend this API to construct more restricted searches. def fetch_bugs_matching_search(self, search_string, author_email=None): query = "buglist.cgi?query_format=advanced" if search_string: query += "&short_desc_type=allwordssubstr&short_desc=%s" % urllib.quote(search_string) if author_email: query += "&emailreporter1=1&emailtype1=substring&email1=%s" % urllib.quote(search_string) return self._fetch_bugs_from_advanced_query(query) def fetch_patches_from_pending_commit_list(self): return sum([self._fetch_bug(bug_id).reviewed_patches() for bug_id in self.fetch_bug_ids_from_pending_commit_list()], []) def fetch_bug_ids_from_commit_queue(self): commit_queue_url = "buglist.cgi?query_format=advanced&bug_status=UNCONFIRMED&bug_status=NEW&bug_status=ASSIGNED&bug_status=REOPENED&field0-0-0=flagtypes.name&type0-0-0=equals&value0-0-0=commit-queue%2B&order=Last+Changed" return self._fetch_bug_ids_advanced_query(commit_queue_url) def fetch_patches_from_commit_queue(self): # This function will only return patches which have valid committers # set. It won't reject patches with invalid committers/reviewers. return sum([self._fetch_bug(bug_id).commit_queued_patches() for bug_id in self.fetch_bug_ids_from_commit_queue()], []) def fetch_bug_ids_from_review_queue(self): review_queue_url = "buglist.cgi?query_format=advanced&bug_status=UNCONFIRMED&bug_status=NEW&bug_status=ASSIGNED&bug_status=REOPENED&field0-0-0=flagtypes.name&type0-0-0=equals&value0-0-0=review?" return self._fetch_bug_ids_advanced_query(review_queue_url) # This method will make several requests to bugzilla. def fetch_patches_from_review_queue(self, limit=None): # [:None] returns the whole array. return sum([self._fetch_bug(bug_id).unreviewed_patches() for bug_id in self.fetch_bug_ids_from_review_queue()[:limit]], []) # NOTE: This is the only client of _fetch_attachment_ids_request_query # This method only makes one request to bugzilla. def fetch_attachment_ids_from_review_queue(self): review_queue_url = "request.cgi?action=queue&type=review&group=type" return self._fetch_attachment_ids_request_query(review_queue_url) class Bugzilla(object): def __init__(self, dryrun=False, committers=committers.CommitterList()): self.dryrun = dryrun self.authenticated = False self.queries = BugzillaQueries(self) self.committers = committers self.cached_quips = [] # FIXME: We should use some sort of Browser mock object when in dryrun # mode (to prevent any mistakes). self.browser = Browser() # Ignore bugs.webkit.org/robots.txt until we fix it to allow this # script. self.browser.set_handle_robots(False) # FIXME: Much of this should go into some sort of config module: bug_server_host = "bugs.webkit.org" bug_server_regex = "https?://%s/" % re.sub('\.', '\\.', bug_server_host) bug_server_url = "https://%s/" % bug_server_host def quips(self): # We only fetch and parse the list of quips once per instantiation # so that we do not burden bugs.webkit.org. if not self.cached_quips and not self.dryrun: self.cached_quips = self.queries.fetch_quips() return self.cached_quips def bug_url_for_bug_id(self, bug_id, xml=False): if not bug_id: return None content_type = "&ctype=xml" if xml else "" return "%sshow_bug.cgi?id=%s%s" % (self.bug_server_url, bug_id, content_type) def short_bug_url_for_bug_id(self, bug_id): if not bug_id: return None return "http://webkit.org/b/%s" % bug_id def add_attachment_url(self, bug_id): return "%sattachment.cgi?action=enter&bugid=%s" % (self.bug_server_url, bug_id) def attachment_url_for_id(self, attachment_id, action="view"): if not attachment_id: return None action_param = "" if action and action != "view": action_param = "&action=%s" % action return "%sattachment.cgi?id=%s%s" % (self.bug_server_url, attachment_id, action_param) def _parse_attachment_flag(self, element, flag_name, attachment, result_key): flag = element.find('flag', attrs={'name': flag_name}) if flag: attachment[flag_name] = flag['status'] if flag['status'] == '+': attachment[result_key] = flag['setter'] # Sadly show_bug.cgi?ctype=xml does not expose the flag modification date. def _string_contents(self, soup): # WebKit's bugzilla instance uses UTF-8. # BeautifulSoup always returns Unicode strings, however # the .string method returns a (unicode) NavigableString. # NavigableString can confuse other parts of the code, so we # convert from NavigableString to a real unicode() object using unicode(). return unicode(soup.string) # Example: 2010-01-20 14:31 PST # FIXME: Some bugzilla dates seem to have seconds in them? # Python does not support timezones out of the box. # Assume that bugzilla always uses PST (which is true for bugs.webkit.org) _bugzilla_date_format = "%Y-%m-%d %H:%M" @classmethod def _parse_date(cls, date_string): (date, time, time_zone) = date_string.split(" ") # Ignore the timezone because python doesn't understand timezones out of the box. date_string = "%s %s" % (date, time) return datetime.strptime(date_string, cls._bugzilla_date_format) def _date_contents(self, soup): return self._parse_date(self._string_contents(soup)) def _parse_attachment_element(self, element, bug_id): attachment = {} attachment['bug_id'] = bug_id attachment['is_obsolete'] = (element.has_key('isobsolete') and element['isobsolete'] == "1") attachment['is_patch'] = (element.has_key('ispatch') and element['ispatch'] == "1") attachment['id'] = int(element.find('attachid').string) # FIXME: No need to parse out the url here. attachment['url'] = self.attachment_url_for_id(attachment['id']) attachment["attach_date"] = self._date_contents(element.find("date")) attachment['name'] = self._string_contents(element.find('desc')) attachment['attacher_email'] = self._string_contents(element.find('attacher')) attachment['type'] = self._string_contents(element.find('type')) self._parse_attachment_flag( element, 'review', attachment, 'reviewer_email') self._parse_attachment_flag( element, 'commit-queue', attachment, 'committer_email') return attachment def _parse_bugs_from_xml(self, page): soup = BeautifulSoup(page) # Without the unicode() call, BeautifulSoup occasionally complains of being # passed None for no apparent reason. return [Bug(self._parse_bug_dictionary_from_xml(unicode(bug_xml)), self) for bug_xml in soup('bug')] def _parse_bug_dictionary_from_xml(self, page): soup = BeautifulSoup(page) bug = {} bug["id"] = int(soup.find("bug_id").string) bug["title"] = self._string_contents(soup.find("short_desc")) bug["bug_status"] = self._string_contents(soup.find("bug_status")) dup_id = soup.find("dup_id") if dup_id: bug["dup_id"] = self._string_contents(dup_id) bug["reporter_email"] = self._string_contents(soup.find("reporter")) bug["assigned_to_email"] = self._string_contents(soup.find("assigned_to")) bug["cc_emails"] = [self._string_contents(element) for element in soup.findAll('cc')] bug["attachments"] = [self._parse_attachment_element(element, bug["id"]) for element in soup.findAll('attachment')] return bug # Makes testing fetch_*_from_bug() possible until we have a better # BugzillaNetwork abstration. def _fetch_bug_page(self, bug_id): bug_url = self.bug_url_for_bug_id(bug_id, xml=True) log("Fetching: %s" % bug_url) return self.browser.open(bug_url) def fetch_bug_dictionary(self, bug_id): try: return self._parse_bug_dictionary_from_xml(self._fetch_bug_page(bug_id)) except KeyboardInterrupt: raise except: self.authenticate() return self._parse_bug_dictionary_from_xml(self._fetch_bug_page(bug_id)) # FIXME: A BugzillaCache object should provide all these fetch_ methods. def fetch_bug(self, bug_id): return Bug(self.fetch_bug_dictionary(bug_id), self) def fetch_attachment_contents(self, attachment_id): attachment_url = self.attachment_url_for_id(attachment_id) # We need to authenticate to download patches from security bugs. self.authenticate() return self.browser.open(attachment_url).read() def _parse_bug_id_from_attachment_page(self, page): # The "Up" relation happens to point to the bug. up_link = BeautifulSoup(page).find('link', rel='Up') if not up_link: # This attachment does not exist (or you don't have permissions to # view it). return None match = re.search("show_bug.cgi\?id=(?P\d+)", up_link['href']) return int(match.group('bug_id')) def bug_id_for_attachment_id(self, attachment_id): self.authenticate() attachment_url = self.attachment_url_for_id(attachment_id, 'edit') log("Fetching: %s" % attachment_url) page = self.browser.open(attachment_url) return self._parse_bug_id_from_attachment_page(page) # FIXME: This should just return Attachment(id), which should be able to # lazily fetch needed data. def fetch_attachment(self, attachment_id): # We could grab all the attachment details off of the attachment edit # page but we already have working code to do so off of the bugs page, # so re-use that. bug_id = self.bug_id_for_attachment_id(attachment_id) if not bug_id: return None attachments = self.fetch_bug(bug_id).attachments(include_obsolete=True) for attachment in attachments: if attachment.id() == int(attachment_id): return attachment return None # This should never be hit. def authenticate(self): if self.authenticated: return if self.dryrun: log("Skipping log in for dry run...") self.authenticated = True return credentials = Credentials(self.bug_server_host, git_prefix="bugzilla") attempts = 0 while not self.authenticated: attempts += 1 username, password = credentials.read_credentials() log("Logging in as %s..." % username) self.browser.open(self.bug_server_url + "index.cgi?GoAheadAndLogIn=1") self.browser.select_form(name="login") self.browser['Bugzilla_login'] = username self.browser['Bugzilla_password'] = password response = self.browser.submit() match = re.search("(.+?)", response.read()) # If the resulting page has a title, and it contains the word # "invalid" assume it's the login failure page. if match and re.search("Invalid", match.group(1), re.IGNORECASE): errorMessage = "Bugzilla login failed: %s" % match.group(1) # raise an exception only if this was the last attempt if attempts < 5: log(errorMessage) else: raise Exception(errorMessage) else: self.authenticated = True self.username = username def _commit_queue_flag(self, mark_for_landing, mark_for_commit_queue): if mark_for_landing: return '+' elif mark_for_commit_queue: return '?' return 'X' # FIXME: mark_for_commit_queue and mark_for_landing should be joined into a single commit_flag argument. def _fill_attachment_form(self, description, file_object, mark_for_review=False, mark_for_commit_queue=False, mark_for_landing=False, is_patch=False, filename=None, mimetype=None): self.browser['description'] = description if is_patch: self.browser['ispatch'] = ("1",) # FIXME: Should this use self._find_select_element_for_flag? self.browser['flag_type-1'] = ('?',) if mark_for_review else ('X',) self.browser['flag_type-3'] = (self._commit_queue_flag(mark_for_landing, mark_for_commit_queue),) filename = filename or "%s.patch" % timestamp() if not mimetype: mimetypes.add_type('text/plain', '.patch') # Make sure mimetypes knows about .patch mimetype, _ = mimetypes.guess_type(filename) if not mimetype: mimetype = "text/plain" # Bugzilla might auto-guess for us and we might not need this? self.browser.add_file(file_object, mimetype, filename, 'data') def _file_object_for_upload(self, file_or_string): if hasattr(file_or_string, 'read'): return file_or_string # Only if file_or_string is not already encoded do we want to encode it. if isinstance(file_or_string, unicode): file_or_string = file_or_string.encode('utf-8') return StringIO.StringIO(file_or_string) # timestamp argument is just for unittests. def _filename_for_upload(self, file_object, bug_id, extension="txt", timestamp=timestamp): if hasattr(file_object, "name"): return file_object.name return "bug-%s-%s.%s" % (bug_id, timestamp(), extension) def add_attachment_to_bug(self, bug_id, file_or_string, description, filename=None, comment_text=None): self.authenticate() log('Adding attachment "%s" to %s' % (description, self.bug_url_for_bug_id(bug_id))) if self.dryrun: log(comment_text) return self.browser.open(self.add_attachment_url(bug_id)) self.browser.select_form(name="entryform") file_object = self._file_object_for_upload(file_or_string) filename = filename or self._filename_for_upload(file_object, bug_id) self._fill_attachment_form(description, file_object, filename=filename) if comment_text: log(comment_text) self.browser['comment'] = comment_text self.browser.submit() # FIXME: The arguments to this function should be simplified and then # this should be merged into add_attachment_to_bug def add_patch_to_bug(self, bug_id, file_or_string, description, comment_text=None, mark_for_review=False, mark_for_commit_queue=False, mark_for_landing=False): self.authenticate() log('Adding patch "%s" to %s' % (description, self.bug_url_for_bug_id(bug_id))) if self.dryrun: log(comment_text) return self.browser.open(self.add_attachment_url(bug_id)) self.browser.select_form(name="entryform") file_object = self._file_object_for_upload(file_or_string) filename = self._filename_for_upload(file_object, bug_id, extension="patch") self._fill_attachment_form(description, file_object, mark_for_review=mark_for_review, mark_for_commit_queue=mark_for_commit_queue, mark_for_landing=mark_for_landing, is_patch=True, filename=filename) if comment_text: log(comment_text) self.browser['comment'] = comment_text self.browser.submit() # FIXME: There has to be a more concise way to write this method. def _check_create_bug_response(self, response_html): match = re.search("Bug (?P<bug_id>\d+) Submitted", response_html) if match: return match.group('bug_id') match = re.search( '
(?P.+)