After reading the classic skip list paper, I tried to implement it myself. But then I hit a line of pseudo-code in the “Delete” function that I couldn’t understand:

It seemed that all the elements in “update” would point to “x”, so why check here at all? Maybe I could skip the check. Here is part of my code:

```python
def erase(self, num: int) -> bool:
    update = Node(-1)
    curr = self._head
    for level in range(MAX_LEVEL - 1, -1, -1):
        while curr._forward[level] and curr._forward[level]._key < num:
            curr = curr._forward[level]
        update._forward[level] = curr
    curr = curr._forward[0]
    if curr is None or curr._key != num:
        return False
    curr._count -= 1
    if curr._count > 0:
        return True
    for level in range(MAX_LEVEL):
        update._forward[level]._forward[level] = curr._forward[level]
    del curr
    return True
```

But unfortunately, it failed a test case. While debugging, I realized that not all elements in “update” point to “x”. Let’s take figure 1 from the paper as an example:

As above, imagine we are deleting node “17”. The “forward[1]” of node “9” (indices start from 0 in my code, unlike the paper) points to node “17”, so it should be redirected to node “25”. But the “forward[3]” of node “6” points to “NIL” and should not be redirected to node “25”, because “node6._forward[3]” never pointed to node “17”. The same holds for “forward[4]” and above of node “6”.

This is why the last few lines of my code should be:

```python
# ......
for level in range(MAX_LEVEL):
    if update._forward[level]._forward[level] != curr:
        break
    update._forward[level]._forward[level] = curr._forward[level]
del curr
return True
```

Just like the pseudo-code in the paper!
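For reference, the snippets above assume a Node class roughly like the following (the attribute names `_key`, `_count`, and `_forward` come from the snippets; everything else is my guess at the surrounding implementation):

```python
MAX_LEVEL = 5  # assumed constant; the paper derives it from the expected list size

class Node:
    def __init__(self, key: int):
        self._key = key
        self._count = 1                      # number of duplicates stored in this node
        self._forward = [None] * MAX_LEVEL   # _forward[i] = successor at level i

node = Node(17)
print(node._key, node._count, len(node._forward))  # → 17 1 5
```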

I really respect these academic folks: every time I thought they were wrong, it turned out that I had missed something.

This paper introduced a method to extract only the segments with bird sound from an audio file. Since the paper didn’t provide any code, I wrote it myself.

Here is the Python implementation:

```python
import cv2
import time
import torch
import librosa
import soundfile as sf
import numpy as np
from torchlibrosa.stft import LogmelFilterBank, Spectrogram


class CFG:
    n_fft = 2048
    hop_length = 512
    sample_rate = 32000
    n_mels = 64
    fmin = 150
    fmax = 15000  # must not exceed sample_rate / 2 (Nyquist)


class SignalExtractor:
    def __init__(self):
        self.spectrogram_extractor = Spectrogram(
            n_fft=CFG.n_fft, hop_length=CFG.hop_length, win_length=CFG.n_fft,
            window="hann", center=True, pad_mode="reflect", freeze_parameters=True)
        # Log-mel feature extractor
        self.logmel_extractor = LogmelFilterBank(
            sr=CFG.sample_rate, n_fft=CFG.n_fft, n_mels=CFG.n_mels,
            fmin=CFG.fmin, fmax=CFG.fmax, ref=1.0, amin=1e-10, top_db=None,
            freeze_parameters=True)
        self.factors = [2.0, 1.8, 1.6, 1.4, 1.2, 1.1]
        self.kernel_size = 15
        self.sn_threshold = 0.2

    def extract(self, input):
        x = torch.from_numpy(input)
        x = x[None, :].float()
        x = self.spectrogram_extractor(x)
        x = self.logmel_extractor(x)
        x = x.squeeze(0).squeeze(0)
        x = x.permute(1, 0).numpy()
        x = x - np.amin(x)
        for factor in self.factors:
            sound, sn_ratio = self._factor_extract(input, x, factor)
            if sn_ratio >= self.sn_threshold:
                break
        return sound, sn_ratio

    def _factor_extract(self, input, x, factor: float):
        rows, cols = x.shape
        row_median = np.median(x, axis=1)
        row_median_matrix = np.tile(row_median, (cols, 1)).T * factor
        col_median = np.median(x, axis=0)
        col_median_matrix = np.tile(col_median, (rows, 1)) * factor
        y = x > row_median_matrix
        z = x > col_median_matrix
        res = np.logical_and(y, z) + np.zeros(x.shape)
        kernel = np.ones((self.kernel_size, self.kernel_size), np.uint8)
        img = cv2.dilate(res, kernel, iterations=1)
        indicator = np.sum(img, axis=0)
        chunk_size = input.shape[0] // indicator.shape[0]
        sounds = []
        for index, chunk in enumerate(indicator):
            if chunk > 0:
                sounds.append(input[index * chunk_size:(index + 1) * chunk_size])
        if len(sounds) <= 0:
            return None, 0.0
        sound = np.concatenate(sounds)
        return sound, sound.shape[0] / input.shape[0]
```

The implementation differs from the paper’s method in a few ways:

- I didn’t use erosion, since dilation alone is good enough to pick out the bird-sound segments.
- A threshold of three times the median is too strict for most audio files, so I use an array of ratios: when the 2.0 ratio can’t pick up any bird sound, the code automatically retries with 1.8, and so on.
- I used a large (15, 15) dilation kernel, since it works well on my samples.
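The core masking step in `_factor_extract` (keep only cells louder than `factor` times both their row median and their column median) can be sanity-checked in isolation with plain NumPy. A toy example, not real audio:

```python
import numpy as np

# Toy "spectrogram": a quiet background with one loud 2x2 patch (the "bird call").
x = np.full((6, 8), 0.1)
x[2:4, 3:5] = 5.0
factor = 2.0

row_thresh = np.median(x, axis=1)[:, None] * factor  # per-row threshold
col_thresh = np.median(x, axis=0)[None, :] * factor  # per-column threshold
mask = (x > row_thresh) & (x > col_thresh)

print(mask.sum())            # → 4 (only the loud patch survives)
print(mask[2:4, 3:5].all())  # → True
```

Broadcasting with `[:, None]` does the same job as the `np.tile` matrices in the implementation above.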

When I tried to import Intel Extension for PyTorch (IPEX) to speed up model inference, it failed:

```
Traceback (most recent call last):
  File "reviewjpgs_optimaztion_testing.py", line 27, in <module>
    import intel_extension_for_pytorch as ipex
  File "/home/hero/.pyenv/versions/3.8.12/lib/python3.8/site-packages/intel_extension_for_pytorch/__init__.py", line 11, in <module>
    from .cpu import _cpu_isa
  File "/home/hero/.pyenv/versions/3.8.12/lib/python3.8/site-packages/intel_extension_for_pytorch/cpu/__init__.py", line 1, in <module>
    from . import runtime
  File "/home/hero/.pyenv/versions/3.8.12/lib/python3.8/site-packages/intel_extension_for_pytorch/cpu/runtime/__init__.py", line 3, in <module>
    from .multi_stream import MultiStreamModule, get_default_num_streams, \
  File "/home/hero/.pyenv/versions/3.8.12/lib/python3.8/site-packages/intel_extension_for_pytorch/cpu/runtime/multi_stream.py", line 4, in <module>
    import intel_extension_for_pytorch._C as core
ImportError: /home/hero/.pyenv/versions/3.8.12/lib/python3.8/site-packages/intel_extension_for_pytorch/lib/libintel-ext-pt-cpu.so: undefined symbol: _ZNK3c1010TensorImpl22is_strides_like_customENS_12MemoryFormatE
```

The answer is quite tricky: the IPEX version must match the version of the installed PyTorch.

After testing both `torch.jit.trace` and IPEX, we found that `torch.jit.trace` boosts prediction performance significantly, while IPEX does not.
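For completeness, here is a minimal `torch.jit.trace` sketch of the kind of thing we benchmarked (the model is a made-up stand-in, not our production network):

```python
import torch
import torch.nn as nn

class TinyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(8, 4)

    def forward(self, x):
        return torch.relu(self.fc(x))

model = TinyModel().eval()
example = torch.randn(1, 8)

# trace() records the ops executed on the example input into a static graph
traced = torch.jit.trace(model, example)

with torch.no_grad():
    out_eager = model(example)
    out_traced = traced(example)
print(torch.allclose(out_eager, out_traced))  # → True
```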

I want my Python script to receive one message from a Pub/Sub topic and then move on to other work. The code is adapted from an example in the GCP documentation:

```python
with subscriber:
    # The subscriber pulls a specific number of messages. The actual
    # number of messages pulled may be smaller than max_messages.
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
        retry=retry.Retry(deadline=300),
    )
    if len(response.received_messages) == 0:
        return
```

The problem is that it sometimes receives empty responses, i.e. `len(response.received_messages)` is zero.

Where do these empty responses come from? Here is the answer:

> Once a message is sent to a subscriber, the subscriber must either acknowledge or drop the message. A message is considered outstanding once it has been sent out for delivery and before a subscriber acknowledges it.

My solution is simply to keep pulling until a non-empty response arrives:

```python
with subscriber:
    # The subscriber pulls a specific number of messages. The actual
    # number of messages pulled may be smaller than max_messages.
    while True:
        response = subscriber.pull(
            request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
            retry=retry.Retry(deadline=300),
        )
        if len(response.received_messages) > 0:
            break
```
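The busy-wait above can be wrapped in a small helper with backoff, so an idle subscription doesn't hammer the API. `pull_once` below is a hypothetical stand-in for the `subscriber.pull(...)` call:

```python
import time

def wait_for_messages(pull_once, max_attempts=10, base_delay=1.0):
    """Call pull_once() until it returns a non-empty list of messages,
    sleeping with exponential backoff between empty responses."""
    for attempt in range(max_attempts):
        messages = pull_once()
        if messages:
            return messages
        time.sleep(min(base_delay * 2 ** attempt, 30.0))
    return []

# Toy stand-in: two empty responses, then one message.
responses = iter([[], [], ["msg-1"]])
print(wait_for_messages(lambda: next(responses), base_delay=0.0))  # → ['msg-1']
```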

My first idea was depth-first search: iterate over all people and try to give each a different hat. That solution got TLE (Time Limit Exceeded). Then, following a hint from the discussion forum, I iterated over hats instead of people, trying to assign each hat to a different person. That solution also got TLE, even though I used lru_cache on the function:

```python
import functools
from collections import defaultdict
from typing import List


class Solution:
    def numberWays(self, hats: List[List[int]]) -> int:
        hp = defaultdict(set)
        for index, hat in enumerate(hats):
            for _id in hat:
                hp[_id].add(index)
        hp = [people for people in hp.values()]

        @functools.lru_cache(None)
        def dfs(start, path) -> int:
            if len(path) == len(hats):
                return 1
            if start == len(hp):
                return 0
            total = 0
            for person in (hp[start] - set(path)):
                total += dfs(start + 1, tuple(list(path) + [person]))
            total += dfs(start + 1, path)
            return total % (10**9 + 7)

        return dfs(0, tuple())
```

Using a tuple to record visited people is not efficient enough in this case. Since there will be no more than 10 people, the most efficient way to record visited people is a bitmask.

My final solution still uses DFS (with lru_cache it is effectively dynamic programming as well):

```python
import functools
from collections import defaultdict
from typing import List


class Solution:
    def numberWays(self, hats: List[List[int]]) -> int:
        hp = defaultdict(set)
        for index, hat in enumerate(hats):
            for _id in hat:
                hp[_id].add(index)
        hp = [people for people in hp.values()]

        @functools.lru_cache(None)
        def dfs(start, mask) -> int:
            if bin(mask).count('1') == len(hats):
                return 1
            if start == len(hp):
                return 0
            total = 0
            for person in hp[start]:
                if (1 << person) & mask > 0:
                    continue
                mask |= 1 << person
                total += dfs(start + 1, mask)
                mask ^= 1 << person
            total += dfs(start + 1, mask)
            return total % (10**9 + 7)

        return dfs(0, 0)
```
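As a sanity check, here is a condensed standalone version of the same bitmask DP (the helper name `number_ways` is mine), run on the two examples from the problem statement:

```python
import functools
from collections import defaultdict

def number_ways(hats):
    # hat id -> set of people who like that hat
    hp = defaultdict(set)
    for person, liked in enumerate(hats):
        for h in liked:
            hp[h].add(person)
    hats_list = list(hp.values())
    full = (1 << len(hats)) - 1  # mask with every person assigned
    MOD = 10**9 + 7

    @functools.lru_cache(None)
    def dfs(i, mask):
        if mask == full:
            return 1
        if i == len(hats_list):
            return 0
        total = dfs(i + 1, mask)  # skip this hat entirely
        for person in hats_list[i]:
            if not mask & (1 << person):
                total += dfs(i + 1, mask | (1 << person))
        return total % MOD

    return dfs(0, 0)

print(number_ways([[3, 4], [4, 5], [5]]))  # → 1
print(number_ways([[3, 5, 1], [3, 5]]))    # → 4
```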

To solve LeetCode #1675, I wrote the code below with the help of the hints:

```python
import heapq
from typing import List


class Solution:
    def minimumDeviation(self, nums: List[int]) -> int:
        n = len(nums)
        heapq.heapify(nums)
        _max = max(nums)
        ans = max(nums) - min(nums)
        while True:
            item = heapq.heappop(nums)
            if item % 2 == 1:
                heapq.heappush(nums, item * 2)
                _max = max(_max, item * 2)
                ans = min(ans, _max - heapq.nsmallest(1, nums)[0])
            else:
                heapq.heappush(nums, item)
                break
        print("stage1:", nums)
        nums = [-item for item in nums]
        heapq.heapify(nums)
        _max = max(nums)
        while True:
            item = heapq.heappop(nums)
            if item % 2 == 0:
                heapq.heappush(nums, item // 2)
                _max = max(_max, item // 2)
                ans = min(ans, _max - heapq.nsmallest(1, nums)[0])
            else:
                break
        return ans
```

I know, the code looks quite messy. But the more annoying problem is that it exceeded the time limit.

After reading the solutions from those smart folks, I realized how silly my mistake was: I can just use `nums[0]` instead of `heapq.nsmallest(1, nums)[0]` to get the smallest element of a heap, just as the official documentation says.

Then I changed just two lines of my code and it passed all the test cases in time:

```python
import heapq
from typing import List


class Solution:
    def minimumDeviation(self, nums: List[int]) -> int:
        n = len(nums)
        heapq.heapify(nums)
        _max = max(nums)
        ans = max(nums) - min(nums)
        while True:
            item = heapq.heappop(nums)
            if item % 2 == 1:
                heapq.heappush(nums, item * 2)
                _max = max(_max, item * 2)
                ans = min(ans, _max - nums[0])
            else:
                heapq.heappush(nums, item)
                break
        print("stage1:", nums)
        nums = [-item for item in nums]
        heapq.heapify(nums)
        _max = max(nums)
        while True:
            item = heapq.heappop(nums)
            if item % 2 == 0:
                heapq.heappush(nums, item // 2)
                _max = max(_max, item // 2)
                ans = min(ans, _max - nums[0])
            else:
                break
        return ans
```
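The reason those two lines matter: `heapq.nsmallest(1, nums)` scans the whole list on every call (for n=1 it essentially falls back to `min`), while the heap invariant keeps the minimum at index 0, so `nums[0]` is a constant-time read:

```python
import heapq

nums = [5, 1, 9, 3, 7]
heapq.heapify(nums)

# The heap invariant guarantees the smallest element sits at index 0.
print(nums[0])                      # → 1
print(heapq.nsmallest(1, nums)[0])  # → 1 (same value, but O(n) work)
```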

The first intuitive idea that jumped to mind after looking at LeetCode #127 was a graph algorithm. But building the graph naively requires comparing every pair of words in wordList, which is O(n^2).

Here is the time-complexity analysis: the length of wordList is about 5000, so O(n^2) means 5000 × 5000 = 25 × 10^6 operations. A Python script on LeetCode takes roughly 1 second per 10^6 operations, so 25 × 10^6 operations would take about 25 seconds, which is far too long for a LeetCode problem.

Therefore the best way to build the graph is not to compare words pairwise, but to iterate over the lowercase alphabet (note the constraint: beginWord, endWord, and wordList[i] consist of lowercase English letters). By mutating each position of each word through all 26 letters, the work drops to about 26 × 10 × 5000 = 1.3 × 10^6 operations (the maximum word length is 10).

The code below also uses my old trick for tracking visited nodes.

```python
from collections import defaultdict
from typing import List


class Solution:
    def ladderLength(self, beginWord: str, endWord: str, wordList: List[str]) -> int:
        words_set = set(wordList)
        conns = defaultdict(set)
        for word in wordList + [beginWord]:
            for index in range(len(word)):
                conns[word] |= {
                    word[:index] + cand + word[index + 1:]
                    for cand in "abcdefghijklmnopqrstuvwxyz"
                    if cand != word[index]
                    and word[:index] + cand + word[index + 1:] in words_set
                }
        # bfs
        queue = {beginWord}
        already = set()
        ans = 1
        while queue:
            new_queue = set()
            for node in queue:
                for _next in conns[node]:
                    if _next == endWord:
                        return ans + 1
                    new_queue.add(_next)
            already |= queue
            queue = new_queue - already
            ans += 1
        return 0
```
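The letter-mutation trick can be shown in isolation. `neighbors` is a hypothetical helper extracted from the set comprehension above:

```python
import string

def neighbors(word, words_set):
    """All words in words_set that differ from `word` in exactly one letter."""
    out = set()
    for i in range(len(word)):
        for c in string.ascii_lowercase:
            if c != word[i]:
                cand = word[:i] + c + word[i + 1:]
                if cand in words_set:
                    out.add(cand)
    return out

words = {"hot", "dot", "dog", "lot", "log", "cog"}
print(sorted(neighbors("hot", words)))  # → ['dot', 'lot']
```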

The popular solution to LeetCode #494 is dynamic programming. But my first idea was divide and conquer. Although it’s not very fast, it’s another way to look at the problem:

```python
from collections import Counter
from typing import List


class Solution:
    def get_results(self, nums: List[int]) -> Counter:
        layer = [nums[0], -nums[0]]
        for num in nums[1:]:
            new_layer = []
            for item in layer:
                for val in [-num, num]:
                    new_layer.append(item + val)
            layer = new_layer
        return Counter(layer)

    def findTargetSumWays(self, nums: List[int], target: int) -> int:
        n = len(nums)
        if n == 1:
            # Count both sign choices separately: +0 and -0 are two different ways.
            return int(nums[0] == target) + int(-nums[0] == target)
        half = n // 2
        left = self.get_results(nums[:half])
        right = self.get_results(nums[half:])
        ans = 0
        for lkey, lcnt in left.items():
            rcnt = right[target - lkey]
            ans += rcnt * lcnt
        return ans
```
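The same divide and conquer can be written as a compact standalone function (a sketch; `target_sum_ways` and `signed_sums` are my names). Note that `signed_sums([])` returns `Counter({0: 1})`, so no special case for a single element is needed:

```python
from collections import Counter

def target_sum_ways(nums, target):
    """Meet in the middle: enumerate signed sums of each half, then join."""
    def signed_sums(part):
        sums = [0]
        for num in part:
            sums = [s + num for s in sums] + [s - num for s in sums]
        return Counter(sums)

    half = len(nums) // 2
    left = signed_sums(nums[:half])
    right = signed_sums(nums[half:])
    return sum(cnt * right[target - key] for key, cnt in left.items())

print(target_sum_ways([1, 1, 1, 1, 1], 3))  # → 5
```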

The first idea that jumped to mind was using two Sets to expand from the two nodes and picking the first intersection node between them. Hence the first solution:

```python
from collections import defaultdict
from typing import List


class Solution:
    def bfs(self, node1: int, node2: int, conns, length) -> int:
        set1 = {node1}
        set2 = {node2}
        step = 0
        while step <= length:
            inter = set1 & set2
            if len(inter) > 0:
                return min(inter)
            new_set1 = set()
            new_set2 = set()
            for node in set1:
                new_set1 |= conns[node]
            for node in set2:
                new_set2 |= conns[node]
            if len(new_set1 - set1) <= 0 and len(new_set2 - set2) <= 0:
                return -1
            set1 |= new_set1
            set2 |= new_set2
            step += 1
        return -1

    def closestMeetingNode(self, edges: List[int], node1: int, node2: int) -> int:
        conns = defaultdict(set)
        for index, edge in enumerate(edges):
            if edge >= 0:
                conns[index].add(edge)
        return self.bfs(node1, node2, conns, len(edges))
```

I am pretty satisfied with the simplicity of the code above. But unfortunately, it exceeded the time limit.

Sometimes we don’t need to start a new solution from scratch; optimizing the first one can be enough. Maybe I don’t need Sets at all, since they are expensive in Python. Instead, I use an array to track all visited nodes, and stepping onto an already-visited node means an “intersection”. To distinguish visits from the two starting nodes, node1 marks “1” in the array and node2 marks “2”. Hence my second solution: a little longer, but using an array instead of Sets:

```python
from typing import List


class Solution:
    def closestMeetingNode(self, edges: List[int], node1: int, node2: int) -> int:
        if node1 == node2:
            return node1
        n = len(edges)
        visited = [0] * n
        visited[node1] = 1
        visited[node2] = 2
        while True:
            ans = []
            old_node1 = node1
            nxt = edges[node1]
            if nxt >= 0:
                if visited[nxt] == 0:
                    visited[nxt] = 1
                    node1 = nxt
                elif visited[nxt] == 2:
                    ans.append(nxt)
            old_node2 = node2
            nxt = edges[node2]
            if nxt >= 0:
                if visited[nxt] == 0:
                    visited[nxt] = 2
                    node2 = nxt
                elif visited[nxt] == 1:
                    ans.append(nxt)
            if len(ans) > 0:
                return min(ans)
            if old_node1 == node1 and old_node2 == node2:
                return -1
```

As above, I use “old_node1” and “old_node2” to detect a dead loop. It beats 97% of submissions on runtime. Not bad.
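For comparison, the standard approach computes each start node's distance to every node along its single outgoing path, then picks the node minimizing the larger of the two distances. A sketch (function names are mine), run on the first example from the problem:

```python
def closest_meeting_node(edges, node1, node2):
    """Distance-array approach: each node has at most one outgoing edge,
    so each start's reachable set is a simple path (possibly into a cycle)."""
    def dists(start):
        d = [-1] * len(edges)   # -1 means unreachable
        step, node = 0, start
        while node != -1 and d[node] == -1:
            d[node] = step
            step += 1
            node = edges[node]
        return d

    d1, d2 = dists(node1), dists(node2)
    best, ans = float("inf"), -1
    for i, (a, b) in enumerate(zip(d1, d2)):
        if a != -1 and b != -1 and max(a, b) < best:
            best, ans = max(a, b), i
    return ans

print(closest_meeting_node([2, 2, 3, -1], 0, 1))  # → 2
```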