This is the second post in a series that will be based on my Suricon 2022 talk “Jupyter Playbooks for Suricata”. You can also find this blog as a section of a fully public notebook. The goal of this post is to introduce Jupyter notebooks to a wider audience. Readers are welcome to also view the notebook version, as code examples can easily be copied from that. Technically minded users are encouraged to not only read the notebook, but also interact with it.
This blog does a walkthrough of a simple data exploration case to extract a payload from a Log4j scanning event. It assumes familiarity with setting up Suricata and parsing EVE JSON into a pandas dataframe. This was covered in Part 1 of this series.
The prior section focused on the basics of Jupyter, pandas, and Suricata EVE JSON. Now it is time to cover a basic data exploration use-case. Some data-science-specific considerations will be explained, mostly to provide context for quirks encountered when working with pandas. We will not get into data mining or machine learning quite yet. The code examples used assume that the MTA web scanning PCAP has been downloaded, unpacked, and parsed with Suricata. You can download that here if you have not yet done so.
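As a quick refresher, loading the parsed events might look roughly like the sketch below. The file name and loading approach are assumptions here – the exact code was covered in Part 1 – and the resulting dataframe is called df in the sketches that follow.

```python
import json

import pandas as pd

# Assumes eve.json produced by Suricata from the scanning PCAP sits in the working directory.
with open("eve.json") as fh:
    records = [json.loads(line) for line in fh]

# json_normalize flattens nested EVE objects into dotted column names, e.g. alert.signature.
df = pd.json_normalize(records)
```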
I'm no stranger to teaching classroom courses, but one topic I always struggle to cover is threat hunting. Why? Because it is not a linear process, each hunting case can be vastly different, and each requires specialized knowledge. In other words, it's detective work where one always needs to consider new evidence. And picking up the right clues often comes down to gut feeling that is based on experience. In fact, it’s quite similar to research, as threat hunting is basically proving a hypothesis. And just like researchers and data scientists, threat hunters will need to exit their comfort zones and gather knowledge well outside their own domain. After all, to understand how something can be exploited requires first understanding how it is supposed to work. It also requires an open mind to realize how something can be used, rather than how it’s meant to be used.
Thus, if hunting requires case-by-case lessons, then let's start with the first one. We will explore a web scanning dataset with the goal of revealing a high-profile Java exploit. For that purpose we need to:
- prepare and clean up the parsed EVE data;
- subset the alert events and pick an interesting group of signatures;
- pivot from those alerts into related events via flow ID correlation;
- extract and decode the injected payload.
The first step is data preparation. This might sound like a small step, but it is actually by far the most important. Data scientists can easily spend 80-90 percent of their working time here, and I believe that estimate also holds for security analysts. After all, an organization should always be improving detection capabilities whenever it is not actively handling an ongoing incident, and that can technically be classified as data preparation. Even organizations that are constantly handling incident response cases, such as those that provide managed cyber security services to other companies, likely have dedicated staff for data and detection engineering. Indeed, even we at Stamus Networks spend most of our working days on data preparation, as we build and support a product that will be operated by our customers.
But what is data preparation? In practice, any operation that transforms or enriches existing data can be considered data preparation. But in data science, it refers to a collection of methods and techniques for collecting data, dealing with missing values, combining, restructuring, and in general preparing it for whatever algorithm the analyst wishes to apply. Of course, should that algorithm not produce meaningful results, then analysts would likely need to choose another and apply different preparation techniques as a result. Hence why researchers spend so much time here.
By comparison, a threat hunter would follow a trail in data, only for it to reveal another trail. Sometimes the trail leads to information that does not exist, but can be derived from existing data. This is called pivoting. Experienced hunters know it to be a normal part of the process, and the best we can really do is prepare data to the best of our knowledge, accepting that eventually we always need to fall back to external analysis. The important thing is to try and ensure that next time we don't have to do the same work.
This post will try to present one of those hunting cases while explaining techniques we apply from both disciplines. Consider the following dataframe: does something seem wrong?
First, the flow_id is formatted in scientific notation. In fact, pandas has converted this value into a floating point type, whereas anyone familiar with EVE JSON knows it's actually supposed to be a large randomly generated integer. This poses a big problem for flow correlation, as events generated for the same flow have identical flow_id values to tie them together. We need to be able to look up individual flow_id values, meaning we need to be able to copy them from the display. Quite difficult with a floating point number formatted in powers of ten.
To fix that, we will simply apply a conversion over the entire dataframe column, then replace the original column with transformed values. Note that columns can be selected both with method or dictionary notation. In other words, df.flow_id and df["flow_id"] are both valid. It's mostly a matter of preference, as method notation is cleaner and is mostly supported by IDE autocompletion. However, dictionary notation is needed in dynamic code, for example when looping over a dynamically generated list of columns. Method notation can also collide with built-in method or dataframe object parameters, so keep that in mind as well.
All we need to do here is use the .astype method to convert the entire column of data into another type. If any single item fails, though, the entire conversion breaks. And indeed it will on this dataset, as the full dataframe also contains Suricata stats events that lack a flow_id key. A missing value can simply be omitted from JSON, but must be somehow represented in a vector. By default, pandas uses NaN, the not-a-number value. We have already established that NaN is actually a special floating point value and not a null or None. The problem is that other types have nothing equivalent, so the astype conversion simply breaks. Here the workaround is trivial – we simply replace missing values with a zero using the fillna method. I call it a workaround and not a solution since replacing missing statistical measurements with zero values might not actually be a good idea. But here it's fine, as flow_id is not a measurement.
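A minimal sketch of that fix, assuming the dataframe is named df as in the loading sketch above:

```python
# Replace missing flow_id values (e.g. from stats events) with zero, then cast to integer.
df["flow_id"] = df["flow_id"].fillna(0).astype("int64")
```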
Similar conversions can be applied to other fields, such as source and destination ports. Likewise, they are not numerical measurements like the bytes or packets tracked in flow statistics. Instead, they are categorical data, meaning they correspond to a finite number of categories. Port numbers have possible values between 1 and 65535, with low ports being assigned to services and high ports being dynamically used by clients connecting to them. For example, port 22 is used by SSH, port 53 is used by DNS, 80 is standard for unencrypted HTTP, 443 for encrypted HTTPS, etc. Calculating floating point statistics on those numbers would be pointless, but we can deduce useful information from the categories. An HTTPS (TLS) connection to a port other than 443 can be suspicious. Attackers might also try to exfiltrate data over port 53, as it's required for functional DNS and therefore not filtered by most organizations. Note that Suricata does not depend on port numbers for protocol detection. That's handled by an application level parser. But if Suricata parses an application level protocol on a well-known port other than the one it's actually assigned to, then it can safely be escalated as an indication of compromise, just like non-DNS traffic on port 53. Analysts might also be interested in grouping traffic based on port ranges. For example, flows where both sides of the connection use high ephemeral ports can be considered suspicious or a policy violation.
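A similar conversion for the port columns could look like this; treating them as small unsigned integers is an assumption, not the only option (pandas categories would work as well):

```python
# Ports are categorical identifiers, not measurements; a compact integer type is enough.
for col in ["src_port", "dest_port"]:
    df[col] = df[col].fillna(0).astype("uint16")
```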
We can also apply type conversions. For example, the timestamp field is currently treated as plain text, but we can convert it into a timestamp object instead, which enables better filtering and data aggregation.
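With pandas this is a one-liner:

```python
# Parse the ISO 8601 timestamp strings into proper datetime objects.
df["timestamp"] = pd.to_datetime(df["timestamp"])
```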
The result is a somewhat cleaner representation of the data. Notice how flow ID values and port numbers are easier to read. There's also a subtle change in timestamp formatting, though that is not actually important. What matters is internal representation.
A lot more could be done to clean up and improve the data, but this will come in due time.
The full dataframe has 25905 rows. That's too much and we need to start somewhere. Suricata started as an IDS project and alerting is still very much its main use-case, so let's start there. But make no mistake, alerts are simply a starting point and rarely tell the whole story. They are simply a convenient entry point into the NSM data through a technique called flow ID correlation.
We can easily subset a dataframe by rows or columns. Column selection is easy - simply pass a list of columns using Python indexing notation. Row filtering requires more work. We need to create an index series that can be passed into the pandas loc method. Simply put, a pandas series is an object that holds a vector of data and an indexing value for each item in that vector. A dataframe is basically a collection of series objects, with each corresponding to a column.
Applying comparison operations on a dataframe column results in a new pandas series of truth values. True means the row matches the filter, False means it does not.
Passing this index series to the loc method selects relevant rows. These index selections can also be combined with multiple loc calls, with each call adding an extra set of filters. Once filtered, we sort the values by timestamp to ensure that alerts are chronological. The latest events could be shown first by setting the parameter ascending=False.
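Put together, the pattern looks roughly like this; the event_type and app_proto filters are purely illustrative:

```python
# Comparison yields a boolean index series; chained loc calls each add an extra filter.
DF_ALERTS = (
    df.loc[df["event_type"] == "alert"]
      .loc[df["app_proto"] == "http"]
      .sort_values(by="timestamp")
)

# Column selection is just a list passed with indexing notation.
DF_ALERTS[["timestamp", "flow_id", "src_ip", "dest_ip"]]
```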
Also, we are only interested in columns that contain actual data. An alert event might not have full context as Suricata emits this event as soon as possible. Other event types – such as http, smb, or flow – arrive later. Flow events might not be flushed until flow timeout, which by default is 10 minutes for TCP connections. In other words, flow records might arrive a full 10 minutes after the initial alert, or even later with a higher timeout configuration. Alert records might hold data from flow, but it's incomplete. The flow end timestamp would be missing, for example, and would manifest as a NaN value in the dataframe. In fact, if the dataframe consists only of alerts, then all flow.end values would be NaN. Why is that a problem? Our subset dataframe would show all columns available in the original, yet most hold nothing but empty values. And the display cannot fit them all, so the view is truncated. Explicitly selecting columns with data would be tedious, as we'd need to inspect each one individually, assuming the analyst is totally unfamiliar with the columns. Luckily we can use the dropna method to remove columns where all values are NaN. Thus, pandas can already do a lot of initial exploration for us.
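A sketch of that cleanup on the alert subset from above:

```python
# Drop columns where every value is NaN; what remains actually has data to explore.
DF_ALERTS = DF_ALERTS.dropna(axis=1, how="all")
```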
Filtered indices can also be stored as variables and joined in a single loc statement. The following code is functionally identical to the previous example.
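For example, something along these lines, reusing the illustrative filters from the earlier sketch:

```python
IDX_ALERT = df["event_type"] == "alert"
IDX_HTTP = df["app_proto"] == "http"

# Functionally identical to the chained loc calls shown earlier.
df.loc[IDX_ALERT & IDX_HTTP].sort_values(by="timestamp")
```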
The resulting dataframe still has 48 columns to explore. Readers might observe that some columns in the middle are even omitted. While we could override this behavior, the result would still be quite difficult to grasp visually. A better method would be to limit the column scope. In other words, select a small set of columns for initial exploration. Selecting the following columns provides a fairly concise representation of alerts.
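The selection itself is a simple list of column names; this sketch assumes the data was parsed with Suricata 7, so flow.src_ip and flow.dest_ip are present:

```python
COLUMNS = [
    "timestamp",
    "flow_id",
    "alert.signature",
    "alert.category",
    "flow.src_ip",
    "flow.dest_ip",
]
DF_ALERTS[COLUMNS]
```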
We have already discussed timestamp and flow_id. The former is pretty intuitive, but the latter will be needed as a pivot point to more data. For alerts, I usually start simply with alert.signature when doing initial exploration. It is the description of the signature defined in the rule msg option. The alert.category field is useful for additional context and contains the general categorization, such as network trojan, denial of service, network scan, policy violation, etc.
Instead of src_ip and dest_ip, I have opted to display flow.src_ip and flow.dest_ip. Those are new fields added in Suricata 7 and are mainly relevant for analyzing alert event types. Why? Because signatures can trigger in either direction. EVE NSM logs follow the direction of the flow. The client endpoint is src_ip and the server is dest_ip. Alerts work differently. A rule could be written for either request or response traffic. For example, the HTTP GET method is part of the HTTP request, whereas the HTTP status code is part of the response. The Suricata rule parser does not even allow mixing those keywords in the same rule. A rule that matches on the HTTP method would report the client as src_ip, as expected. But a rule written to match on the HTTP status code inspects the response side, so the server sending that response becomes src_ip, effectively reversing the IP pair.
This has caused a lot of confusion for analysts, as one always needs to translate what actually happened. Indeed, this is a big contributor to the alert fatigue that our product at Stamus Networks is attempting to alleviate. Luckily, flow.src_ip and flow.dest_ip maintain the real source information from the two-sided flow. Simply put, source and destination IP values are not flipped depending on the signature.
For this exercise, let's drill into Log4j alerts. Why? All of the alerts within the set indicate scanning activity and are easily triggered by exploitation attempts. Emphasis on attempt. Usually that does not mean that the target was compromised. Furthermore, a public service gets scanned all the time – every few minutes for a small service. Larger ones might see even more action than that! Anyone who has monitored public services is aware of this, and alerts like this mostly go ignored. These alerts are not false positives. They are simply boring, but not useless.
Normally, the scanners are disposable botnet nodes - compromised computers, proxy servers in the cloud, traffic originating from TOR nodes, etc. There isn’t much interesting data we can extract there without the help of ISPs and police. Log4j exploits are slightly different, however. A lot has been published on this particular CVE, including prior blog posts by Stamus Networks. So I will only present a quick recap.
Log4j is the standard logging library used in Java applications. The library is mainly for writing entries into log files, but it also supports evaluating special expressions embedded in log messages. A malicious actor could trigger a log entry with special jndi: content that the logging library would evaluate, ultimately executing attacker-supplied code. This is known as RCE, or Remote Code Execution, allowing remote users to essentially execute commands on the target system. The vulnerability was quickly dubbed Log4shell. Since interacting with a remote application entirely via requests that trigger log entries is a bit of a hassle, the typical use-case would be to inject a small payload that connects to remote infrastructure or downloads malicious payloads, working as a stage 1 downloader or initializing command and control communication.
It would be interesting to extract this payload to see what is being injected and perhaps even reveal the malicious C2 infrastructure behind it! These are the alerts we are investigating.
First, having selected initial alerts, we want to see all corresponding events. During the Suricon presentation, I showed Python's built-in input() function, which allows users to manually enter a value and store it as a variable. One could do something like this:
FLOW_ID = input()
While Suricata flow_id values are consistent within events that belong to the same flow, the value is not based on anything. Unlike community_id, which is a hash generated from the flow 5-tuple (protocol, IP pair, and port pair), the flow_id is just a random integer that's assigned to an entry in the flow table. In other words, parsing the same PCAP multiple times would result in different flow_id values every time, even if the actual events remain the same.
This is a problem for a notebook that aims to be repeatable, as execution would halt and the notebook would expect input from the user. But with some code we can still get around the problem and create a fully repeatable notebook. All we need to do is extract unique flow_id values and store them in a variable.
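Something along these lines works; matching on the string log4j in the signature name is an assumption, as the exact signatures depend on the ruleset in use:

```python
# Gather the unique flow_id values from the Log4j alerts into a plain Python list.
FLOW_IDS = (
    DF_ALERTS
    .loc[DF_ALERTS["alert.signature"].str.contains("log4j", case=False, na=False), "flow_id"]
    .unique()
    .tolist()
)
```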
Once prepared, we can use the isin method on a pandas Series object to filter on multiple values. We can also sort the rows based on multiple columns. We want entries within the same flow to be grouped together, but also have consistent ordering of event types.
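A sketch of that correlation step, producing the DF_LOG4J dataframe used below:

```python
# Pull every event sharing a flow_id with the Log4j alerts, group flows together,
# keep event types in a consistent order, and drop columns that carry no data at all.
DF_LOG4J = (
    df.loc[df["flow_id"].isin(FLOW_IDS)]
      .sort_values(by=["flow_id", "event_type"])
      .dropna(axis=1, how="all")
)
```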
One might already notice strange values in HTTP columns, but the dataframe printout is still too large to visually explore.
The next step would be to understand which data columns we can explore. Remember, the dropna method used when creating DF_LOG4J ensures that each column actually has some data to explore. Those columns can be extracted from the dataframe. A handy trick is to pass this column array into a new pandas dataframe to leverage Jupyter's tabular display.
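For example:

```python
# Wrapping the column index in a dataframe gives a readable listing in Jupyter.
pd.DataFrame(DF_LOG4J.columns)
```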
In addition to the columns we explored before, the following fields can reveal useful information:
- http.url – the requested URL, which here carries the injected jndi: lookup;
- http.http_refer – the HTTP referer header, which carries a variation of the same injection.
On closer inspection, we notice that both connections attempt to execute the same exploitation script.
Likewise, we can verify that http.http_refer is actually the same script with an additional sanitization bypass. Connection parameters are the same, and so is the base64 encoded payload.
The most interesting part of this injection is the base64 encoded string. Base64 is an encoding method for converting binary data into text, thus enabling transfer over channels that only support text. Here, the value after Base64/ holds another script. It is encoded to avoid the script syntax breaking the HTTP request. It also hides the true nature of the malicious command from a quick glance.
Let's assume we need to extract these payloads for forensics or for automating a hunting case. First we need to extract the base64 payload. A quick solution is to define an extraction function that uses regular expression match groups. A base64 string is a text value consisting of upper- and lowercase letters and numbers, and it might end with equal signs used as padding.
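A rough sketch of such a function; the regular expression is an assumption based on the payload format described above, where the base64 blob follows a Base64/ marker:

```python
import re

import numpy as np
import pandas as pd

RE_B64 = re.compile(r"Base64/([A-Za-z0-9+/=]+)")

def extract_base64(value):
    # Missing values show up as NaN floats, so match them with pd.isna, not "is None".
    if pd.isna(value):
        return np.nan
    match = RE_B64.search(value)
    if match is None:
        return np.nan
    return match.group(1)
```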
This code will not be fast. Both Python and regular expressions are notorious for being slow, but it's good enough, and any good SOC engineer knows that sometimes a simple trick can save the day. The function can be passed directly into the pandas apply method, which in turn executes it for every value in the required column. As we've already seen, a value could be missing or simply not contain anything interesting. Our function returns NumPy NaN in those cases, as that's how pandas usually denotes missing values. Note that we need to use the pd.isna or pd.isnull functions, since NaN is a special floating point value that has to be matched correctly. Python programmers who are used to relying on “is None” checks might get a bad surprise otherwise.
Pandas' apply method accepts a function as an argument. Here we call it on the http.url column. Each row value is implicitly passed as the argument to that function. Think of it as a more concise “for loop”.
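For instance:

```python
# Apply the extractor to every value in the http.url column.
B64_VALUES = DF_LOG4J["http.url"].apply(extract_base64)
```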
Then we can build a dataframe consisting only of extracted base64 values.
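A sketch of that step, keeping only the rows where the extraction matched:

```python
DF_B64 = DF_LOG4J.assign(base64=B64_VALUES).loc[B64_VALUES.notna(), ["flow_id", "base64"]]
```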
Finally, we can apply the base64 decode function over the extracted payload, revealing the actual script inside.
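A minimal sketch using the standard library base64 module; the padding normalization and lenient UTF-8 decoding are assumptions to keep the decode from failing on odd inputs:

```python
import base64

def decode_b64(value):
    # Normalize padding before decoding; replace undecodable bytes instead of raising.
    padded = value + "=" * (-len(value) % 4)
    return base64.b64decode(padded).decode("utf-8", errors="replace")

DF_B64 = DF_B64.assign(script=DF_B64["base64"].apply(decode_b64))
```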
Pandas dataframes can easily be exported to various formats. This is quite handy for incident reports.
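For instance, a few of the built-in exporters (file names are illustrative):

```python
DF_B64.to_csv("log4j-payloads.csv", index=False)
DF_B64.to_json("log4j-payloads.json", orient="records", lines=True)
DF_B64.to_html("log4j-payloads.html", index=False)
```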
Note that at the end of the exploration, we basically only had a single unique payload to decode. Sure, the example was trivial, but uniqueness is a surprisingly effective hunting method even on big datasets. Previously, the reader might have observed pandas’ unique method, but we can do better. Aggregations over data groupings can reveal great insights.
Suppose we want to explore not only the log4j exploitation URL, but all URLs associated with HTTP alerts. One could simply group data by unique alert signatures by using the pandas groupby method. These groupings are not usable by themselves, but need to be passed into an aggregation method.
The cool thing is that pandas supports doing many aggregations at once. Users can call upon a number of existing aggregations, or even define custom functions. For example: getting the largest and smallest values using max and min functions, presenting unique values, counting events per grouping, number of unique values for a particular field, etc.
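A sketch of such a grouping over the alert subset from earlier; the chosen fields and aggregations are illustrative:

```python
# Group HTTP alerts by signature and compute several aggregations per group at once.
DF_AGG = (
    DF_ALERTS
    .groupby("alert.signature")
    .agg({
        "http.url": ["count", "nunique", "unique"],
        "flow_id": "nunique",
    })
)
```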
We basically got the same information without having to manually dig inside the data. We could even apply the extraction functions on the aggregations instead of raw values, drastically reducing the processing overhead. Accessing aggregate columns might seem a bit strange at first, but it actually works exactly as it would over any dataframe. Two differences to keep in mind are:
- with multiple aggregations per field, the resulting columns form a MultiIndex, so individual columns are addressed with tuples such as ("http.url", "unique") rather than plain names;
- columns produced by the unique aggregation hold list-like values rather than scalars, so they need to be exploded or processed with list-aware functions.
The explode method would create a new row for each entry in the list, even if the entry is a missing value. This can be convenient, but will consume a lot of memory with bigger dataframes.
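For example, assuming the DF_AGG sketch above:

```python
# With several aggregations per field the columns form a MultiIndex, so select with a tuple.
DF_AGG[("http.url", "unique")].explode()
```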
We could also write an apply function that assumes list input. The result is a smaller dataframe, but we cannot use native pandas filtering or transformation methods on the resulting list.
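A minimal sketch of that approach, reusing the earlier extract_base64 helper:

```python
# Run the extractor over each aggregated array of URLs and keep only the hits.
def extract_base64_many(urls):
    hits = (extract_base64(url) for url in urls)
    return [hit for hit in hits if not pd.isna(hit)]

DF_AGG[("http.url", "unique")].apply(extract_base64_many)
```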
This allows us to achieve the same result in a much more concise manner. Aggregating the data and simply looking into unique values basically did the pivoting for us.
This concludes the second post in our series on Jupyter playbooks for Suricata. We presented a walkthrough of a simple data exploration case involving scanning activity. Pandas filtering and subsetting capabilities were used to narrow down alert logs and then pivot into HTTP events via flow ID correlation. Those events contained Log4j exploitation payloads that were decoded with a simple Python function. Finally, we introduced pandas aggregations and showed how uniqueness is a powerful hunting tool.
This series will continue in Part 3.