Add postgresql storage backend #667
Conversation
- Add parsedmarc/postgres.py with PostgreSQLClient class:
- __init__: accepts connection_string or individual host/port/user/password/database params
- create_tables(): idempotent DDL for all 10 normalized tables
(dmarc_aggregate_report, dmarc_aggregate_record + dkim/spf/policy_override,
dmarc_forensic_report + sample_address,
smtp_tls_report + smtp_tls_policy + smtp_tls_failure_detail)
- save_aggregate_report_to_postgresql()
- save_forensic_report_to_postgresql()
- save_smtp_tls_report_to_postgresql()
- PostgreSQLError and AlreadySaved exception classes
- Duplicate detection via INSERT ... ON CONFLICT DO NOTHING (see the sketch after this list)
- Wire CLI (parsedmarc/cli.py):
- Add postgresql_* opts to Namespace defaults
- Parse [postgresql] INI config section
- Initialize PostgreSQLClient and call create_tables() at startup
- Call save_*_to_postgresql() in process_reports() for all three report types
- Add psycopg[binary]>=3.1.0 to pyproject.toml dependencies
- Document [postgresql] config section in docs/source/usage.md
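As a rough sketch of how the ON CONFLICT-based duplicate detection works (simplified column set; assumes an open psycopg connection and a UNIQUE constraint on report_id from the DDL; not the PR's exact statement):

```python
import psycopg

from parsedmarc.postgres import AlreadySaved


def insert_aggregate_report(
    conn: psycopg.Connection, report_id: str, org_name: str
) -> int:
    # Idempotent insert: a duplicate report_id hits the UNIQUE
    # constraint and the row is silently skipped instead of erroring.
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO dmarc_aggregate_report (report_id, org_name)
            VALUES (%s, %s)
            ON CONFLICT (report_id) DO NOTHING
            RETURNING id
            """,
            (report_id, org_name),
        )
        row = cur.fetchone()
    if row is None:
        # RETURNING produced no row, so the insert was skipped:
        # this report was already saved.
        raise AlreadySaved(f"aggregate report {report_id} already saved")
    return row[0]
```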
Pull request overview
This PR adds a PostgreSQL storage backend to parsedmarc as an alternative to Elasticsearch/OpenSearch. It includes the database client module, CLI integration, documentation, and a Grafana dashboard for PostgreSQL.
Changes:
- New parsedmarc/postgres.py module: PostgreSQLClient class with methods to save aggregate, forensic, and SMTP TLS reports to PostgreSQL with an auto-created schema
- parsedmarc/cli.py: Wires the new PostgreSQL client into the existing report-processing pipeline and config file parsing
- docs/source/usage.md: Documents the new [postgresql] config section
- grafana/Grafana-DMARC_Reports-PostgreSQL.json: New Grafana dashboard targeting the PostgreSQL schema
- pyproject.toml: Adds psycopg[binary]>=3.1.0 as a mandatory dependency
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| parsedmarc/postgres.py | New client module with DDL and INSERT logic for all three report types |
| parsedmarc/cli.py | Integration of PostgreSQL client into the CLI config and report dispatch |
| docs/source/usage.md | Documentation for the new [postgresql] config section |
| grafana/Grafana-DMARC_Reports-PostgreSQL.json | New Grafana dashboard for the PostgreSQL schema |
| pyproject.toml | Adds psycopg[binary] as a required dependency |
```python
policy.get("domain"),
policy.get("policy", {}).get("policy_type"),
policy.get("policy", {}).get("policy_string") or [],
policy.get("policy", {}).get("mx_host") or [],
policy.get("summary", {}).get(
    "total_successful_session_count"
),
policy.get("summary", {}).get(
    "total_failure_session_count"
),
```
The field mapping for the SMTP TLS policy insert is incorrect. The parsed policy dictionary (produced by _parse_smtp_tls_report_policy() in parsedmarc/__init__.py) uses flat keys, not the nested raw JSON structure. Specifically:
- `policy.get("domain")` should be `policy.get("policy_domain")`
- `policy.get("policy", {}).get("policy_type")` should be `policy.get("policy_type")`
- `policy.get("policy", {}).get("policy_string") or []` should be `policy.get("policy_strings") or []`
- `policy.get("policy", {}).get("mx_host") or []` should be `policy.get("mx_host_patterns") or []`
- `policy.get("summary", {}).get("total_successful_session_count")` should be `policy.get("successful_session_count")`
- `policy.get("summary", {}).get("total_failure_session_count")` should be `policy.get("failed_session_count")`
All these fields will be stored as NULL because the keys don't exist in the parsed dict.
Suggested change:
```diff
-policy.get("domain"),
-policy.get("policy", {}).get("policy_type"),
-policy.get("policy", {}).get("policy_string") or [],
-policy.get("policy", {}).get("mx_host") or [],
-policy.get("summary", {}).get(
-    "total_successful_session_count"
-),
-policy.get("summary", {}).get(
-    "total_failure_session_count"
-),
+policy.get("policy_domain"),
+policy.get("policy_type"),
+policy.get("policy_strings") or [],
+policy.get("mx_host_patterns") or [],
+policy.get("successful_session_count"),
+policy.get("failed_session_count"),
```
```sql
receiving_mx_hostname TEXT,
receiving_mx_helo TEXT,
additional_info_uri TEXT,
failure_reason_code TEXT,
```
The ip_address column in the smtp_tls_failure_detail table definition has no corresponding field in the parsed SMTP TLS failure detail dictionaries. The parser in parsedmarc/__init__.py only produces sending_mta_ip and receiving_ip fields for failure details — there is no ip_address field. As a result, detail.get("ip_address") will always return None and the column will always be empty. This column should either be removed from the schema and INSERT statement, or documented to explain where the value comes from.
Suggested change:
```diff
 failure_reason_code TEXT,
+-- NOTE: ip_address is currently not populated by the parser.
+-- It is retained for backward/forward compatibility and will
+-- typically remain NULL; use sending_mta_ip/receiving_ip instead.
```
```python
    ip_address
) VALUES (
    %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
""",
(
    policy_db_id,
    detail.get("result_type"),
    detail.get("failed_session_count"),
    detail.get("sending_mta_ip"),
    detail.get("receiving_ip"),
    detail.get("receiving_mx_hostname"),
    detail.get("receiving_mx_helo"),
    detail.get("additional_info_uri"),
    detail.get("failure_reason_code"),
    detail.get("ip_address"),
```
The ip_address insert parameter will always be None because detail.get("ip_address") has no corresponding key in the parsed SMTP TLS failure detail dictionary. This is consistent with the spurious column in the DDL (see the smtp_tls_failure_detail table schema).
```python
if connection_string:
    conninfo = connection_string
else:
    parts: list[str] = []
    if host:
        parts.append(f"host={host}")
    if port:
        parts.append(f"port={port}")
    if user:
        parts.append(f"user={user}")
    if password:
        parts.append(f"password={password}")
    if database:
        parts.append(f"dbname={database}")
    conninfo = " ".join(parts)

logger.debug("Connecting to PostgreSQL")
try:
    self._conn: psycopg.Connection = psycopg.connect(conninfo)
```
When building the libpq keyword-value connection string manually, special characters in password, user, host, or database values are not escaped. The libpq format requires that values containing spaces, single quotes, or backslashes be quoted or escaped (e.g., `password='p@ss w0rd'` with backslash-escaped quotes inside). A value like `my password` would produce `password=my password`, which is malformed and will fail to connect.
Instead of building a keyword-value connection string, psycopg.connect() in psycopg 3 accepts the individual parameters directly as keyword arguments (host, port, user, password, dbname), which avoids any escaping concerns entirely.
Suggested change:
```diff
-if connection_string:
-    conninfo = connection_string
-else:
-    parts: list[str] = []
-    if host:
-        parts.append(f"host={host}")
-    if port:
-        parts.append(f"port={port}")
-    if user:
-        parts.append(f"user={user}")
-    if password:
-        parts.append(f"password={password}")
-    if database:
-        parts.append(f"dbname={database}")
-    conninfo = " ".join(parts)
-logger.debug("Connecting to PostgreSQL")
-try:
-    self._conn: psycopg.Connection = psycopg.connect(conninfo)
+logger.debug("Connecting to PostgreSQL")
+try:
+    if connection_string:
+        # Use the provided libpq connection string or URI directly.
+        self._conn: psycopg.Connection = psycopg.connect(connection_string)
+    else:
+        # Pass individual parameters as keyword arguments to avoid
+        # manual conninfo construction and escaping issues.
+        self._conn = psycopg.connect(
+            host=host,
+            port=port,
+            user=user,
+            password=password,
+            dbname=database,
+        )
```
```toml
"tqdm>=4.31.1",
"urllib3>=1.25.7",
"xmltodict>=0.12.0",
"psycopg[binary]>=3.1.0",
```
The psycopg[binary] dependency is added to the mandatory dependencies list, which means it is now a required dependency for all users of parsedmarc, even those who never use PostgreSQL. This will pull in a binary package (psycopg-binary) for every installation. Other backends like kafka-python-ng, boto3 (S3), pygelf, and opensearch-py are already required dependencies in the same way, so this is consistent with the existing pattern. However, if the project wanted to keep PostgreSQL optional (similar to an extras_require pattern), this could be moved to an optional extra. Consider discussing with maintainers whether this should be optional.
I would rather see this as optional, because these are binaries that might not exist for every platform.
```python
    Raises:
        PostgreSQLError: If a database error occurs.
    """
    sample = report.get("sample", {}) or {}
```
This is a critical data mapping bug. report["sample"] is a raw RFC 822 email string (as assigned in parsedmarc/__init__.py at parsed_report["sample"] = sample), not a dict. The expression report.get("sample", {}) or {} will return the non-empty string when one is present, and then sample.get("date"), sample.get("subject"), sample.get("body"), etc. on lines 505-516 will raise AttributeError: 'str' object has no attribute 'get'.
The parsed sample data (as a structured dict containing date, subject, body, headers, from, to, etc.) lives in report["parsed_sample"], not report["sample"]. This variable should be sample = report.get("parsed_sample", {}) or {}.
Suggested change:
```diff
-sample = report.get("sample", {}) or {}
+sample = report.get("parsed_sample", {}) or {}
```
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@fsvm88 Please look at the review.
The parsed policy dictionary returned by _parse_smtp_tls_report_policy()
uses flat keys (policy_domain, policy_type, policy_strings,
mx_host_patterns, successful_session_count, failed_session_count),
not the nested raw JSON structure. The previous code used nested
lookups like policy.get('policy', {}).get('policy_type') which
would always resolve to None, storing NULL for all policy fields.
…e_detail The parser (_parse_smtp_tls_failure_details) never produces an 'ip_address' key — it only provides 'sending_mta_ip' and 'receiving_ip'. The column was always NULL. Removed from both the DDL and the INSERT statement.
…fo string The previous code built a libpq keyword-value connection string by concatenating f-strings, which does not escape special characters in values (e.g. spaces or quotes in passwords). psycopg.connect() natively accepts host, port, user, password, and dbname as keyword arguments, which avoids escaping issues entirely.
…t data
report['sample'] contains the raw RFC 822 email string, not a dict.
Calling .get('date'), .get('subject'), etc. on a string raises
AttributeError. The structured dict with date, subject, body,
headers, from, to, etc. lives in report['parsed_sample'].
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
```python
record.get("begin_date"),
record.get("end_date"),
```
The save_aggregate_report_to_postgresql method reads record.get("begin_date") and record.get("end_date") to populate the interval_begin and interval_end columns, but the parsed record dictionary uses "interval_begin" and "interval_end" as field names (set by _append_parsed_record in parsedmarc/__init__.py, lines 272-273 and 289-290). Since neither "begin_date" nor "end_date" exists in individual records (those keys belong to the report metadata level), these will always be None, causing the interval_begin and interval_end columns to always be empty.
Suggested change:
```diff
-record.get("begin_date"),
-record.get("end_date"),
+record.get("interval_begin"),
+record.get("interval_end"),
```
```python
try:
    with self._conn.transaction():
        with self._conn.cursor() as cur:
            for stmt in ddl_statements:
                cur.execute(stmt)
    logger.debug("PostgreSQL tables verified / created")
except psycopg.Error as exc:
    raise PostgreSQLError(str(exc)) from exc
```
The create_tables method only creates the tables but no indexes beyond the primary keys and the UNIQUE constraints (which create implicit indexes). The Grafana dashboard queries repeatedly filter by rpt.begin_date (e.g., WHERE $__timeFilter(rpt.begin_date)) and join on r.report_id and r.header_from. As data grows, queries will become slow without indexes on dmarc_aggregate_report.begin_date, dmarc_aggregate_record.report_id, and dmarc_aggregate_record.header_from. Consider adding CREATE INDEX IF NOT EXISTS statements for these columns in create_tables().
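As a sketch of what that could look like if appended to the DDL run by create_tables() (the index names here are illustrative, not taken from the PR):

```python
# Hypothetical index DDL covering the columns the Grafana dashboard
# filters and joins on; the index names are illustrative.
INDEX_STATEMENTS = [
    "CREATE INDEX IF NOT EXISTS idx_aggregate_report_begin_date "
    "ON dmarc_aggregate_report (begin_date)",
    "CREATE INDEX IF NOT EXISTS idx_aggregate_record_report_id "
    "ON dmarc_aggregate_record (report_id)",
    "CREATE INDEX IF NOT EXISTS idx_aggregate_record_header_from "
    "ON dmarc_aggregate_record (header_from)",
]


def create_indexes(conn) -> None:
    # CREATE INDEX IF NOT EXISTS is idempotent, so this can run on
    # every startup right after the table DDL.
    with conn.transaction():
        with conn.cursor() as cur:
            for stmt in INDEX_STATEMENTS:
                cur.execute(stmt)
```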
…imestamps
The per-record INSERT used record.get("begin_date") and
record.get("end_date"), but the parser stores these fields as
"interval_begin" and "interval_end" on each record. The keys
begin_date/end_date only exist in report_metadata. This caused the
interval_begin and interval_end columns to always be NULL.
The [postgresql] configuration section did not mention that the [general] save_aggregate, save_forensic, and save_smtp_tls flags must be set to True for data to actually be written to PostgreSQL. Add an explanatory note after the configuration examples.
The Grafana dashboard queries repeatedly filter on dmarc_aggregate_report.begin_date, join on dmarc_aggregate_record.report_id, and filter on dmarc_aggregate_record.header_from. Without indexes these queries will degrade to sequential scans as data grows. Add CREATE INDEX IF NOT EXISTS statements for:
- dmarc_aggregate_report(begin_date)
- dmarc_aggregate_record(report_id)
- dmarc_aggregate_record(header_from)
- dmarc_forensic_report(arrival_date_utc)
- smtp_tls_report(begin_date)
- smtp_tls_policy(report_id)
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
```sql
begin_date TIMESTAMPTZ NOT NULL,
end_date TIMESTAMPTZ NOT NULL,
```
The begin_date and end_date values stored in the dmarc_aggregate_report table come from meta.get("begin_date") which is a string produced by timestamp_to_human(). This function uses datetime.fromtimestamp() (local time, not UTC), so the string does not include timezone information. When PostgreSQL inserts this into a TIMESTAMPTZ NOT NULL column, it interprets the string using the current PostgreSQL session timezone rather than UTC.
For deployments where the PostgreSQL session timezone is not set to UTC, the stored timestamps will be incorrect. The interval_begin/interval_end fields in dmarc_aggregate_record have the same issue.
To fix this, the UTC-aware begin_dt datetime objects (already computed in the parser for UTC) should be used, or the timezone-aware string format "YYYY-MM-DD HH:MM:SS+00" should be generated. Alternatively, the column type could be changed to TEXT to match the forensic report approach, though TIMESTAMPTZ is the correct semantic type here.
Suggested change:
```diff
-begin_date TIMESTAMPTZ NOT NULL,
-end_date TIMESTAMPTZ NOT NULL,
+begin_date TEXT NOT NULL,
+end_date TEXT NOT NULL,
```
```sql
arrival_date TEXT,
arrival_date_utc TEXT,
```
The arrival_date_utc and arrival_date columns in dmarc_forensic_report are declared as TEXT, but the Grafana dashboard queries them with ::TIMESTAMPTZ casts (e.g., f.arrival_date_utc::TIMESTAMPTZ BETWEEN $__timeFrom() AND $__timeTo()). The index idx_forensic_report_arrival_date is created on the TEXT column, but this index will not be utilized when the column is used with a TIMESTAMPTZ cast in a WHERE clause.
Declaring these columns as TIMESTAMPTZ would improve query performance and type safety. The arrival_date_utc value from the parser is already a UTC string in %Y-%m-%d %H:%M:%S format and would insert cleanly into a TIMESTAMPTZ column if the UTC offset is appended (e.g., by using +00 suffix).
Suggested change:
```diff
-arrival_date TEXT,
-arrival_date_utc TEXT,
+arrival_date TIMESTAMPTZ,
+arrival_date_utc TIMESTAMPTZ,
```
```python
logger.debug("Connecting to PostgreSQL")
try:
    if connection_string:
        # Use the provided libpq connection string or URI directly.
        self._conn: psycopg.Connection = psycopg.connect(
            connection_string
        )
    else:
        # Pass individual parameters as keyword arguments to avoid
        # manual conninfo construction and escaping issues.
        self._conn = psycopg.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            dbname=database,
        )
    self._conn.autocommit = False
except psycopg.Error as exc:
    raise PostgreSQLError(str(exc)) from exc
```
The PostgreSQLClient holds a single persistent connection (self._conn) for the lifetime of the process. When parsedmarc runs in watch mode (mailbox_watch = true), the process runs indefinitely and calls process_reports repeatedly. If the PostgreSQL server drops the connection (e.g., due to a server restart, idle_in_transaction_session_timeout, tcp_keepalives_idle expiry, or idle connection reaping), all subsequent save_* calls will raise a psycopg.OperationalError, which gets wrapped as PostgreSQLError and logged — but the client is never reconnected. The tool will silently stop persisting reports to PostgreSQL without terminating.
Consider adding connection health checking or reconnection logic (e.g., checking self._conn.closed before use and reconnecting if needed), or using a connection pool.
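A minimal sketch of that reconnection pattern, assuming the connection parameters are stored at __init__ time in a hypothetical self._conninfo_kwargs (the PR's later commits implement something similar via _connect()/_ensure_connected()):

```python
import logging

import psycopg

logger = logging.getLogger("parsedmarc")


class PostgreSQLClient:
    def __init__(self, **conninfo_kwargs):
        # Hypothetical: keep the parameters so we can reconnect later.
        self._conninfo_kwargs = conninfo_kwargs
        self._conn = None
        self._connect()

    def _connect(self) -> None:
        self._conn = psycopg.connect(**self._conninfo_kwargs)
        self._conn.autocommit = False

    def _ensure_connected(self) -> None:
        # Called at the top of every save_* method; transparently
        # re-establishes the connection if the server dropped it.
        if self._conn is None or self._conn.closed:
            logger.debug("PostgreSQL connection lost; reconnecting")
            self._connect()
```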
```json
        ]
      }
    ],
    "refresh": "10s",
```
The dashboard default refresh interval is set to "10s" (10 seconds). For a DMARC reporting dashboard where data changes infrequently (reports arrive via email at most a few times a day), a 10-second auto-refresh creates unnecessary polling load on the PostgreSQL server. Consider using "1m" or "5m" as a more appropriate default, or set "refresh": "" to disable auto-refresh by default and let users enable it manually.
Suggested change:
```diff
-"refresh": "10s",
+"refresh": "1m",
```
…d timezone ambiguity timestamp_to_human() produces timezone-naive local-time strings (e.g. '2024-01-15 09:30:00') without any UTC offset indicator. Storing these in TIMESTAMPTZ columns causes PostgreSQL to interpret them using the session timezone rather than UTC, resulting in incorrect timestamps when the session timezone differs. Change begin_date, end_date, interval_begin and interval_end in aggregate report/record tables, and begin_date/end_date in the smtp_tls_report table, from TIMESTAMPTZ to TEXT to match the actual data format produced by the parser.
The Grafana dashboard queries cast arrival_date_utc with ::TIMESTAMPTZ (e.g. f.arrival_date_utc::TIMESTAMPTZ BETWEEN ...). When the column is TEXT, the index on arrival_date_utc cannot be utilised for these casts. Change arrival_date and arrival_date_utc from TEXT to TIMESTAMPTZ for proper type safety and index usage. Add _ensure_utc_suffix() helper to append '+00' to the parser's UTC strings so PostgreSQL interprets them correctly.
When parsedmarc runs in watch mode (mailbox_watch = true), the process can stay alive indefinitely. If PostgreSQL drops the connection (server restart, idle timeout, TCP keep-alive expiry), all subsequent save_* calls would fail silently. Store connection parameters at init time and add _connect() / _ensure_connected() methods. Each public method now calls _ensure_connected() before using the connection, transparently re-establishing it if the previous connection was lost.
DMARC reports arrive infrequently (at most a few times per day), so a 10-second auto-refresh creates unnecessary polling load on PostgreSQL. A 1-minute interval is a much more appropriate default.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
…amps and contact_info before insert
- Change begin_date/end_date columns from TEXT to TIMESTAMPTZ in dmarc_aggregate_report, dmarc_aggregate_record (interval_begin/end), and smtp_tls_report so Grafana $__timeFilter/$__timeGroup macros work correctly.
- Add _naive_local_to_timestamptz() helper for aggregate report dates produced by timestamp_to_human() (local-time, no offset). Re-parses and attaches the local timezone offset for unambiguous TIMESTAMPTZ inserts.
- Wrap record-level interval_begin/interval_end (already UTC) and SMTP TLS begin_date/end_date (ISO 8601 with Z) with _ensure_utc_suffix() as a safety net.
- Improve _ensure_utc_suffix() negative-offset detection to only check after position 10 in the string, avoiding false positives from date separator hyphens in YYYY-MM-DD.
- Add _normalize_arrival_date() for forensic arrival_date: parses RFC 2822 or ISO 8601 via human_timestamp_to_datetime, returns clean UTC string with +00 offset.
- Add _contact_info_to_text() to handle Union[str, List[str]] from TLS-RPT contact-info field; joins list values with comma separator to prevent psycopg type error on TEXT column insert.
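Based on those descriptions, the normalization helpers might look roughly like this (a sketch inferred from the commit messages, not the PR's exact code):

```python
from typing import List, Optional, Union


def _ensure_utc_suffix(value: Optional[str]) -> Optional[str]:
    # Sketch: append "+00" to naive UTC strings so PostgreSQL reads
    # them as UTC regardless of the session timezone.
    if not value:
        return value
    v = value.strip()
    if v.endswith("Z"):
        return v[:-1] + "+00"
    # Only look for an explicit offset after position 10, so the
    # hyphens in the YYYY-MM-DD date part don't count as offsets.
    if "+" in v[10:] or "-" in v[10:]:
        return v
    return v + "+00"


def _contact_info_to_text(value: Union[str, List[str], None]) -> Optional[str]:
    # Sketch: TLS-RPT contact-info may be a string or a list of
    # strings; normalize to a single TEXT value for the insert.
    if value is None:
        return None
    if isinstance(value, list):
        return ", ".join(str(item) for item in value)
    return str(value)
```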
Add 33 tests covering the PostgreSQL backend:
- TestPostgreSQLHelpers (16 tests): Unit tests for pure helper functions
  - _ensure_utc_suffix: None, empty, naive UTC, already-has-offset, Z suffix
  - _naive_local_to_timestamptz: None, empty, valid conversion, bad format
  - _normalize_arrival_date: None, empty, ISO, RFC 2822, already UTC, unparseable
  - _contact_info_to_text: None, string, list, empty list, numeric
- TestPostgreSQLClientSave (9 tests): Mock-based integration tests
  - Aggregate: INSERT calls, AlreadySaved on conflict, timestamp normalization
  - Forensic: INSERT calls with sample addresses
  - SMTP TLS: INSERT calls with policy+failure, AlreadySaved, contact_info list
  - Reconnection: _ensure_connected reconnects on closed connection
- TestPostgreSQLWithSamples (3 tests): End-to-end with real sample data
  - Parses real aggregate/forensic/smtp_tls samples from samples/ directory
  - Feeds parsed data through mock save methods to verify no type errors
  - Validates that real parser output is compatible with the PostgreSQL backend

All tests use unittest.mock to avoid requiring a live PostgreSQL instance.
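For illustration, a couple of the helper unit tests described above might look like this (a sketch; the exact assertions in tests.py may differ):

```python
import unittest

from parsedmarc.postgres import _ensure_utc_suffix


class TestPostgreSQLHelpers(unittest.TestCase):
    def test_ensure_utc_suffix_none(self):
        # None passes through unchanged.
        self.assertIsNone(_ensure_utc_suffix(None))

    def test_ensure_utc_suffix_naive_utc(self):
        # A naive UTC timestamp string gains an explicit +00 offset.
        self.assertEqual(
            _ensure_utc_suffix("2024-01-15 09:30:00"),
            "2024-01-15 09:30:00+00",
        )


if __name__ == "__main__":
    unittest.main()
```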
I have re-checked all column types and added tests too (manually checked, making sense, and passing). I'll build another docker image and test it for a week or so, but I think most of the work is done.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)

tests.py:173
- The `call` name is imported from `unittest.mock` but is never used anywhere in the test file. This is an unused import that will cause a ruff `F401` lint failure in CI (`ruff check .`).

```python
sample_paths = glob("samples/aggregate/*")
```
```python
def save_forensic_report_to_postgresql(self, report: dict) -> None:
    """Saves a parsed forensic (RUF) DMARC report to PostgreSQL.

    Args:
        report: A parsed forensic report dictionary as returned by
            :func:`parsedmarc.parse_report_file`.

    Raises:
        PostgreSQLError: If a database error occurs.
    """
    self._ensure_connected()
    sample = report.get("parsed_sample", {}) or {}
    src = report.get("source", {}) or {}

    try:
        with self._conn.transaction():
            with self._conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO dmarc_forensic_report (
                        feedback_type, user_agent, version,
                        original_envelope_id, original_mail_from,
                        original_rcpt_to, arrival_date, arrival_date_utc,
                        authentication_results, delivery_result,
                        auth_failure, authentication_mechanisms,
                        dkim_domain, reported_domain, sample_headers_only,
                        source_ip_address, source_country,
                        source_reverse_dns, source_base_domain,
                        source_name, source_type,
                        sample, sample_date, sample_subject,
                        sample_body, sample_has_defects,
                        sample_headers, sample_from, sample_to
                    ) VALUES (
                        %s, %s, %s,
                        %s, %s,
                        %s, %s, %s,
                        %s, %s,
                        %s, %s,
                        %s, %s, %s,
                        %s, %s,
                        %s, %s,
                        %s, %s,
                        %s, %s, %s,
                        %s, %s,
                        %s, %s, %s
                    )
                    RETURNING id
                    """,
                    (
                        report.get("feedback_type"),
                        report.get("user_agent"),
                        report.get("version"),
                        report.get("original_envelope_id"),
                        report.get("original_mail_from"),
                        report.get("original_rcpt_to"),
                        _normalize_arrival_date(report.get("arrival_date")),
                        _ensure_utc_suffix(report.get("arrival_date_utc")),
                        report.get("authentication_results"),
                        report.get("delivery_result"),
                        report.get("auth_failure") or [],
                        report.get("authentication_mechanisms") or [],
                        report.get("dkim_domain"),
                        report.get("reported_domain"),
                        report.get("sample_headers_only"),
                        src.get("ip_address"),
                        src.get("country"),
                        src.get("reverse_dns"),
                        src.get("base_domain"),
                        src.get("name"),
                        src.get("type"),
                        report.get("sample"),
                        sample.get("date"),
                        sample.get("subject"),
                        sample.get("body"),
                        sample.get("has_defects"),
                        psycopg_types.json.Jsonb(sample.get("headers"))
                        if sample.get("headers")
                        else None,
                        psycopg_types.json.Jsonb(sample.get("from"))
                        if sample.get("from")
                        else None,
                        psycopg_types.json.Jsonb(sample.get("to"))
                        if sample.get("to")
                        else None,
                    ),
                )
                report_db_id: int = cur.fetchone()[0]

                for addr_type in ("to", "cc", "bcc", "reply_to"):
                    entries = sample.get(addr_type) or []
                    if isinstance(entries, dict):
                        entries = [entries]
                    for entry in entries:
                        cur.execute(
                            """
                            INSERT INTO dmarc_forensic_sample_address
                            (report_id, address_type,
                             display_name, address)
                            VALUES (%s, %s, %s, %s)
                            """,
                            (
                                report_db_id,
                                addr_type,
                                entry.get("display_name"),
                                entry.get("address"),
                            ),
                        )

    except psycopg.Error as exc:
        raise PostgreSQLError(str(exc)) from exc
```
The dmarc_forensic_report table has no uniqueness constraint and save_forensic_report_to_postgresql performs no duplicate detection before inserting. Unlike the aggregate and SMTP TLS reports (which use ON CONFLICT DO NOTHING), forensic reports can be inserted multiple times if the same email is reprocessed (e.g., in watch mode if an email fails to archive). The other backends (Elasticsearch, OpenSearch) perform duplicate detection by querying for an existing report with the same arrival date, sender, and subject before inserting. Consider adding a similar check or a uniqueness constraint.
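A sketch of such a pre-insert check, mirroring the Elasticsearch backend's match on arrival date, sender, and subject (column names taken from the DDL above; the choice of matching fields is an assumption, not the PR's code):

```python
def _forensic_report_exists(cur, report: dict, sample: dict) -> bool:
    # Hypothetical duplicate check run inside the same transaction,
    # before the INSERT INTO dmarc_forensic_report statement.
    cur.execute(
        """
        SELECT EXISTS (
            SELECT 1 FROM dmarc_forensic_report
            WHERE arrival_date_utc = %s
              AND original_mail_from = %s
              AND sample_subject = %s
        )
        """,
        (
            # Normalized the same way as at insert time.
            _ensure_utc_suffix(report.get("arrival_date_utc")),
            report.get("original_mail_from"),
            sample.get("subject"),
        ),
    )
    return cur.fetchone()[0]
```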
No problem, makes sense, I'm not in a rush 👍 I'll keep testing and cleaning up whatever I run into
Hi, I've been using the project for a while, and in my 15 years of IT career (systems/cloud engineer), I always struggled with ElasticSearch. I feel it's clunky and heavy, I don't understand the architecture, and every attempt at learning it pushed me away from it (pretty much like Kafka). It takes ~5m to start a single-node cluster on fast HDDs (so I have to work around it by making the parsedmarc container wait 5m before attempting to connect), and I always have to search for the right command-line incantations and env variables to make it actually work. I'm unable to perform basic or periodic maintenance because I just can't understand it (and the curl interface doesn't really help).

I recently ran into this article and it strongly resonated, so I decided to give it a shot and add a Postgres backend to this project as an alternative to ElasticSearch. Disclaimer: it's 99% LLM.

Claude Opus 4.6 basically just oneshotted the python code on the first try. I built a test docker image and ran it for the past week with zero issues so far (I have not received forensic reports or SMTP TLS reports though). The Grafana dashboard went through a couple of iterations to refine the queries, and it also works well for now (again, no forensic reports or SMTP TLS reports to show).

Postgres is much easier and more transparent to manage: it's SQL, it doesn't need a customized command line, I can perform maintenance and understand what's happening, and it's also very efficient.
I hope it's a useful addition to the project. Let me know if you need me to change or fix anything :)