Getting Investigations data via HDFS/Presto
Overview
The Sophos Linux Sensor's (SLS) recommended deployment for non-cloud environments is to use HDFS (Hadoop Distributed File System) for storage and Presto as the query engine.
Configuration
You configure Investigations in the /etc/sophos/runtimedetections-rules.yaml
file. By default, the Process Events, Sensor, and Container Events tables are enabled. This means that if no table
key is provided, those tables are turned on automatically with a default row size unique to each MetaEvent type. To configure additional tables, you must specify them directly.
This is an example with every MetaEvent type configured to write to HDFS:
investigations:
  reporting_interval: 5m
  sinks:
    - name: '[The address of the name node here]:9000/[directory on hdfs to store data, absolute path]'
      backend: hdfs
      automated: true
      credentials:
        blob_storage_hdfs_user: '[hadoop username that has write access]'
        blob_storage_create_buckets_enabled: true
  flight_recorder:
    enabled: true
  tables:
    - name: shell_commands
      enabled: true
    - name: tty_data
      enabled: true
    - name: file_events
      enabled: false
    - name: connections
      enabled: true
    - name: sensor_metadata
      enabled: true
    - name: alerts
      enabled: true
    - name: sensors
      enabled: true
    - name: process_events
      enabled: true
    - name: container_events
      enabled: true
Storage solutions
This section provides guides to aid in the installation and setup of storage solutions for SLS's Investigations data.
HDFS
Credentials
Currently, only insecure HDFS is supported. All that's required is the username of a user with write access to the directory that stores the Investigations data. In addition, every sensor writing to HDFS must be able to reach the namenodes on ports 8020 and 9000 and all of the datanodes on ports 50010 and 50020.
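For example, you can prepare the target directory and its permissions from any Hadoop client node before pointing sensors at it. This is a sketch; /sls/investigations and slswriter are placeholder values for your environment:

# Create the directory that will hold Investigations data (placeholder path).
hdfs dfs -mkdir -p /sls/investigations
# Give ownership to the user the sensors will write as (placeholder username).
hdfs dfs -chown -R slswriter /sls/investigations
# Verify the directory exists with the expected owner.
hdfs dfs -ls /sls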
SLS configuration
To write MetaEvents to HDFS, configure the sink with the address and port of the name node and a user.
Here's an example:
investigations:
  reporting_interval: 5m
  sinks:
    - name: '[The address of the name node here]:9000/[directory on hdfs to store data, absolute path]'
      backend: hdfs
      automated: true
      credentials:
        blob_storage_hdfs_user: '[hadoop username that has write access]'
        blob_storage_create_buckets_enabled: true
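After the sensors have run for at least one reporting_interval, you can confirm MetaEvents are arriving. This is a sketch assuming the placeholder directory /sls/investigations from above:

# List the table subdirectories created under the configured sink path.
hdfs dfs -ls /sls/investigations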
Create bucket
We highly recommend setting blob_storage_create_buckets_enabled: true
for HDFS. This is because HDFS is hierarchical, whereas blob storage is flat: if a table subdirectory or partition folder doesn't exist, writes to it fail.
Automatic
The following settings ensure that folders are created in HDFS if they don't exist. In /etc/sophos/runtimedetections-rules.yaml, enable the blob_storage_create_buckets_enabled
field. See the following example configuration:
blob_storage_create_buckets_enabled: true
blob_storage_hdfs_user: '[hadoop username that has write access]'
Query solutions
This section provides guides to aid in the installation and setup of query solutions for SLS's Investigations data.
Presto: Manual
HDFS configuration
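Presto reads data out of HDFS through its Hive connector. The following is a minimal catalog sketch, not a definitive setup: the connector name can vary by Presto version, and the metastore URI and Hadoop configuration paths are assumptions for your environment.

# etc/catalog/hive.properties on the Presto coordinator and workers
connector.name=hive-hadoop2
# Thrift endpoint of your Hive metastore (placeholder host).
hive.metastore.uri=thrift://[metastore host]:9083
# Point the connector at your cluster's Hadoop configuration files.
hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml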
Example queries
Queries are run using SQL syntax. This section provides common example queries. For a complete reference of all the available fields that can be queried, see Using MetaEvent filters.
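One way to run these queries is through the Presto CLI. This is a sketch; the server address, catalog, and schema are placeholders for your deployment:

# Open an interactive session against the Presto coordinator.
presto --server [coordinator host]:8080 --catalog hive --schema default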
Who has run a command through sudo?
SELECT from_unixtime(process_events.unix_nano_timestamp / 1000000000) AS timestamp,
       pid, path, username, login_username
FROM process_events
WHERE event_type = 0 AND username != login_username;
Which programs and their users connected to a given IP?
SELECT DISTINCT from_unixtime(connections.unix_nano_timestamp / 1000000000) AS timestamp,
       sensors.hostname,
       process_events.path,
       container_events.container_name,
       container_events.image_name,
       connections.dst_addr,
       connections.dst_port
FROM connections, sensors, container_events, process_events
WHERE connections.process_uuid = process_events.process_uuid
  AND container_events.process_uuid = process_events.process_uuid
  AND connections.dst_addr = '$DESTINATION_IP';
What containers or images ran on my cluster and where?
SELECT sensors.hostname,
       container_events.image_name,
       from_unixtime(container_events.unix_nano_timestamp / 1000000000) AS timestamp
FROM sensors, container_events;
Get all alerts that are part of an incident
SELECT *
FROM alerts
WHERE incident_id = '$INCIDENT_ID';
Get all shell commands that are part of an incident
SELECT from_unixtime(shell_commands.unix_nano_timestamp / 1000000000) AS timestamp,
       sensors.hostname,
       array_join(shell_commands.program_arguments, ' ') AS args,
       shell_commands.username
FROM shell_commands
JOIN sensors ON sensors.sensor_id = shell_commands.sensor_id
WHERE shell_commands.incident_id = '$INCIDENT_ID';