Skip to content

Commit e96ec7d

Browse files
committed
Refactor KafkaBatchUpdatePoints methods to enhance error handling, improve submission filtering, and aggregate results for point resending operations
1 parent 623f9ce commit e96ec7d

File tree

1 file changed

+106
-66
lines changed

1 file changed

+106
-66
lines changed

app/models/kafka_batch_update_points.rb

Lines changed: 106 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -3,103 +3,143 @@
33
class KafkaBatchUpdatePoints < ApplicationRecord
44
belongs_to :course
55

6-
# Optionally restrict to submissions within `interval` (ActiveSupport::Duration)
7-
# e.g. send_points_again_for_user_and_course(course_id, user_id, interval: 2.weeks)
6+
# Re-enqueue user progress and points updates for a given (course, user) pair.
7+
#
8+
# Only considers exercises the user has submitted in that course.
9+
# Optionally limits to submissions within `interval` (ActiveSupport::Duration).
10+
#
11+
# Returns:
12+
# { status: :ok, course_id:, user_id:, progress_enqueued:, user_points_enqueued: }
13+
# or { status: :skip_<reason>, course_id:, user_id: }
814
def self.send_points_again_for_user_and_course(course_id, user_id, interval: nil)
915
if interval && !interval.is_a?(ActiveSupport::Duration)
10-
raise ArgumentError,
11-
'Invalid interval: expected an ActiveSupport::Duration (e.g. 1.week, 3.days). ' \
12-
'Call like: KafkaBatchUpdatePoints.send_points_again_for_user_and_course(42, 7, interval: 1.week)'
16+
raise ArgumentError, 'Invalid interval: expected an ActiveSupport::Duration (e.g. 1.week, 3.days)'
1317
end
1418

15-
course = Course.find_by!(id: course_id)
19+
course = Course.find_by(id: course_id)
20+
return { status: :skip_course_missing, course_id: course_id, user_id: user_id } if course.nil?
21+
return { status: :skip_no_moocfi_id, course_id: course.id, user_id: user_id } if course.moocfi_id.blank?
1622

17-
# Skip if course doesn't exist or has no moocfi_id
18-
if course.moocfi_id.nil? || course.moocfi_id.blank?
19-
puts "⚠️ Skipping course_id=#{course_id} (missing or empty moocfi_id)"
20-
return
21-
end
22-
23-
submissions_scope = Submission
24-
.where(course_id: course.id, user_id: user_id)
25-
.where.not(exercise_name: [nil, ''])
26-
submissions_scope = submissions_scope.where(created_at: (Time.current - interval)..Time.current) if interval
27-
28-
submitted_names = submissions_scope.distinct.pluck(:exercise_name)
29-
if submitted_names.empty?
30-
puts "ℹ️ No submissions found for user_id=#{user_id} in course_id=#{course.id}"
31-
return
32-
end
23+
scope = Submission.where(course_id: course.id, user_id: user_id).where.not(exercise_name: [nil, ''])
24+
scope = scope.where(created_at: (Time.current - interval)..Time.current) if interval
3325

34-
exercise_ids = Exercise
35-
.where(course_id: course.id, name: submitted_names)
36-
.distinct
37-
.pluck(:id)
26+
names = scope.distinct.pluck(:exercise_name)
27+
return { status: :skip_no_submissions, course_id: course.id, user_id: user_id } if names.empty?
3828

39-
if exercise_ids.empty?
40-
puts "⚠️ No exercises matched submission names for user_id=#{user_id} in course_id=#{course.id}"
41-
return
42-
end
29+
exercise_ids = Exercise.where(course_id: course.id, name: names).distinct.pluck(:id)
30+
return { status: :skip_no_matching_exercises, course_id: course.id, user_id: user_id } if exercise_ids.empty?
4331

