I use Splunk to report on business objects more than on typical security operations data: helpdesk tickets, for instance, rather than firewall logs. I have written Python scripts to import these business objects from a variety of REST and SQL sources, and I want those imports to be idempotent. That is, I want to import helpdesk tickets every day, but no more than once per day, regardless of how many times the import script is called, and data from more recent imports should supersede data from older imports. I considered several approaches; here is the solution I settled on.
Data Input
The data is normally imported once per day, invoked by cron or Windows Task Scheduler. In Splunk, I have an alert scheduled to run every hour that fires when a duplicate event is found. I have not yet automated the deletion of these duplicate events, but I plan to once I am confident the process is working the way I want it to.
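For context, the daily invocation is just an ordinary scheduled job. A cron entry for it might look like the following, where the script path and run time are placeholders of mine rather than the real setup:

0 6 * * * /usr/bin/python3 /opt/splunk-imports/import_helpdesk_tickets.py

The Windows Task Scheduler equivalent, again with placeholder names:

schtasks /Create /TN "Import Helpdesk Tickets" /TR "python C:\scripts\import_helpdesk_tickets.py" /SC DAILY /ST 06:00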
What is a duplicate event?
In my situation, a duplicate event is one where the same primary key from the source system is imported more than once in a given day:
index=foo | bin span=1d _time | stats count by _time primary_key | where count > 1
How do I find the oldest duplicate event?
index=foo | eval eid=_indextime | search [search index=foo | bin span=1d _time | stats min(_indextime) as eid count by _time sys_id | where count > 1 | fields eid]
Here sys_id is the primary-key field in this particular data source, playing the same role as primary_key above. The subsearch finds, for each key imported more than once in a day, the index time of the oldest copy; the outer search then matches only those original events, so the more recent import is the one that survives.
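Since this query backs the hourly alert mentioned above, here is a rough sketch of how that scheduled alert could be defined in savedsearches.conf. The stanza name, dispatch window, and email address are placeholders of mine, not the actual configuration:

[Duplicate helpdesk events]
search = index=foo | eval eid=_indextime | search [search index=foo | bin span=1d _time | stats min(_indextime) as eid count by _time sys_id | where count > 1 | fields eid]
dispatch.earliest_time = -1d@d
dispatch.latest_time = now
enableSched = 1
cron_schedule = 0 * * * *
counttype = number of events
relation = greater than
quantity = 0
action.email = 1
action.email.to = splunk-admin@example.com

The alert fires whenever the search returns at least one event, i.e. whenever a duplicate exists in the search window.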
This alert notifies me that duplicates exist. When I am ready to delete the duplicate events, I append “| delete” to the end of the query and re-run it from an account that has delete permission.
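With “| delete” appended, the full query becomes:

index=foo | eval eid=_indextime | search [search index=foo | bin span=1d _time | stats min(_indextime) as eid count by _time sys_id | where count > 1 | fields eid] | delete

Note that the delete command requires the delete_by_keyword capability (normally granted via the can_delete role), and it only hides the matching events from future searches; it does not reclaim disk space.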
Huge thanks to drippler on Splunk Answers for pointing me in the right direction!