# Join with baseline revenue
targets_008 = (
targets_008_base
.join(baseline_2026_revenue, on="customer_code", how="left")
.with_columns(pl.col("baseline_revenue").fill_null(0))
)
# Calculate threshold for customers WITH baseline revenue
# Formula: baseline_revenue / 56 * 31 * 1.2 (tax applied later in adjusted threshold)
targets_008 = targets_008.with_columns(
pl.when(pl.col("baseline_revenue") > 0)
.then((pl.col("baseline_revenue") / BASELINE_DAYS * 31 * 1.2))
.otherwise(pl.lit(None))
.alias("calculated_threshold")
)
# Get median of calculated thresholds (for customers without baseline)
calculated_thresholds = targets_008.filter(
pl.col("calculated_threshold").is_not_null())["calculated_threshold"]
median_threshold = calculated_thresholds.median()
print(
f"Customers with baseline revenue: {(targets_008['baseline_revenue'] > 0).sum():,}")
print(
f"Customers without baseline revenue: {(targets_008['baseline_revenue'] == 0).sum():,}")
print(f"Median calculated threshold: ${median_threshold:,.0f}")
# Apply median threshold to customers without baseline
targets_008 = targets_008.with_columns(
pl.when(pl.col("calculated_threshold").is_not_null())
.then(pl.col("calculated_threshold"))
.otherwise(pl.lit(median_threshold))
.alias("threshold")
)
# Rebate percentage (applied later to adjusted threshold)
REBATE_PERCENTAGE = 0.03
# Add flag for threshold source
targets_008 = targets_008.with_columns(
pl.when(pl.col("baseline_revenue") > 0)
.then(pl.lit("計算"))
.otherwise(pl.lit("中位數"))
.alias("threshold_source")
)
# Sort by threshold descending
targets_008 = targets_008.sort("threshold", descending=True)
# =============================================================================
# CALCULATE JANUARY 2026 MTD SPEND AND LAST ORDER DATE
# =============================================================================
# Load January 2026 sales
jan_sales = con.sql("""
SELECT
so.customer_code,
sol.order_type,
sol.order_number,
sol.line_number,
COALESCE(sol.pretax_subtotal, 0) as amount
FROM cosmos_sync.sales_orders so
JOIN cosmos_sync.sales_order_lines sol
ON so.order_type = sol.order_type
AND so.order_number = sol.order_number
WHERE so.confirmed_code = 'Y'
AND so.order_date >= '20260101'
AND so.order_date <= '20260131'
AND so.delivery_route IN ('A-PT', 'A-PN', 'K1', 'K2')
""").to_polars()
# Filter out deleted order lines and aggregate
jan_sales = (
jan_sales.filter(
~pl.struct(["order_type", "order_number", "line_number"]).map_elements(
lambda x: (x["order_type"], x["order_number"], x["line_number"]) in deleted_order_keys,
return_dtype=pl.Boolean
)
)
.group_by("customer_code")
.agg(pl.col("amount").sum().alias("sales_amount"))
)
# Load January 2026 returns
jan_returns = con.sql("""
SELECT
sr.customer_code,
srl.return_type,
srl.return_number,
srl.line_number,
COALESCE(srl.pretax_subtotal, 0) as amount
FROM cosmos_sync.sales_returns sr
JOIN cosmos_sync.sales_return_lines srl
ON sr.return_type = srl.return_type
AND sr.return_number = srl.return_number
WHERE sr.confirmed_code = 'Y'
AND sr.return_date >= '20260101'
AND sr.return_date <= '20260131'
AND sr.delivery_route IN ('A-PT', 'A-PN', 'K1', 'K2')
""").to_polars()
# Filter out deleted return lines and aggregate
jan_returns = (
jan_returns.filter(
~pl.struct(["return_type", "return_number", "line_number"]).map_elements(
lambda x: (x["return_type"], x["return_number"], x["line_number"]) in deleted_return_keys,
return_dtype=pl.Boolean
)
)
.group_by("customer_code")
.agg(pl.col("amount").sum().alias("returns_amount"))
)
# Combine sales and returns
jan_2026_mtd = (
jan_sales
.join(jan_returns, on="customer_code", how="full", coalesce=True)
.with_columns([
pl.col("sales_amount").fill_null(0),
pl.col("returns_amount").fill_null(0),
])
.with_columns(
(pl.col("sales_amount") - pl.col("returns_amount")).alias("jan_mtd_spend")
)
.select(["customer_code", "jan_mtd_spend"])
)
# Get last order date for all target customers (any route, not just target routes)
# Load order dates with line keys for deletion filtering
last_order_raw = con.sql("""
SELECT
so.customer_code,
so.order_date,
sol.order_type,
sol.order_number,
sol.line_number
FROM cosmos_sync.sales_orders so
JOIN cosmos_sync.sales_order_lines sol
ON so.order_type = sol.order_type
AND so.order_number = sol.order_number
WHERE so.confirmed_code = 'Y'
""").to_polars()
# Filter out deleted records and get max order date per customer
last_order_008 = (
last_order_raw.filter(
~pl.struct(["order_type", "order_number", "line_number"]).map_elements(
lambda x: (x["order_type"], x["order_number"], x["line_number"]) in deleted_order_keys,
return_dtype=pl.Boolean
)
)
.group_by("customer_code")
.agg(pl.col("order_date").max().alias("last_order_date"))
)
# Join MTD spend and last order date to targets
targets_008 = (
targets_008
.join(jan_2026_mtd, on="customer_code", how="left")
.join(last_order_008, on="customer_code", how="left")
.with_columns(pl.col("jan_mtd_spend").fill_null(0))
)
# Calculate adjusted threshold:
# 1. If customer already exceeded threshold: 1.25x current MTD spend
# 2. If customer has 0 MTD spend: threshold / 2
# 3. Otherwise: keep original threshold
# Then: cap at 2.5x MTD spend (if MTD > 0), and multiply by 1.05 for sales tax
targets_008 = targets_008.with_columns(
pl.when(pl.col("jan_mtd_spend") >= pl.col("threshold"))
.then(pl.col("jan_mtd_spend") * 1.25)
.when(pl.col("jan_mtd_spend") == 0)
.then(pl.col("threshold") / 2)
.otherwise(pl.col("threshold"))
.alias("adjusted_threshold_raw")
)
# Cap at 2.5x MTD spend (only when MTD > 0), then multiply by 1.05 for sales tax
targets_008 = targets_008.with_columns(
pl.when(pl.col("jan_mtd_spend") > 0)
.then(
pl.min_horizontal(
pl.col("adjusted_threshold_raw"),
pl.col("jan_mtd_spend") * 2.5
) * 1.05
)
.otherwise(pl.col("adjusted_threshold_raw") * 1.05)
.alias("adjusted_threshold_pretax")
)
# Floor at $3000, then round up to nearest $1000
THRESHOLD_FLOOR = 3000
THRESHOLD_ROUND_UP = 1000
targets_008 = targets_008.with_columns(
(
(pl.max_horizontal(pl.col("adjusted_threshold_pretax"), pl.lit(THRESHOLD_FLOOR)) / THRESHOLD_ROUND_UP)
.ceil() * THRESHOLD_ROUND_UP
)
.cast(pl.Int64)
.alias("adjusted_threshold")
)
# Calculate rebate: 3% of adjusted threshold, rounded down to nearest $50
REBATE_ROUND_DOWN = 50
targets_008 = targets_008.with_columns(
(
(pl.col("adjusted_threshold") * REBATE_PERCENTAGE / REBATE_ROUND_DOWN)
.floor() * REBATE_ROUND_DOWN
)
.cast(pl.Int64)
.alias("rebate_amount")
)
# Count customers with adjusted thresholds
exceeded_count = (targets_008["jan_mtd_spend"] >=
targets_008["threshold"]).sum()
zero_spend_count = (targets_008["jan_mtd_spend"] == 0).sum()
floored_count = (targets_008["adjusted_threshold_pretax"] < THRESHOLD_FLOOR).sum()
print(f"Customers with zero MTD spend (threshold halved): {zero_spend_count:,}")
print(f"Customers who exceeded threshold (adjusted): {exceeded_count:,}")
print(f"Customers floored at ${THRESHOLD_FLOOR:,}: {floored_count:,}")
print(f"\n=== CP-26-008 FINAL RESULTS ===")
print(f"Total CP-26-008 targets: {len(targets_008):,}")
print(f"Total adjusted threshold: ${targets_008['adjusted_threshold'].sum():,.0f}")
print(f"Total rebate amount: ${targets_008['rebate_amount'].sum():,.0f}")
# =============================================================================
# VERIFICATION: Ensure no overlap between CP-26-006 and CP-26-008
# =============================================================================
codes_006 = set(targets_006["customer_code"].to_list())
codes_008 = set(targets_008["customer_code"].to_list())
overlap = codes_006 & codes_008
print(f"\n=== OVERLAP VERIFICATION ===")
if len(overlap) == 0:
print("✓ No overlap: CP-26-006 and CP-26-008 targets are mutually exclusive")
else:
print(f"✗ WARNING: {len(overlap)} customers appear in BOTH campaigns!")
print(f" Overlapping codes: {list(overlap)[:10]}...")