4432
transaction do
4533
create!(course_id: course.id, user_id: user_id, realtime: false, task_type: 'user_progress')
46-
4734
exercise_ids.each do |exercise_id|
4835
create!(course_id: course.id, user_id: user_id, exercise_id: exercise_id, realtime: false, task_type: 'user_points')
4936
end
5037
end
5138

52-
puts "✅ Resent points for user_id=#{user_id}, course_id=#{course.id} (#{exercise_ids.size} exercises)"
39+
{ status: :ok, course_id: course.id, user_id: user_id, progress_enqueued: 1, user_points_enqueued: exercise_ids.size }
5340
end
5441

42+
# Re-enqueue points for all courses a user has submissions in.
43+
#
44+
# Accepts optional `interval` (ActiveSupport::Duration) to restrict submissions.
45+
# Returns aggregate counts by status and total enqueued items.
5546
def self.send_points_again_for_user_and_all_courses(user_id, interval: nil)
5647
if interval && !interval.is_a?(ActiveSupport::Duration)
57-
raise ArgumentError,
58-
'Invalid interval: expected an ActiveSupport::Duration (e.g. 1.week, 3.days). ' \
59-
'Call like: KafkaBatchUpdatePoints.send_points_again_for_user_and_all_courses(7, interval: 3.days)'
48+
raise ArgumentError, 'Invalid interval: expected an ActiveSupport::Duration (e.g. 1.week, 3.days)'
6049
end
6150

62-
course_ids = Submission.where(user_id: user_id).distinct.pluck(:course_id)
63-
puts "🔁 Processing #{course_ids.size} courses for user_id=#{user_id}..."
64-
65-
course_ids.each do |course_id|
66-
send_points_again_for_user_and_course(course_id, user_id, interval: interval)
51+
totals = {
52+
ok: 0, skip_course_missing: 0, skip_no_moocfi_id: 0, skip_no_submissions: 0, skip_no_matching_exercises: 0,
53+
progress_enqueued: 0, user_points_enqueued: 0, processed_courses: 0
54+
}
55+
56+
Submission.where(user_id: user_id).distinct.pluck(:course_id).each do |course_id|
57+
r = send_points_again_for_user_and_course(course_id, user_id, interval: interval)
58+
totals[:processed_courses] += 1
59+
case r[:status]
60+
when :ok
61+
totals[:ok] += 1
62+
totals[:progress_enqueued] += r[:progress_enqueued]
63+
totals[:user_points_enqueued] += r[:user_points_enqueued]
64+
else
65+
totals[r[:status]] += 1 if totals.key?(r[:status])
66+
end
6767
end
68+
69+
totals
6870
end
6971

70-
# Requires an ActiveSupport::Duration, e.g. 1.week, 3.days, etc.
71-
def self.resend_points_for_recent_submissions(duration)
72+
# Re-enqueue points for all (course, user) pairs with submissions in the last `duration`.
73+
#
74+
# Prints a per-batch summary for each batch (`batch_size` pairs per batch),
75+
# then prints overall totals at the end.
76+
# Wraps the whole operation in one transaction; if it aborts, prints a clear message.
77+
# Returns overall totals hash.
78+
def self.resend_points_for_recent_submissions(duration, batch_size: 1000)
7279
unless duration.is_a?(ActiveSupport::Duration)
73-
raise ArgumentError,
74-
'Invalid argument: expected an ActiveSupport::Duration (e.g. 1.week, 3.days). ' \
75-
'Call it like: KafkaBatchUpdatePoints.resend_points_for_recent_submissions(1.week)'
80+
raise ArgumentError, 'Invalid argument: expected an ActiveSupport::Duration (e.g. 1.week, 3.days)'
7681
end
7782

