Tutorial: SPL to KQL Query Conversion¶
Status: Authored 2026-04-30 Audience: SOC Analysts, Detection Engineers, Threat Hunters Purpose: 20+ common Splunk SPL queries converted to KQL with detailed explanations for SOC analysts transitioning to Microsoft Sentinel
How to use this tutorial¶
Each example includes:
- Scenario -- the security use case
- SPL query -- the original Splunk query
- KQL query -- the Sentinel equivalent
- Explanation -- line-by-line breakdown of the conversion
- Key differences -- important behavioral differences to watch for
Work through these examples sequentially to build your KQL fluency, or jump to specific scenarios relevant to your SOC operations.
Authentication and access¶
1. Failed authentication attempts (brute force detection)¶
Scenario: Detect accounts with excessive failed logon attempts in the last hour.
SPL:
index=auth sourcetype=WinEventLog:Security EventCode=4625
| stats count as failure_count, values(src_ip) as source_ips by user
| where failure_count > 20
| sort -failure_count
KQL:
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4625
| summarize
failure_count = count(),
source_ips = make_set(IpAddress, 10)
by TargetAccount
| where failure_count > 20
| sort by failure_count desc
Explanation:
index=authbecomes the specific tableSecurityEventEventCode=4625becomesEventID == 4625(field name changes)stats countbecomessummarize count()values(src_ip)becomesmake_set(IpAddress, 10)(KQL limits set size for performance)sort -failure_countbecomessort by failure_count desc(explicit direction)
2. Password spray detection¶
Scenario: Detect a single IP attempting to authenticate against many accounts.
SPL:
index=auth sourcetype=WinEventLog:Security EventCode=4625
| stats dc(user) as unique_users, count as total_attempts by src_ip
| where unique_users > 10 AND total_attempts > 50
KQL:
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4625
| summarize
unique_users = dcount(TargetAccount),
total_attempts = count()
by IpAddress
| where unique_users > 10 and total_attempts > 50
Key differences:
dc()(distinct count) maps todcount()AND(SPL) maps toand(KQL, case-insensitive)
3. Successful logon after multiple failures¶
Scenario: Detect a successful logon following multiple failed attempts (potential compromise).
SPL:
index=auth sourcetype=WinEventLog:Security (EventCode=4625 OR EventCode=4624)
| stats count(eval(EventCode=4625)) as failures,
count(eval(EventCode=4624)) as successes,
latest(_time) as last_event
by src_ip, user
| where failures > 10 AND successes > 0
KQL:
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID in (4624, 4625)
| summarize
failures = countif(EventID == 4625),
successes = countif(EventID == 4624),
last_event = max(TimeGenerated)
by IpAddress, TargetAccount
| where failures > 10 and successes > 0
Explanation:
count(eval(EventCode=4625))becomescountif(EventID == 4625)-- cleaner syntaxlatest(_time)becomesmax(TimeGenerated)(EventCode=4625 OR EventCode=4624)becomesEventID in (4624, 4625)
4. Logon from unusual country (Entra ID)¶
Scenario: Detect sign-ins from countries not in the approved list.
SPL:
index=azure sourcetype=azure:aad:signin Status.errorCode=0
| lookup approved_countries Country
| where NOT match(approved, "yes")
| stats count by UserPrincipalName, Country, City, IPAddress
KQL:
SigninLogs
| where TimeGenerated > ago(24h)
| where ResultType == 0
| where LocationDetails.countryOrRegion !in (
_GetWatchlist('ApprovedCountries') | project Country
)
| summarize
count() by UserPrincipalName,
Country = tostring(LocationDetails.countryOrRegion),
City = tostring(LocationDetails.city),
IPAddress
Key differences:
- Splunk
lookupmaps to KQL_GetWatchlist()+in/!inoperator - JSON field access in KQL uses dot notation:
LocationDetails.countryOrRegion
Lateral movement¶
5. RDP lateral movement¶
Scenario: Detect hosts making RDP connections to multiple internal hosts.
SPL:
index=firewall sourcetype=pan:traffic dest_port=3389 action=allowed
| stats dc(dest_ip) as unique_destinations by src_ip
| where unique_destinations > 5
| lookup asset_db ip AS src_ip OUTPUT hostname, department
KQL:
CommonSecurityLog
| where TimeGenerated > ago(1h)
| where DestinationPort == 3389 and DeviceAction == "Allow"
| summarize
unique_destinations = dcount(DestinationIP)
by SourceIP
| where unique_destinations > 5
| join kind=leftouter (
_GetWatchlist('AssetInventory')
| project SourceIP = IPAddress, hostname = HostName, department = Department
) on SourceIP
6. PsExec / remote service installation¶
Scenario: Detect new service installations that may indicate PsExec or similar remote execution.
SPL:
index=windows sourcetype=WinEventLog:System EventCode=7045
| rex field=Service_File_Name "(?<executable>[^\\\]+)$"
| where NOT match(Service_Name, "^(Windows|Microsoft|vmware)")
| stats count by ComputerName, Service_Name, executable, Service_Start_Type
KQL:
Event
| where TimeGenerated > ago(24h)
| where EventLog == "System" and EventID == 7045
| extend ServiceName = extract("Service Name:\\s+(.*?)\\.", 1, RenderedDescription)
| extend ServiceFile = extract("Service File Name:\\s+(.*?)\\.", 1, RenderedDescription)
| extend executable = extract("[^\\\\]+$", 0, ServiceFile)
| where ServiceName !startswith "Windows"
and ServiceName !startswith "Microsoft"
and ServiceName !startswith "vmware"
| summarize count() by Computer, ServiceName, executable
Data exfiltration¶
7. Large outbound data transfer¶
Scenario: Detect hosts sending unusually large amounts of data externally.
SPL:
index=firewall sourcetype=pan:traffic direction=outbound
| stats sum(bytes_out) as total_bytes by src_ip
| eval total_mb = round(total_bytes/1024/1024, 2)
| where total_mb > 500
| sort -total_mb
KQL:
CommonSecurityLog
| where TimeGenerated > ago(1h)
| where CommunicationDirection == "Outbound"
| summarize total_bytes = sum(SentBytes) by SourceIP
| extend total_mb = round(total_bytes / 1048576.0, 2)
| where total_mb > 500
| sort by total_mb desc
8. DNS exfiltration detection¶
Scenario: Detect anomalous DNS query patterns that may indicate DNS tunneling.
SPL:
index=dns sourcetype=dns
| eval query_length=len(query)
| stats avg(query_length) as avg_len, max(query_length) as max_len,
count as query_count by src_ip
| where avg_len > 50 OR query_count > 1000
KQL:
DnsEvents
| where TimeGenerated > ago(1h)
| extend query_length = strlen(Name)
| summarize
avg_len = avg(query_length),
max_len = max(query_length),
query_count = count()
by ClientIP
| where avg_len > 50 or query_count > 1000
9. Unusual file sharing activity¶
Scenario: Detect users sharing an unusual number of files externally via SharePoint/OneDrive.
SPL:
index=o365 sourcetype=o365:management:activity Operation=SharingSet
| stats dc(ObjectId) as files_shared, values(ObjectId) as shared_files by UserId
| where files_shared > 20
KQL:
OfficeActivity
| where TimeGenerated > ago(24h)
| where Operation == "SharingSet"
| summarize
files_shared = dcount(OfficeObjectId),
shared_files = make_set(OfficeObjectId, 25)
by UserId
| where files_shared > 20
Privileged access¶
10. New admin account creation¶
Scenario: Detect creation of new accounts with administrative privileges.
SPL:
index=windows sourcetype=WinEventLog:Security EventCode=4720
| join user [search index=windows EventCode=4732 TargetSid="S-1-5-32-544"]
| table _time, SubjectUserName, user, ComputerName
KQL:
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID == 4720
| join kind=inner (
SecurityEvent
| where EventID == 4732
| where TargetSid == "S-1-5-32-544" // Administrators group
| project AddedAccount = MemberName, AddedTime = TimeGenerated, AddedBy = SubjectAccount
) on $left.TargetAccount == $right.AddedAccount
| project TimeGenerated, CreatedBy = SubjectAccount, NewAdmin = TargetAccount, Computer
11. Privilege escalation via group modification¶
Scenario: Detect users being added to high-privilege security groups.
SPL:
index=windows sourcetype=WinEventLog:Security EventCode=4728 OR EventCode=4732 OR EventCode=4756
| lookup privileged_groups TargetSid OUTPUT group_name, risk_level
| where isnotnull(risk_level)
| stats count by SubjectUserName, MemberName, group_name, risk_level
KQL:
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID in (4728, 4732, 4756)
| join kind=inner (
_GetWatchlist('PrivilegedGroups')
| project TargetSid = SID, group_name = GroupName, risk_level = RiskLevel
) on TargetSid
| summarize count() by SubjectAccount, MemberName, group_name, risk_level
12. Service account interactive logon¶
Scenario: Detect service accounts used for interactive (non-automated) logon.
SPL:
index=auth sourcetype=WinEventLog:Security EventCode=4624 Logon_Type=2 OR Logon_Type=10
| lookup service_accounts user
| where is_service_account="yes"
| stats count by user, src_ip, Logon_Type, ComputerName
KQL:
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID == 4624
| where LogonType in (2, 10) // Interactive, RemoteInteractive
| where TargetAccount in (
_GetWatchlist('ServiceAccounts') | project Account
)
| summarize count() by TargetAccount, IpAddress, LogonType, Computer
Malware and endpoint¶
13. Suspicious process execution¶
Scenario: Detect known-malicious process names or suspicious parent-child process relationships.
SPL:
index=sysmon sourcetype=XmlWinEventLog:Microsoft-Windows-Sysmon/Operational EventCode=1
| eval proc_lower=lower(Image)
| where match(proc_lower, "(mimikatz|psexec|procdump|lazagne|bloodhound)")
OR (ParentImage="*\\cmd.exe" AND Image="*\\powershell.exe")
KQL:
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| where FileName has_any ("mimikatz", "psexec", "procdump", "lazagne", "bloodhound")
or (InitiatingProcessFileName == "cmd.exe" and FileName == "powershell.exe")
| project TimeGenerated, DeviceName, AccountName, FileName,
ProcessCommandLine, InitiatingProcessFileName
Key differences:
- Sysmon data in Splunk often uses
XmlWinEventLogsourcetype; in Sentinel, useDeviceProcessEvents(Defender for Endpoint) orEvent(Sysmon via AMA) - SPL
match()with regex maps to KQLhas_any()for simple substring matching ormatches regexfor full regex
14. PowerShell encoded command detection¶
Scenario: Detect Base64-encoded PowerShell commands often used by malware.
SPL:
index=sysmon sourcetype=XmlWinEventLog EventCode=1
| where Image="*\\powershell.exe"
| where match(CommandLine, "-[eE]nc|-[eE]ncodedCommand")
| rex field=CommandLine "-[eE]nc(?:odedCommand)?\s+(?<encoded_cmd>\S+)"
| eval decoded=base64decode(encoded_cmd)
KQL:
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| where FileName =~ "powershell.exe"
| where ProcessCommandLine matches regex "-[eE]nc(odedCommand)?\\s+"
| extend encoded_cmd = extract("-[eE]nc(?:odedCommand)?\\s+(\\S+)", 1, ProcessCommandLine)
| extend decoded = base64_decode_tostring(encoded_cmd)
| project TimeGenerated, DeviceName, AccountName, ProcessCommandLine, decoded
Network and firewall¶
15. Port scan detection¶
Scenario: Detect hosts scanning multiple ports on a target.
SPL:
index=firewall sourcetype=pan:traffic action=denied
| stats dc(dest_port) as ports_scanned, count as connection_attempts
by src_ip, dest_ip
| where ports_scanned > 20
KQL:
CommonSecurityLog
| where TimeGenerated > ago(1h)
| where DeviceAction == "Deny"
| summarize
ports_scanned = dcount(DestinationPort),
connection_attempts = count()
by SourceIP, DestinationIP
| where ports_scanned > 20
16. Connection to known-bad IP (threat intelligence)¶
Scenario: Detect outbound connections to IP addresses on the threat intelligence list.
SPL:
index=firewall sourcetype=pan:traffic action=allowed direction=outbound
| lookup threat_intel_ip ip AS dest_ip OUTPUT threat_category, threat_source, confidence
| where isnotnull(threat_category)
| stats count, values(threat_category) as categories by src_ip, dest_ip, threat_source
KQL:
CommonSecurityLog
| where TimeGenerated > ago(1h)
| where DeviceAction == "Allow" and CommunicationDirection == "Outbound"
| join kind=inner (
ThreatIntelligenceIndicator
| where Active == true
| where ExpirationDateTime > now()
| where isnotempty(NetworkIP)
| project ThreatIP = NetworkIP, ThreatType, ConfidenceScore, SourceSystem
) on $left.DestinationIP == $right.ThreatIP
| summarize
count(),
ThreatTypes = make_set(ThreatType)
by SourceIP, DestinationIP, SourceSystem
17. DNS to suspicious TLD¶
Scenario: Detect DNS queries to suspicious top-level domains.
SPL:
index=dns sourcetype=dns
| rex field=query "\.(?<tld>[^.]+)$"
| where tld IN ("xyz", "top", "club", "work", "buzz", "tk", "ml", "ga", "cf", "gq")
| stats count, dc(query) as unique_queries by src_ip, tld
| where count > 10
KQL:
DnsEvents
| where TimeGenerated > ago(24h)
| extend tld = tostring(split(Name, ".")[-1])
| where tld in ("xyz", "top", "club", "work", "buzz", "tk", "ml", "ga", "cf", "gq")
| summarize
query_count = count(),
unique_queries = dcount(Name)
by ClientIP, tld
| where query_count > 10
Cloud and identity¶
18. Impossible travel detection¶
Scenario: Detect a user signing in from two geographically distant locations within a short time window.
SPL:
index=azure sourcetype=azure:aad:signin Status.errorCode=0
| stats earliest(_time) as first_login, latest(_time) as last_login,
values(Country) as countries, dc(Country) as country_count
by UserPrincipalName
| where country_count > 1
| eval time_diff_hours = round((last_login - first_login) / 3600, 2)
| where time_diff_hours < 2
KQL:
SigninLogs
| where TimeGenerated > ago(24h)
| where ResultType == 0
| summarize
first_login = min(TimeGenerated),
last_login = max(TimeGenerated),
countries = make_set(tostring(LocationDetails.countryOrRegion)),
country_count = dcount(tostring(LocationDetails.countryOrRegion))
by UserPrincipalName
| where country_count > 1
| extend time_diff_hours = round(datetime_diff('hour', last_login, first_login) * 1.0, 2)
| where time_diff_hours < 2
19. Azure resource deletion spree¶
Scenario: Detect a principal deleting multiple Azure resources in a short time.
SPL:
index=azure sourcetype=azure:activity operationName="*delete*"
| stats count, dc(resourceId) as resources_deleted,
values(resourceType) as resource_types
by caller
| where resources_deleted > 5
KQL:
AzureActivity
| where TimeGenerated > ago(1h)
| where OperationNameValue contains "delete"
| where ActivityStatusValue == "Success"
| summarize
count(),
resources_deleted = dcount(ResourceId),
resource_types = make_set(ResourceProviderValue)
by Caller
| where resources_deleted > 5
20. Conditional access policy bypass¶
Scenario: Detect sign-ins that bypassed conditional access policies.
SPL:
index=azure sourcetype=azure:aad:signin
| spath output=ca_results path=ConditionalAccessPolicies{}.result
| mvexpand ca_results
| where ca_results="notApplied" OR ca_results="reportOnlyNotApplied"
| stats count by UserPrincipalName, AppDisplayName, IPAddress
| where count > 5
KQL:
SigninLogs
| where TimeGenerated > ago(24h)
| mv-expand CAPolicy = ConditionalAccessPolicies
| where CAPolicy.result == "notApplied" or CAPolicy.result == "reportOnlyNotApplied"
| summarize count() by UserPrincipalName, AppDisplayName, IPAddress
| where count_ > 5
Advanced patterns¶
21. Time-based anomaly detection¶
Scenario: Detect logon activity outside normal business hours.
SPL:
index=auth sourcetype=WinEventLog:Security EventCode=4624 Logon_Type=10
| eval hour=strftime(_time, "%H")
| eval day_of_week=strftime(_time, "%u")
| where (hour < 6 OR hour > 22) OR day_of_week > 5
| stats count by user, src_ip, ComputerName
| where count > 3
KQL:
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID == 4624 and LogonType == 10
| extend hour = datetime_part("hour", TimeGenerated)
| extend day_of_week = dayofweek(TimeGenerated) / 1d // 0=Sun, 6=Sat
| where (hour < 6 or hour > 22) or day_of_week in (0, 6)
| summarize count() by TargetAccount, IpAddress, Computer
| where count_ > 3
22. Statistical outlier detection¶
Scenario: Detect data transfer volumes that are statistical outliers (more than 3 standard deviations from normal).
SPL:
index=proxy sourcetype=web_proxy
| stats sum(bytes_out) as total_bytes by user
| eventstats avg(total_bytes) as mean_bytes, stdev(total_bytes) as std_bytes
| eval zscore = (total_bytes - mean_bytes) / std_bytes
| where zscore > 3
KQL:
let baseline = CommonSecurityLog
| where TimeGenerated > ago(24h)
| summarize total_bytes = sum(SentBytes) by SourceUserName
| summarize mean_bytes = avg(total_bytes), std_bytes = stdev(total_bytes);
CommonSecurityLog
| where TimeGenerated > ago(24h)
| summarize total_bytes = sum(SentBytes) by SourceUserName
| extend zscore = (total_bytes - toscalar(baseline | project mean_bytes))
/ toscalar(baseline | project std_bytes)
| where zscore > 3
23. Multi-stage attack correlation¶
Scenario: Correlate reconnaissance (port scan), initial access (brute force), and lateral movement (RDP) from the same source IP.
SPL:
index=firewall action=denied
| stats dc(dest_port) as scan_ports by src_ip
| where scan_ports > 20
| join src_ip [
search index=auth action=failure
| stats count as brute_attempts by src_ip
| where brute_attempts > 50
]
| join src_ip [
search index=firewall dest_port=3389 action=allowed
| stats dc(dest_ip) as rdp_targets by src_ip
| where rdp_targets > 3
]
| table src_ip, scan_ports, brute_attempts, rdp_targets
KQL:
let reconnaissance = CommonSecurityLog
| where TimeGenerated > ago(4h)
| where DeviceAction == "Deny"
| summarize scan_ports = dcount(DestinationPort) by SourceIP
| where scan_ports > 20;
let brute_force = SecurityEvent
| where TimeGenerated > ago(4h)
| where EventID == 4625
| summarize brute_attempts = count() by IpAddress
| where brute_attempts > 50
| project SourceIP = IpAddress, brute_attempts;
let lateral_movement = CommonSecurityLog
| where TimeGenerated > ago(4h)
| where DestinationPort == 3389 and DeviceAction == "Allow"
| summarize rdp_targets = dcount(DestinationIP) by SourceIP
| where rdp_targets > 3;
reconnaissance
| join kind=inner brute_force on SourceIP
| join kind=inner lateral_movement on SourceIP
| project SourceIP, scan_ports, brute_attempts, rdp_targets
Quick reference card¶
| SPL concept | KQL equivalent | Example |
|---|---|---|
index=X | Table name | SecurityEvent |
sourcetype=Y | Table name or where filter | SigninLogs |
earliest=-1h | where TimeGenerated > ago(1h) | Always add time filter first |
\| stats count by field | \| summarize count() by field | Core aggregation |
\| eval x=if(a,b,c) | \| extend x=iff(a,b,c) | Conditional logic |
\| rex "(?<f>regex)" | \| extend f=extract("regex",1,field) | Regex extraction |
\| lookup table key | \| join (_GetWatchlist('name')) | Reference data lookup |
\| dedup field | \| summarize arg_max(TimeGenerated,*) by field | Deduplication |
\| transaction id | \| summarize make_list() by id | Session grouping |
isnotnull(x) | isnotempty(x) | Null checking |
\| timechart span=1h | \| summarize by bin(TimeGenerated,1h) | Time bucketing |
Next steps:
- Detection Rules Migration -- complete conversion pattern reference
- Tutorial: SIEM Migration Experience -- automated conversion tool
- Feature Mapping -- full feature comparison
Maintainers: csa-inabox core team Last updated: 2026-04-30