PolySwarm Customer API v3
A Python interface for interacting with version 3 of the PolySwarm Customer APIs, to view the Legacy Version 2 documentation, navigate here.
Supports Python 3.7 and greater.
Getting Started
Installation
From PyPI:
$ pip install polyswarm-api
If you get an error about a missing package named
wheel
, that means your version of pip is too old. You need pip version 19 or newer. To update pip, runpip install -U pip
.
From source:
$ python setup.py install
If you get an error about a missing package named
wheel
, that means your version of setuptools is too old. You need setuptools version 40.8.0 or newer. To update setuptools, runpip install -U setuptools
.
Creating an API Client
from polyswarm_api.api import PolyswarmAPI
api_key = "1234123412341234123412341234"
community_name = "default"
api = PolyswarmAPI(key=api_key, community=community_name)
Locate the api_key
for the User/Team from here
If the Subscription plan has "Private Communities" then Define the Private Community Name provided to you by PolySwarm in the
community_name
value above and the Team API key in theapi_key
field.
Retrieve account information
Feature | What is it for? | Package |
---|---|---|
Account Details | Retrieve account information including account number and what teams you are part of. | api.account_whois() |
Account features and quotas | Retrieve what features your account/team has enabled and the quota details. | api.features() |
Scanning an Artifact
Feature | What is it for? | Package |
---|---|---|
Scan File | Scan a File in the PolySwarm network to retrieve a verdict. | api.submit() |
Scan URL | Scan a URL in the PolySwarm network to retrieve a verdict. | api.submit(URL, artifact_type='url') |
Scan a File
FILE = '/home/user/malicious.bin'
positives = 0
total = 0
instance = api.submit(FILE)
result = api.wait_for(instance)
if result.failed:
print(f'Failed to get results')
sys.exit()
print('Engine Assertions:')
for assertion in result.assertions:
if assertion.verdict:
positives += 1
total += 1
print('\tEngine {} asserts {}'.\
format(assertion.author_name,
'Malicious' if assertion.verdict else 'Benign'))
print(f'Positives: {positives}')
print(f'Total: {total}')
print(f'PolyScore: {result.polyscore}\n')
print(f'sha256: {result.sha256}')
print(f'sha1: {result.sha1}')
print(f'md5: {result.md5}')
print(f'Extended type: {result.extended_type}')
print(f'First Seen: {result.first_seen}')
print(f'Last Seen: {result.last_seen}\n')
print(f'Permalink: {result.permalink}')
Here is another example of sending a sample inside a zip file that is protected with a password infected
:
result = api.submit('./malicious-enc.zip',
preprocessing={'type': 'zip', 'password': 'infected'})
print(result.status)
Scan a URL
When scanning a URL, you should always include the protocol (
http://
orhttps://
).
URL = 'https://polyswarm.io'
positives = 0
total = 0
instance = api.submit(URL, artifact_type='url')
result = api.wait_for(instance)
if result.failed:
print(f'Failed to get results')
sys.exit()
print('Engine Assertions:')
for assertion in result.assertions:
if assertion.verdict:
positives += 1
total += 1
print('\tEngine {} asserts {}'.\
format(assertion.author_name,
'Malicious' if assertion.verdict else 'Benign'))
print(f'Positives: {positives}')
print(f'Total: {total}\n')
print(f'Permalink: {result.permalink}')
The Scanning endpoint has options available to extend the scan time, for urls this should always be most-time
.
Option | Value | Description |
---|---|---|
scan_config |
default |
25 second scan window |
scan_config |
more-time |
50 second scan window |
scan_config |
most-time |
100 second scan window |
instance = api.submit(URL, artifact_type='url', scan_config='most-time')
result = api.wait_for(instance)
Rescanning Artifacts
Feature | What is it for? | Package |
---|---|---|
Rescan | Rescan an Artifact to provide up to date verdict and analysis | api.rescan() |
instance = api.rescan("275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f")
result = api.wait_for(instance)
if result.failed:
print(f'Failed to get results')
sys.exit()
positives = 0
total = 0
print('Engine Assertions:')
for assertion in result.assertions:
if assertion.verdict:
positives += 1
total += 1
print('\tEngine {} asserts {}'.\
format(assertion.author_name,
'Malicious' if assertion.verdict else 'Benign'))
print(f'Positives: {positives}')
print(f'Total: {total}')
print(f'PolyScore: {result.polyscore}\n')
print(f'sha256: {result.sha256}')
print(f'sha1: {result.sha1}')
print(f'md5: {result.md5}')
print(f'Extended type: {result.extended_type}')
print(f'First Seen: {result.first_seen}')
print(f'Last Seen: {result.last_seen}\n')
print(f'Permalink: {result.permalink}')
Downloading Artifacts
Feature | What is it for? | Package |
---|---|---|
Download Artifact | Download the file locally by searching with a hash value | api.download() |
OUTPUT_DIR = '/tmp/'
EICAR_HASH = '275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f'
artifact = api.download(OUTPUT_DIR, EICAR_HASH)
Reporting
Feature | What is it for? | Package |
---|---|---|
Create report | Create a report in html or pdf for an artifact, this endpoint is also used to create a zip file of sandbox artifacts. | api.report_create() |
Get report status | Retrieve the report creation status | api.report_get() |
Download report | Download the finished report locally | api.report_download() |
List templates | List the templates | api.report_template_list() |
Create a template | Create a new template | api.report_template_create() |
Delete a template | Delete a template | api.report_template_delete() |
Get template details | View specific template | api.report_template_get() |
Update template | Update a current template | api.report_template_update() |
Update template logo | Upload a logo for template | api.report_template_logo_update() |
Delete template logo | Delete a logo for a template | api.report_template_logo_delete() |
Download template logo | Download a logo from the template | api.report_template_logo_download() |
Create report
report = api.report_create(type='scan', format='pdf', instance_id=instance_id)
print(f'Report ID: {report.id} (State: {report.state})')
# `report_wait_for()` is a method to fetch the report progress over
# and over until is not in PENDING state anymore
report = api.report_wait_for(report.id, timeout=timeout_seconds)
if report.state == 'SUCCEEDED':
response = requests.get(report.url, stream=True)
response.raise_for_status()
with open(f'scan-{instance_id}.pdf', 'wb') as f:
response.raw.decode_content = True
shutil.copyfileobj(response.raw, f)
else:
print(f'Report failed (State: {report.state})')
Create a ZIP file of Sandbox Artifacts
The template_metadata
value can contain one or many separated by commas of: report
, raw_report
, screenshot
, recording
, dropped_file
, memory_dump
, pcap
or jarm
.
The below example highlights how to download a zip file that contains the following sandbox files: pdf report, report, raw_report, screenshot and jarm.
The sandbox_task_id
is the ID for the sandbox session that you wish to download the files from.
report = api.report_create(type='sandbox_zip', format='zip', sandbox_task_id=123456, template_metadata={'zip_report_ids': [123], 'sandbox_artifact_types': ['report', 'raw_report', 'screenshot', 'jarm'] })
List Templates
results = api.report_template_list()
for template in results:
print(f'ID: {template.id}')
print(f'Created: {template.created}')
print(f'Name: {template.template_name}')
print(f'Color: {template.primary_color}\n')
Create a template
result = api.report_template_create(template_name='testreport1')
Delete a template
result = api.report_template_delete(98453877554394669)
Get template details
result = api.report_template_get(95389624286242180)
print(f'ID: {result.id}')
print(f'Created: {result.created}')
print(f'Name: {result.template_name}')
print(f'Color: {result.primary_color}\n')
Hash Searching
Feature | What is it for? | Package |
---|---|---|
Search | Search the polyswarm dataset with a hash (sha256,md5 or sha1) value | api.search() |
# sha256, md5, and sha1 supported
EICAR_HASH = '275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f'
positives = 0
total = 0
try:
results = api.search(EICAR_HASH)
for result in results:
if result.failed:
print(f'Failed to get result.')
break
if not result.assertions:
print('Artifact not scanned yet - Run rescan for Engine Assertions.')
else:
print('Engine Assertions:')
for assertion in result.assertions:
if assertion.verdict:
positives += 1
total += 1
print('\tEngine {} asserts {}'. \
format(assertion.author_name,
'Malicious' if assertion.verdict else 'Benign'))
print(f'Positives: {positives}')
print(f'Total: {total}')
print(f'PolyScore: {result.polyscore}\n')
print(f'sha256: {result.sha256}')
print(f'sha1: {result.sha1}')
print(f'md5: {result.md5}')
print(f'Extended type: {result.extended_type}')
print(f'First Seen: {result.first_seen}')
print(f'Last Seen: {result.last_seen}\n')
print(f'Permalink: {result.permalink}')
except exceptions.NoResultsException:
print(f'No results for the provided hash.')
Metadata Searching
PolySwarm's Metadata Search is a powerful and flexible means to discover previously unknown malware. Metadata commands can be built and fed into the arguments for the below api endpoint. To understand how to build out a Metadata query see the How-To Guide.
Feature | What is it for? | Package | Parameters |
---|---|---|---|
Search | Search the polyswarm dataset for metadata | api.search_by_metadata() |
include, exclude |
The following sections will list specific examples with the scopes of the searches and real world use case examples.
query = 'artifact.sha256:"275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f"'
results = api.search_by_metadata(query)
# Our query is by cryptographic hash; we expect at most 1 result.
# Regardless, it's good practice to properly handle multiple results.
for result in results:
print(f"Artifact Attributes: {result.artifact}")
Exclude Results in the Metadata query example:
query = 'artifact.sha256:"275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f"'
results = api.search_by_metadata(query, exclude=["exiftool","lief","polyunite.malware_family"])
# Process results
for result in results:
print(f"Artifact Attributes: {result.json}") # Debugging: Print full response to verify fields
Include Results in the Metadata query example:
query = 'artifact.sha256:"275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f"'
results = api.search_by_metadata(query, include=["pefile","polyunite.malware_family"])
# Process results
for result in results:
print(f"Artifact Attributes: {result.json}") # Debugging: Print full response to verify fields
IOC Searching
IOC Searching can be split into three groups of functions, these are:
Feature | What is it for? | Package |
---|---|---|
Associated IOCs | Searching for Associated IOCs related to a Hash | api.iocs_by_hash() |
Associated Hashes | Searching for Associated Hashes to a IP, URL, imphash or MITRE TTP | api.search_by_ioc() |
Known Good Domains | Check for known good domains and IPs | api.check_known_hosts() |
Searching for Associated IOCs
results = api.iocs_by_hash('sha256', 'aac08c6f7474c979acf2a3aef1f2727820ece755001530cdebf346b5d1ae2ccb', hide_known_good=True)
for result in results:
iocs = result.json
print(f"ips: {iocs['ips']}")
print(f"urls: {iocs['urls']}")
print(f"ttps: {iocs['ttps']}")
print(f"imphash: {iocs['imphash']}")
Searching for Associated Hashes
# IOC Search by ip
for result in api.search_by_ioc(ip="108.159.227.121"):
print(f"sha256: {result.json}")
# IOC Search by domain
for result in api.search_by_ioc(domain="img-s-msn-com.akamaized.net"):
print(f"sha256: {result.json}")
# IOC Search by MITRE ttp
for result in api.search_by_ioc(ttp="T1060"):
print(f"sha256: {result.json}")
Searching for Known Good Domains and IPs
results = api.check_known_hosts(domains=["polyswarm.network"], ips=["0.0.0.0"]):
for result in results:
ioc = result.json
print(f"type:{ioc.type}, host:{ioc.host}, source:{ioc.source}, good:{ioc.good}")
A word of caution with Known Good checking!
Our list of known good domains and IPs is not all-inclusive! Our goal for this feature is to provide an easy way to find the top most commonly known good domains and IPs, so they can be excluded from analysis.
Sandboxing
Sandboxing in PolySwarm provides the ability to submit files directly to be sandboxed, submit Artifacts already in PolySwarm to be sandboxed, and review what has been submitted to be sandboxed. Sandbox Analysis will take around 2-5 minutes before the results can be accessed.
To view some commonly asked questions and answers about Sandboxing , see here
Feature | What is it for? | Package |
---|---|---|
Submit File | Submit a file to be sandboxed, define the sandbox name along with the sandbox VM. | api.sandbox_file() |
Submit URL | Submit a URL to be sandboxed, define the sandbox name along with the sandbox vm, and chosen browser. | api.sandbox_url() |
Submit | Submit an already-scanned artifact for processing, provide the instance id of the artifact, the sandbox name along with the sandbox vm. | api.sandbox() |
List | List the available Sandbox providers, to obtain the sandbox name and sandbox vm. | api.sandbox_providers() |
Lookup | Get a sandbox task by id. | api.sandbox_task_status() |
Lookup Latest | Lookup the latest sandbox task by sha256 and sandbox provider name, providing the metadata from the sandbox | api.sandbox_task_latest() |
List Tasks | List sandbox tasks that were created by you or someone on your team. | api.sandbox_my_tasks_list() |
Search | Search sandbox tasks by sha256 and sandbox, status, startdate, and/or enddate. | api.sandbox_task_list() |
Download | Download Reports and other sandbox artifacts. | api.download_id() |
Sandboxing a File
Want to know what files types are supported? See here
network_enabled This boolean controls the network access for a sandbox execution. If this value is not passed or None, the default for a public community is True and a private community is False.
result = api.sandbox_file('./malicious.exe', 'triage', 'windows11-21h2-x64')
print(result.status)
Here is another example of sending a sample inside a zip file that is protected with a password infected
:
result = api.sandbox_file('./malicious-enc.zip',
'triage',
'windows11-21h2-x64',
preprocessing={'type': 'zip', 'password': 'infected'},
network_enabled=True)
print(result.status)
Sandboxes have multiple returned statuses, these are listed below.
Status | What is it for? |
---|---|
Success |
Finished processing correctly. |
Started |
Sandbox session has started. |
Collecting Data |
Sandbox session has been successful and data is being collected. |
Failed |
Sandbox session has failed, this can be due to many reasons. |
Pending |
Sandbox session is queued up and ready to start. |
Delayed |
Sandbox session has been delayed and will start soon. |
Failed with Quota Reimbursement |
Finished processing but failed, quota will be reimbursed. |
Timed out with Quota Reimbursement |
Delayed in the queue for too long, got timed out and then reimbursement. |
Sandboxing a URL
result = api.sandbox_url('www.polyswarm.io', 'triage', 'windows11-21h2-x64', browser='edge')
print(result.status)
If the URL is stored in a QR Code image, here is how to send it:
result = api.sandbox_url(None,
'cape',
'win-10-build-19041',
artifact='/path/to/qrcode.png',
preprocessing={'type': 'qrcode'},
browser='firefox')
print(result.status)
Sandboxing an Existing Artifact
result = api.sandbox(42445563653708569, 'triage', 'windows11-21h2-x64', True)
print(result.status)
Sandboxing in a Private Community
When sandboxing in a private community, if the network_enabled
flag is not passed, it will by default be True for a public community and False for a private community.
result = api.sandbox_file('./tests/eicar.yara', 'triage', 'windows11-21h2-x64')
List Sandbox Providers
sandboxes = api.sandbox_providers()
print(sandboxes)
Lookup Sandbox Task
task = api.sandbox_task_status(53445563653708569)
print(task)
Download Sandbox Artifacts
task = api.download_id('./outdir', 53445563653708569)
print(task)
Lookup Latest Sandbox Task
latest = api.sandbox_task_latest('18e5b8fe65e8f73c3a4a637c258c02aeec8a6ab702b15b7ee73f5631a9879e40', 'triage')
print(latest)
List my Sandbox Tasks
tasks = api.sandbox_my_tasks_list(sandbox='triage')
print(tasks)
Search Sandbox Tasks
tasks = api.sandbox_task_list(sandbox='triage', start_date='2023-10-31', status="SUCCEEDED")
print(tasks)
Hunting with Yara
Hunting with Yara can be split into the below three sections:
Managing Yara Rulesets
Feature | What is it for? | Package |
---|---|---|
Create | Create a Ruleset to be used in Hunting. | api.ruleset_create() |
List | List the Rulesets that have been created. | api.ruleset_list() |
Update | Update the ruleset with new values. | api.ruleset_update() |
Delete | Delete a Ruleset permanently. | api.ruleset_delete() |
Create Ruleset
new_ruleset = api.ruleset_create(name='eicar',
rules=open('eicar.yara').read(),
description='eicar ruleset')
print(f'ID: {new_ruleset.id}')
List Rulesets
rulesets = api.ruleset_list()
for ruleset in rulesets:
print(f'ID: {ruleset.id}')
Update Ruleset
# updating the ruleset yara rules (can also update name and description)
api.ruleset_update(new_ruleset.id, rules=open('another.yara').read())
Delete Ruleset
api.ruleset_delete(new_ruleset.id)
Live Hunts
Feature | What is it for? | Package |
---|---|---|
Get Ruleset ID | Get the ruleset id required to start a Live Hunt. | api.ruleset_get() |
Start | Start a Live Hunt based on a ruleset. | api.live_start() |
View Live Results of a Live Hunt | View all the live results generated from the live hunts. | api.live_feed() |
View a Singular Result | Inspect a particular result and get a download link. | api.live_result() |
Delete | Delete a Live Hunt permanently. | api.live_feed_delete() |
Stop | Stop a Live Hunt. | api.live_stop() |
Get Ruleset ID
ruleset = api.ruleset_get(57989886451857569)
Start Live Hunt
ruleset = api.live_start(ruleset.id)
print(f'ID: {ruleset.livescan_id}')
View Live Results of a Live Hunt
# reverse chronologically ordered iterator
results = api.live_feed(since=999999)
for result in results:
print(f'ID: {result.id}')
View a Singular Result
# you can inspect more details about a single result
# based on its id, it also provides a download link
# to the file and the origial yara rule used
# these extra info does not come directly from the
# feed listing method for performance reasons
result = api.live_result(91163237970748480)
print(f'ID: {result.id}')
print(f'URL: {result.download_url}')
Delete a Result
api.live_feed_delete([91163237970748480])
Stop a Live Hunt
ruleset = api.ruleset_get(57989886451857569)
ruleset = api.live_stop(ruleset.id)
Historical Hunts
Feature | What is it for? | Package |
---|---|---|
Create | Create a Historical Hunt by providing a Yara ruleset. | api.historical_create |
Update | Update the Historical Hunt. | api.historical_update() |
List Hunts | List the Historical Hunts. | api.historical_list() |
View Details | View Historical Hunt Details. | api.historical_get() |
View Results | View the results of a Historical Hunt. | api.historical_results() |
View Single Result | View and Download a Single Result. | api.historical_result() |
Delete Result | Delete an undesirable result. | api.historical_results_delete() |
Delete Hunt | Delete an Historical Hunt. | api.historical_delete() |
Create a Historical Hunt
historical = api.historical_create(rule=open('eicar.yara').read())
print(f'ID: {historical.id}')
Update a Historical Hunt
# the only update you can perform on a historical hunt
# is to cancel the hunt before it finishes
api.historical_update(49988514210960880)
List Historical Hunts
# you can also list all historical hunts you have in your account
results = api.historical_list(since=9999999)
for result in results:
print(f'ID: {result.id}')
View Historical Hunt Details
# you can retrieve extra information about the hunt
# this also includes a consolidated results csv
historical = api.historical_get(48011760326110718)
print(f'ID: {historical.id}')
print(f'Results CSV: {historical.results_csv_uri}')
View Historical Hunt Results
# you can check the results of a historical hunt
results = api.historical_results(48011760326110718)
for result in results:
print(f'ID: {result.id}')
View a Singular Historical Hunt Result
# retrieve a single result with extra information
result = api.historical_result(89734617019442134)
print(f'ID: {result.id}')
print(f'URL: {result.download_url}')
Delete an Historical Hunt Result
# delete an undesirable result
api.historical_results_delete([89734617019442134])
Delete a Historical Hunt
# you can delete a historical hunt
# keep in mind that this is an async process and the
# hunt will be scheduled for deletio..
api.historical_delete(49988514210960880)
Get a Stream
Feature | What is it for? | Package |
---|---|---|
Stream | Fetch a Stream of data from PolySwarm. | api.stream() |
SINCE = 60 # Fetch stream from the last 60 minutes
streams = api.stream(since=SINCE)
for stream in streams:
print(f'ID: {stream.id}')
print(f'URI: {stream.uri}')
print(f'Created: {stream.created}')
print(f'Community: {stream.community}')
Stream is a paid feature that is added to an account on a case-by-case basis. If you'd like to add this feature to your account, contact us at [email protected].
Changelog
Version 3.11.0
Release Date: 2024-12-09 Breaking Changes: N/A
Item | Topic | Description |
---|---|---|
1.0 | New accounts field | Added new field features[].backing_feature to the response of account_features() . |
Version 3.10.0
Release Date: 2024-09-24 Breaking Changes: N/A
Item | Topic | Description |
---|---|---|
1.0 | New Artifact field | Added new field failed_reason to the responses of submit() , sandbox_file() and sandbox_url() calls when there is a known error. |
2.0 | Bug fix | Added check first whether a report can be downloaded to report_download() . |
Version 3.9.0
Release Date: 2024-08-07 Breaking Changes: Item 4.0
Item | Topic | Description |
---|---|---|
1.0 | Scan and Sandboxing of QR Code images with URL as payload | Implemented in the submit() and sandbox_url() APIs. |
2.0 | Get account's basic information | New API method account_whois() . |
3.0 | Get accounts' features and quota | New API method account_features() . |
4.0 | Change zip file submissions | Replace is_zip and zip_password with new preprocessing argument in the submit() and sandbox_file() APIs. |
Version 3.8.0
Release Date: 2024-06-27
Breaking Changes: N/A
Item | Topic | Description |
---|---|---|
1.0 | Support zip file submissions | New is_zip and zip_password argument in the submit() and sandbox_file() APIs. |
Version 3.7.0
Release Date: 2024-05-20
Breaking Changes: Item 3.0
Item | Topic | Description |
---|---|---|
1.0 | Reports Generation | Introduction of reports generation API: report_create , report_get and report_download |
2.0 | Reports Templates | Introduction of reports templates API: report_template_** methods. |
3.0 | Python versions supported | Minimal Python version supported is 3.7. |
Version 3.6.0
Release Date: 2024-04-30
Breaking Changes: N/A
Item | Topic | Description |
---|---|---|
1.0 | Communities | Support EU communities. |
2.0 | Permalinks | Fix permalink parsing. |
3.0 | IOC beta | New method add_known_bad_host . |
Version 3.5.2
Release Date: 2024-02-22
Breaking Changes: N/A
Item | Topic | Description |
---|---|---|
1.0 | URL Sandboxing | Introduction of URL Sandboxing (sandbox_url ) API. |
Version 3.4.3
Release Date: 2023-09-20
Breaking Changes: N/A
Item | Topic | Description |
---|---|---|
1.0 | Added Community Parameter to Live Results | Added parameter community to /v3/hunt/live/list to allow you to see results from a private community. |
2.0 | Added Community Parameter to Historical Results | Added parameter community to /v3/hunt/historical/results/list to allow you to see results from a private community. |
3.0 | New Permalink Structure | New Permalink Structure. |
Version 3.4.0
Release Date: 2023-07-12
Breaking Changes: N/A
Item | Topic | Description |
---|---|---|
1.0 | Sandbox Task Config | Added sandbox task config field on sandbox task model. |
2.0 | api.sandbox and api.sandbox_file |
Endpoint now accept provider and vm slugs. |
3.0 | api.sandbox_providers |
Now returns provider and vm config information. |
Version 3.3.2
Release Date: 2023-06-20
Breaking Changes: Item 3.0
Item | Topic | Description |
---|---|---|
1.0 | Dropping python 2.7 support. | - |
2.0 | Added New Polyswarm Lookup and Search Features. | Added api.sandbox_task_status , api.sandbox_task_latest , api.sandbox_my_tasks and api.sandbox_task_list . |
3.0 | Changed the Sandbox Submit Interface. | - |