78-
pairs = Submission
79-
.where(created_at: (Time.current - duration)..Time.current)
80-
.select(:course_id, :user_id)
81-
.distinct
82-
.pluck(:course_id, :user_id)
83-
84-
puts "🔍 Found #{pairs.size} (course_id, user_id) pairs in the last #{duration.inspect}"
85-
86-
processed = 0
87-
skipped = 0
83+
pairs = Submission.where(created_at: (Time.current - duration)..Time.current)
84+
.select(:course_id, :user_id)
85+
.distinct
86+
.pluck(:course_id, :user_id)
87+
88+
overall = {
89+
ok: 0, skip_course_missing: 0, skip_no_moocfi_id: 0, skip_no_submissions: 0, skip_no_matching_exercises: 0,
90+
progress_enqueued: 0, user_points_enqueued: 0, total_pairs: pairs.size
91+
}
92+
93+
begin
94+
transaction do
95+
pairs.each_slice(batch_size).with_index(1) do |batch_pairs, batch_idx|
96+
batch = {
97+
ok: 0, skip_course_missing: 0, skip_no_moocfi_id: 0, skip_no_submissions: 0, skip_no_matching_exercises: 0,
98+
progress_enqueued: 0, user_points_enqueued: 0, processed_in_batch: batch_pairs.size
99+
}
100+
101+
batch_pairs.each do |course_id, user_id|
102+
r = send_points_again_for_user_and_course(course_id, user_id, interval: duration)
103+
case r[:status]
104+
when :ok
105+
batch[:ok] += 1
106+
batch[:progress_enqueued] += r[:progress_enqueued]
107+
batch[:user_points_enqueued] += r[:user_points_enqueued]
108+
else
109+
batch[r[:status]] += 1 if batch.key?(r[:status])
110+
end
111+
end
88112

89-
transaction do
90-
pairs.each do |course_id, user_id|
91-
before = KafkaBatchUpdatePoints.count
92-
send_points_again_for_user_and_course(course_id, user_id, interval: duration)
93-
after = KafkaBatchUpdatePoints.count
94-
95-
if after == before
96-
skipped += 1
97-
else
98-
processed += 1
113+
# fold batch into overall
114+
overall.keys.each do |k|
115+
next if k == :total_pairs
116+
overall[k] += batch[k] if batch.key?(k)
99117
end
118+
119+
skipped = batch.values_at(:skip_course_missing, :skip_no_moocfi_id, :skip_no_submissions, :skip_no_matching_exercises).sum
120+
puts(
121+
"Batch #{batch_idx} (size=#{batch[:processed_in_batch]}): " \
122+
"ok=#{batch[:ok]}, skipped=#{skipped} " \
123+
"(missing_course=#{batch[:skip_course_missing]}, no_moocfi=#{batch[:skip_no_moocfi_id]}, " \
124+
"no_submissions=#{batch[:skip_no_submissions]}, no_match=#{batch[:skip_no_matching_exercises]}), " \
125+
"enqueued(progress=#{batch[:progress_enqueued]}, user_points=#{batch[:user_points_enqueued]})"
126+
)
100127
end
128+
end
129+
130+
# Only printed if transaction committed
131+
total_skipped = overall.values_at(:skip_course_missing, :skip_no_moocfi_id, :skip_no_submissions, :skip_no_matching_exercises).sum
132+
puts(
133+
"TOTALS: pairs=#{overall[:total_pairs]}, ok=#{overall[:ok]}, skipped=#{total_skipped} " \
134+
"(missing_course=#{overall[:skip_course_missing]}, no_moocfi=#{overall[:skip_no_moocfi_id]}, " \
135+
"no_submissions=#{overall[:skip_no_submissions]}, no_match=#{overall[:skip_no_matching_exercises]}), " \
136+
"enqueued(progress=#{overall[:progress_enqueued]}, user_points=#{overall[:user_points_enqueued]})"
137+
)
138+
rescue => e
139+
puts "⛔ Transaction aborted in resend_points_for_recent_submissions: #{e.class}: #{e.message}"
140+
raise
101141
end
102142

103-
puts "✅ Done! Processed #{processed} pairs, skipped #{skipped} (#{pairs.size} total)"
143+
overall
104144
end
105145
end

0 commit comments

Comments
 (0)