1- using System ;
1+ using Hangfire . Storage . SQLite . Entities ;
2+ using System ;
23using System . Collections . Generic ;
34using System . Text ;
45using System . Threading ;
@@ -30,7 +31,6 @@ public SQLiteJobQueue(HangfireDbContext connection, SQLiteStorageOptions storage
3031 /// <returns></returns>
3132 public IFetchedJob Dequeue ( string [ ] queues , CancellationToken cancellationToken )
3233 {
33- /*
3434 if ( queues == null )
3535 {
3636 throw new ArgumentNullException ( nameof ( queues ) ) ;
@@ -41,51 +41,35 @@ public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken)
4141 throw new ArgumentException ( "Queue array must be non-empty." , nameof ( queues ) ) ;
4242 }
4343
44- var fetchConditions = new[]
45- {
46- Query.EQ("FetchedAt", null),
47- Query.LT("FetchedAt", DateTime.UtcNow.AddSeconds(_storageOptions.InvisibilityTimeout.Negate().TotalSeconds))
48- };
49- var fetchConditionsIndex = 0;
50-
5144 JobQueue fetchedJob = null ;
5245 while ( fetchedJob == null )
5346 {
5447 cancellationToken . ThrowIfCancellationRequested ( ) ;
5548
56- var fetchCondition = fetchConditions[fetchConditionsIndex];
57-
5849 foreach ( var queue in queues )
5950 {
6051 var lockQueue = string . Intern ( $ "f13333e1-a0c8-48c8-bf8c-788e89030329_{ queue } ") ;
6152 lock ( lockQueue )
6253 {
63- fetchedJob = _connection.JobQueue.FindOne(Query.And(fetchCondition, Query.EQ("Queue", queue)));
54+ fetchedJob = _dbContext . JobQueueRepository . FirstOrDefault ( _ => _ . Queue == queue &&
55+ ( _ . FetchedAt == DateTime . MinValue ) ) ;
6456
6557 if ( fetchedJob != null )
6658 {
6759 fetchedJob . FetchedAt = DateTime . UtcNow ;
68- _connection.JobQueue.Update(fetchedJob);
60+ _dbContext . Database . Update ( fetchedJob ) ;
61+
6962 break ;
7063 }
7164 }
7265 }
7366
74- if (fetchedJob == null && fetchConditionsIndex == fetchConditions.Length - 1)
75- {
76- // ...and we are out of fetch conditions as well.
77- // Wait for a while before polling again.
78- cancellationToken.WaitHandle.WaitOne(_storageOptions.QueuePollInterval);
79- cancellationToken.ThrowIfCancellationRequested();
80- }
81-
82- // Move on to next fetch condition
83- fetchConditionsIndex = (fetchConditionsIndex + 1) % fetchConditions.Length;
67+ // Wait for a while before polling again.
68+ cancellationToken . WaitHandle . WaitOne ( _storageOptions . QueuePollInterval ) ;
69+ cancellationToken . ThrowIfCancellationRequested ( ) ;
8470 }
8571
86- return new LiteDbFetchedJob(_connection, fetchedJob.Id, fetchedJob.JobId, fetchedJob.Queue);
87- */
88- return null ;
72+ return new SQLiteFetchedJob ( _dbContext , fetchedJob . Id , fetchedJob . JobId , fetchedJob . Queue ) ;
8973 }
9074
9175 /// <summary>
@@ -95,13 +79,11 @@ public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken)
9579 /// <param name="jobId"></param>
9680 public void Enqueue ( string queue , string jobId )
9781 {
98- /*
99- _connection.JobQueue.Insert(new JobQueue
82+ _dbContext . Database . Insert ( new JobQueue
10083 {
10184 JobId = int . Parse ( jobId ) ,
10285 Queue = queue
10386 } ) ;
104- */
10587 }
10688 }
10789}
0 commit comments